跳至主要內容

python 实际工作中的实例

ClayPython工作脚本约 4156 字大约 14 分钟

python 实际工作中的实例

让 windows 压测机 解析内网进行压测,避免占用外网带宽

1)生成 hosts 解析文件,将解析文件分成 N 份,解析到不同的 内网ip

cat sync_xxx_hosts.py

#!/usr/bin/env python3
import requests
import json

def get_hosts():
    # 调用 api 获取所有 nginx 主机 ip 地址
    try:
        response = requests.get(
            url="https://xxx/api/v1/appid/xxx/hosts",
            headers={
                "Authorization": "xxx",
                "Content-Type": "application/json; charset=utf-8",
            },
            data=json.dumps({}))
        result = response.json()['data']['assets']
        hosts = [host['ip'] for host in result]
        return hosts

    except requests.exceptions.RequestException:
        print('HTTP Request failed')

def send_request():
    # 调用 api 获取域名列表
    try:
        response = requests.get(
            url=
            "xxx",
            headers={
                "Authorization": "xxx",
                "Content-Type": "application/json; charset=utf-8",
            },
            data=json.dumps({}))
        result = response.json()['data']
        prod_domain_list = [
            domain['name'] for domain in result
            if domain['lbcluster']['id'] == 1
        ]
        ext_website_domain_list = [
            domain['name'] for domain in result
            if domain['lbcluster']['id'] == 9
        ]
        # 通过域名获取不到的 域名列表,和nginx地址
        xx_domain_list = [
            'xxx.xxx.xx'
        ]
        ext_website_hosts = get_hosts()
        for i in range(0, 8):
            filename = f'/etc/ansible/files/hosts{i}'
            with open(filename, 'w') as f:
                for xx_domain in xx_domain_list:
                    f.write('IP地址 ' + bi_domain + '\n')
                for domain in prod_domain_list:
                    f.write('IP地址 ' + domain + '\n')
                for domain in ext_website_domain_list:
                    host = ext_website_hosts[i]
                    f.write(host + ' ' + domain + '\n')
    except requests.exceptions.RequestException:
        print('HTTP Request failed')


if __name__ == '__main__':
    send_request()

2)将 压测机 分为 N 组,生成 ansible 动态 Inventory

cat get_hosts.py

#!/usr/bin/env python3
import requests
import json
import sys
import math

# 将一个列表分为 n 组
def chunks(arr, m):
    n = int(math.ceil(len(arr) / float(m)))
    return [arr[i:i + n] for i in range(0, len(arr), n)]

def send_request():
    # 压测机的 appid 列表
    appids = ['xxx', 'xxx', 'xxx', 'xxx']
    hosts = []
    for appid in appids:
        try:
            # 获取 appid 下 的主机列表
            response = requests.get(
                url=f'https://xxx.xx.xx/api/v1/appid/{appid}/hosts',
                headers={
                    "Authorization": "xxx",
                    "Content-Type": "application/json; charset=utf-8",
                },
                data=json.dumps({}))
            result = response.json()['data']['assets']
            hosts = hosts + [host['ip'] for host in result]
        except requests.exceptions.RequestException:
            print('HTTP Request failed')
    chunks_hosts = chunks(hosts, 8)
    return chunks_hosts



def group(hosts):
    hostdata = {
        'all': {
            "hosts": sum(hosts, []),
            'vars': {
                'ansible_ssh_port': 5985,
                'ansible_connection': 'winrm',
                'ansible_ssh_user': 'administrator',
                'ansible_ssh_pass': 'xxx',
                'ansible_winrm_server_cert_validation': 'ignore',
                'ansible_winrm_transport': 'ntlm'
            }
        },
        'group0': {
            "hosts": hosts[0],
            'vars': {
                'ansible_ssh_port': 5985,
                'ansible_connection': 'winrm',
                'ansible_ssh_user': 'administrator',
                'ansible_ssh_pass': 'xxx',
                'ansible_winrm_server_cert_validation': 'ignore',
                'ansible_winrm_transport': 'ntlm'
            }
        },
        'group1': {
            "hosts": hosts[1],
            'vars': {
                'ansible_ssh_port': 5985,
                'ansible_connection': 'winrm',
                'ansible_ssh_user': 'administrator',
                'ansible_ssh_pass': 'xxx',
                'ansible_winrm_server_cert_validation': 'ignore',
                'ansible_winrm_transport': 'ntlm'
            }
        },
        'group2': {
            "hosts": hosts[2],
            'vars': {
                'ansible_ssh_port': 5985,
                'ansible_connection': 'winrm',
                'ansible_ssh_user': 'administrator',
                'ansible_ssh_pass': 'xxx',
                'ansible_winrm_server_cert_validation': 'ignore',
                'ansible_winrm_transport': 'ntlm'
            }
        },
        'group3': {
            "hosts": hosts[3],
            'vars': {
                'ansible_ssh_port': 5985,
                'ansible_connection': 'winrm',
                'ansible_ssh_user': 'administrator',
                'ansible_ssh_pass': 'xxx',
                'ansible_winrm_server_cert_validation': 'ignore',
                'ansible_winrm_transport': 'ntlm'
            }
        },
        'group4': {
            "hosts": hosts[4],
            'vars': {
                'ansible_ssh_port': 5985,
                'ansible_connection': 'winrm',
                'ansible_ssh_user': 'administrator',
                'ansible_ssh_pass': 'xxx',
                'ansible_winrm_server_cert_validation': 'ignore',
                'ansible_winrm_transport': 'ntlm'
            }
        },
        'group5': {
            "hosts": hosts[5],
            'vars': {
                'ansible_ssh_port': 5985,
                'ansible_connection': 'winrm',
                'ansible_ssh_user': 'administrator',
                'ansible_ssh_pass': 'xxx',
                'ansible_winrm_server_cert_validation': 'ignore',
                'ansible_winrm_transport': 'ntlm'
            }
        },
        'group6': {
            "hosts": hosts[6],
            'vars': {
                'ansible_ssh_port': 5985,
                'ansible_connection': 'winrm',
                'ansible_ssh_user': 'administrator',
                'ansible_ssh_pass': 'xxx',
                'ansible_winrm_server_cert_validation': 'ignore',
                'ansible_winrm_transport': 'ntlm'
            }
        },
        'group7': {
            "hosts": hosts[7],
            'vars': {
                'ansible_ssh_port': 5985,
                'ansible_connection': 'winrm',
                'ansible_ssh_user': 'administrator',
                'ansible_ssh_pass': 'xxx',
                'ansible_winrm_server_cert_validation': 'ignore',
                'ansible_winrm_transport': 'ntlm'
            }
        }
    }
    print(json.dumps(hostdata))


def host(ip):
    host_dict = {}
    print(json.dumps(host_dict))


if __name__ == '__main__':
    if len(sys.argv) == 2 and (sys.argv[1] == '--list'):
        hosts = send_request()
        group(hosts)
    elif len(sys.argv) == 3 and sys.argv[1] == "--host":
        host(sys.argv[2])
    else:
        print("Usage: %s --list or --host <hostname>" % sys.argv[0])
        sys.exit(1)

3)编写 playbook , sync_win_hosts.yml

---
- hosts: group0
  gather_facts: false
  tasks:
  - name: 同步hosts 文件到 windows 主机
    win_copy:
      src: /etc/ansible/files/hosts0
      dest: C:\Windows\System32\drivers\etc\hosts

- hosts: group1
  gather_facts: false
  tasks:
  - name: 同步hosts 文件到 windows 主机
    win_copy:
      src: /etc/ansible/files/hosts1
      dest: C:\Windows\System32\drivers\etc\hosts

- hosts: group2
  gather_facts: false
  tasks:
  - name: 同步hosts 文件到 windows 主机
    win_copy:
      src: /etc/ansible/files/hosts2
      dest: C:\Windows\System32\drivers\etc\hosts
# ...

4)添加计划任务,crontab -e

# 同步 hosts 到 压测机
*/5 * * * * /home//code/xxx/sync_xxx_hosts.py >/dev/null 2>&1
* * * * * /usr/local/bin/ansible-playbook -i /home/code/ansible/get_hosts.py /etc/ansible/sync_win_hosts.yml >/dev/null 2>&1

调用 alibabacloud_alidns 去操作 阿里云 dns 解析

官网下载 样例, 添加 参数解析即可

#!/home/clay/alibabacloud_alidns/bin/python

import os
import argparse
import json

from Tea.core import TeaCore
from alibabacloud_alidns20150109 import client
from alibabacloud_alidns20150109.client import Client as DNSClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_alidns20150109 import models as dns_models
from alibabacloud_tea_console.client import Client as ConsoleClient
from alibabacloud_tea_util.client import Client as UtilClient


def get_dns_client(
    access_key_id: str,
    access_key_secret: str,
) -> DNSClient:
    """
    Init 初始化客户端
    @param access_key_id:
    @param access_key_secret:
    @return: Client
    @throws Exception
    """
    config = open_api_models.Config()
    # 传AccessKey ID入config
    config.access_key_id = access_key_id
    config.access_key_secret = access_key_secret
    config.region_id = 'cn-qingdao'
    return DNSClient(config)

def describe_domain_records(
    client: DNSClient,
    domain_name: str,
) -> None:
    """
    DescribeDomainRecords 查询域名解析记录
    @param client:          客户端
    @param domain_name:      域名名称
    @throws Exception
    """
    req = dns_models.DescribeDomainRecordsRequest()
    req.domain_name = domain_name
    req.page_size = 500
    # req.type = "CNAME"
    ConsoleClient.log(f'查询域名({domain_name})的解析记录(json)↓')
    try:
        resp = client.describe_domain_records(req)
        ConsoleClient.log(UtilClient.to_jsonstring(TeaCore.to_map(resp)))
    except Exception as error:
        ConsoleClient.log(error.message)


def add_domain_record(
    client: DNSClient,
    domain_name: str,
    rr: str,
    record_type: str,
    value: str,
) -> None:
    """
    AddDomainRecord  添加域名解析记录
    @param client:            客户端
    @param domain_name:        域名名称
    @param rr:                主机记录
    @param record_type:              记录类型(A/NS/MX/TXT/CNAME/SRV/AAAA/CAA/REDIRECT_URL/FORWARD_URL)
    @param value:             记录值
    @throws Exception
    """
    req = dns_models.AddDomainRecordRequest()
    req.domain_name = domain_name
    req.rr = rr
    req.type = record_type
    req.value = value
    req.ttl = 60
    try:
        resp = client.add_domain_record(req)
        ConsoleClient.log(f'添加域名解析记录的结果(json)↓')
        ConsoleClient.log(UtilClient.to_jsonstring(TeaCore.to_map(resp)))
    except Exception as error:
        ConsoleClient.log(error)

def get_domain_record_id_by_rr(
    client: DNSClient,
    domain_name: str,
    rr: str,
) -> str:
    """
    DescribeDomainRecords 查询域名解析的record_id
    @param client:          客户端
    @param domain_name:      域名名称
    param rr:                主机记录
    @throws Exception
    """
    req = dns_models.DescribeDomainRecordsRequest()
    req.domain_name = domain_name
    req.rrkey_word = rr
    req.page_size = 500
    ConsoleClient.log(f'查询域名({domain_name})的解析的record_id(json)↓')
    try:
        resp = client.describe_domain_records(req)
        strinfo = UtilClient.to_jsonstring(TeaCore.to_map(resp))
        dictinfo = json.loads(strinfo)
        records = dictinfo['body']['DomainRecords']['Record']
        for record in records:
            if record['RR'] == rr:
                return record['RecordId']
        # return dictinfo['body']['DomainRecords']['Record'][0]['RecordId']
    except Exception as error:
        ConsoleClient.log(error)

def get_domain_by_rr(
    client: DNSClient,
    domain_name: str,
    rr: str,
) -> str:
    """
    DescribeDomainRecords 查询域名解析的record_id
    @param client:          客户端
    @param domain_name:      域名名称
    param rr:                主机记录
    @throws Exception
    """
    req = dns_models.DescribeDomainRecordsRequest()
    req.domain_name = domain_name
    req.rrkey_word = rr
    req.page_size = 500
    ConsoleClient.log(f'查询域名({domain_name})的解析的record_id(json)↓')
    try:
        resp = client.describe_domain_records(req)
        strinfo = UtilClient.to_jsonstring(TeaCore.to_map(resp))
        dictinfo = json.loads(strinfo)
        records = dictinfo['body']['DomainRecords']['Record']
        for record in records:
            if record['RR'] == rr:
                if record['Type'] == "CNAME":
                    print(f"\033[31m{record['RR']} {record['Type']} {record['Value']}\033[0m")
                    return "cnameexists"
                else:
                    print(f"\033[31m{record['RR']} {record['Type']} {record['Value']}\033[0m")
                break
        else:
            return "notexists"
        # if dictinfo['body']['DomainRecords']['Record'][0]['Type'] == "CNAME":
        #    print(f"\033[31m 解析记录已存在, 为 {dictinfo['body']['DomainRecords']['Record'][0]}\033[0m")
        # #    print(f"\033[32m 开始禁用\033[0m")
        #    print(f"\033[32m 开始删除\033[0m")
        #    return "cnameexists"
        # else:
        #     print(f"\033[31m 解析记录已存在, 为 {dictinfo['body']['DomainRecords']['Record'][0]}\033[0m")
    except Exception:
        return "notexists"

def update_domain_record(
    client: DNSClient,
    record_id: str,
    rr: str,
    record_type: str,
    value: str,
) -> None:
    """
    UpdateDomainRecord  更新域名解析记录
    @param client:          客户端
    @param record_id:        解析记录ID
    @param rr:              主机记录
    @param record_type:            记录类型(A/NS/MX/TXT/CNAME/SRV/AAAA/CAA/REDIRECT_URL/FORWARD_URL)
    @param value:           记录值
    @throws Exception
    """
    req = dns_models.UpdateDomainRecordRequest()
    req.record_id = record_id
    req.rr = rr
    req.type = record_type
    req.value = value
    ConsoleClient.log(f'更新域名解析记录的结果(json)↓')
    try:
        resp = client.update_domain_record(req)
        ConsoleClient.log(UtilClient.to_jsonstring(TeaCore.to_map(resp)))
    except Exception as error:
        ConsoleClient.log(error)

def set_domain_record_status(
    client: DNSClient,
    record_id: str,
    status: str,
) -> None:
    """
    SetDomainRecordStatus  设置域名解析状态
    @param client:      客户端
    @param record_id:    解析记录ID
    @param status:      解析状态(ENABLE/DISABLE)
    @throws Exception
    """
    req = dns_models.SetDomainRecordStatusRequest()
    req.record_id = record_id
    req.status = status
    ConsoleClient.log(f'设置域名解析状态的结果(json)↓')
    try:
        resp = client.set_domain_record_status(req)
        ConsoleClient.log(UtilClient.to_jsonstring(TeaCore.to_map(resp)))
    except Exception as error:
        ConsoleClient.log(error)

def delete_domain_record(
    client: DNSClient,
    record_id: str,
) -> None:
    """
    DeleteDomainRecord  删除域名解析记录
    @param client:         客户端
    @param record_id:       解析记录ID
    @throws Exception
    """
    req = dns_models.DeleteDomainRecordRequest()
    req.record_id = record_id
    ConsoleClient.log(f'删除域名解析记录的结(json)↓')
    try:
        resp = client.delete_domain_record(req)
        ConsoleClient.log(UtilClient.to_jsonstring(TeaCore.to_map(resp)))
    except Exception as error:
        ConsoleClient.log(error)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="""
        Aliyun DNS control tools;

        Example: \r\n

        %(prog)s create xxx.xxx
        """
    )
    parser.add_argument(
        'action', type=str,
        # choices=("status", "create", "update", "enable", "disable", "delete"),
        choices=("status", "create", "update", "enable", "disable"),
        help="Action to run"
    )
    parser.add_argument(
        "domain_name", type=str, default='xxx.xx', nargs="?",
        choices=("xxx.xx", "xxx.xx", "xxx.xx"),
        help="The domain to management"
    )

    args = parser.parse_args()
    action = args.action
    domain_name = args.domain_name

    if domain_name == 'xxx.xxx':
        access_key_id = 'xxx'
        access_key_secret = 'xxx'
    elif domain_name == 'xxx.xx':
        access_key_id = 'xxx'
        access_key_secret = 'xxx'
    elif domain_name == 'xxx.xx':
        # clay 认证
        access_key_id = 'xxx'
        access_key_secret = 'xx'


    # 初始化客户端
    client = get_dns_client(access_key_id, access_key_secret)

    if action == "create":
        with open('list.txt') as f:
            for line in f:
                args = line.split(' ')
                result = get_domain_by_rr(client, domain_name, args[0])
                if result == "notexists":
                    add_domain_record(client, domain_name, *args)
                elif result == "cnameexists":
                    record_id = get_domain_record_id_by_rr(client, domain_name, args[0])
                    # set_domain_record_status(client, record_id, 'DISABLE')
                    delete_domain_record(client, record_id)
                    add_domain_record(client, domain_name, *args)
        os._exit(0)
    elif action == "update":
        with open('list.txt') as f:
            for line in f:
                args = line.split(' ')
                record_id = get_domain_record_id_by_rr(client, domain_name, args[0])
                update_domain_record(client, record_id, *args)
    elif action == "enable":
        with open('list.txt') as f:
            for line in f:
                args = line.split(' ')
                record_id = get_domain_record_id_by_rr(client, domain_name, args[0])
                set_domain_record_status(client, record_id, 'ENABLE')
    elif action == "disable":
        with open('list.txt') as f:
            for line in f:
                args = line.split(' ')
                record_id = get_domain_record_id_by_rr(client, domain_name, args[0])
                set_domain_record_status(client, record_id, 'DISABLE')
    # elif action == "delete":
    #     with open('list.txt') as f:
    #         for line in f:
    #             args = line.split(' ')
    #             record_id = get_domain_record_id_by_rr(client, domain_name, args[0])
    #             delete_domain_record(client, record_id)
    elif action == "status":
        describe_domain_records(client, domain_name)

调用zabbixAPI批量添加web监控

起因:刚来一家公司,要求我添加web监控,800多个页面监控,手动一个个加,不得加死了,所以写了个python脚本,批量添加

脚本如下:

#!/usr/bin/env python3
# ~*~ coding:utf-8 ~*~
from zabbix_api import ZabbixAPI
import sys
import json

ZABBIX_SREVER = "http://192.168.162.122"
USERNAME = "Admin"
PASSWORD = "zabbix"
#HOSTNAME = "sh_ylf_15"
#HOSTNAME = "h5_web_monitor"
HOSTNAME = sys.argv[4]
urlname = sys.argv[1]
url = sys.argv[2]
delay = sys.argv[3]


# 登录
def login(ZABBIX_SREVER, USERNAME, PASSWORD):
    zapi = ZabbixAPI(ZABBIX_SREVER)
    zapi.login(USERNAME, PASSWORD)
    return zapi


# 获取主机id
def gethostid(auth, HOSTNAME):
    json_obj = ZabbixAPI.json_obj(auth, 'host.get', params={"filter": {"host": HOSTNAME}})
    request = ZabbixAPI.do_request(auth, json_obj)

    if request['result']:
        return request['result'][0]['hostid']
    else:
        print("找不到该主机")
        sys.exit(1)


# 获取应用级id
def getapplicationid(auth, hostid):
    # try:
    #     json_obj = ZabbixAPI.json_obj(auth, 'application.create', params={"name": "Web监测","hostid": hostid})
    #     ZabbixAPI.do_request(auth, json_obj)
    # except Exception as e:
    #     print(e)
    json_obj = ZabbixAPI.json_obj(auth, 'application.get', params={"hostids": hostid})
    request = ZabbixAPI.do_request(auth, json_obj)
    for num in range(0, len(request['result'])):
        if request['result'][num]['name'] == 'Web':
            return request['result'][num]['applicationid']


# 增加web监控
def create_web_scenario(auth, urlname, url, hostid, applicationid, delay):
    json_obj = ZabbixAPI.json_obj(auth, 'httptest.get', params={"filter": {"name": urlname}})
    request = ZabbixAPI.do_request(auth, json_obj)
    if request['result']:
        print('该web监控已经添加过了')
    else:
        try:
            json_obj = ZabbixAPI.json_obj(auth, 'httptest.create',
                                          params={"name": urlname, "hostid": hostid, "applicationid": applicationid,
                                                  "delay": delay, "retries": '1', "steps": [
                                                  {'name': urlname, 'url': url, 'timeout': '10', 'status_codes': '200',
                                                   'no': '1'}]})
            ZabbixAPI.do_request(auth, json_obj)
        except Exception as e:
            print(e)
            sys.exit(1)


# 增加触发器
def create_trigger(auth, HOSTNAME, urlname, url):
    expression = "{" + "{0}:web.test.fail[{1}].avg(#3)".format(HOSTNAME, urlname) + "}" + ">=1"
    try:
        json_obj = ZabbixAPI.json_obj(auth, 'trigger.create',
                                      params={"description": "{0}访问失败".format(urlname), "expression": expression,
                                              "priority": 5, "url": url})
        ZabbixAPI.do_request(auth, json_obj)
    except Exception as e:
        print(e)
        sys.exit(1)

    expression = "{" + "{0}:web.test.rspcode[{1},{1}].last(0)".format(HOSTNAME, urlname) + "}" + "<>200"
    try:
        json_obj = ZabbixAPI.json_obj(auth, 'trigger.create',
                                      params={"description": "{0}访问异常".format(urlname), "expression": expression,
                                              "priority": 4, "url": url})
        ZabbixAPI.do_request(auth, json_obj)
    except Exception as e:
        print(e)
        sys.exit(1)


# 获取监控项id
def getitem(auth, hostid, urlname):
    json_obj = ZabbixAPI.json_obj(auth, 'item.get',
                                  params={"hostids": hostid, "webitems": "1",
                                          "filter": {"name": "Response code for step \"$2\" of scenario \"$1\".",
                                                     "key_": "web.test.rspcode[{0},{1}]".format(urlname, urlname)}})
    request = ZabbixAPI.do_request(auth, json_obj)
    return request["result"][0]["itemid"]


# 增加图形
def create_graph(auth, urlname, hostid):
    try:
        itemid = getitem(auth, hostid, urlname)
        json_obj = ZabbixAPI.json_obj(auth, 'graph.create',
                                      params={"name": "h5_{0}状态显示".format(urlname), "width": 900, "height": 200,
                                              "gitems": [{"itemid": itemid, "color": "008800"}]})
        ZabbixAPI.do_request(auth, json_obj)
    except Exception as e:
        print(e)
        sys.exit(1)


def main():
    auth = login(ZABBIX_SREVER, USERNAME, PASSWORD)
    hostid = gethostid(auth, HOSTNAME)
    applicationid = getapplicationid(auth, hostid)

    create_web_scenario(auth, urlname, url, hostid, applicationid, delay)
    create_trigger(auth, HOSTNAME, urlname, url)
    create_graph(auth, urlname, hostid)


if __name__ == '__main__':
    main()

# json_obj = ZabbixAPI.json_obj(auth, 'httptest.get', params={"applicationids": applicationid})
# request = ZabbixAPI.do_request(auth, json_obj)
# print(json.dumps(request, ensure_ascii=False, indent=4))

结合下面的shell脚本运行,

shell脚本如下:

#!/bin/bash
export LANG="en_US.UTF-8"

arr_hostname=("192.168.165.115" "192.168.9.13")
len=${#arr_hostname[@]}
dir=$(cd $(dirname $0) && pwd)
tdir="$dir/tmp"

dt=`date "+%F %T"`

[ -f $tdir/code_error.txt ] && true >$tdir/code_error.txt

## i: 项目信息   j: url   k:时间间隔
while read i j k o;do
    if [[ ! x"$o" == x"" && $o -le $(($len-1)) ]];then
        curl -s -I "$j" > $tdir/curl.txt
        code=`grep 'HTTP/1.1' $tdir/curl.txt|awk '{print $2}'`
        #echo "$i $j $code" 

        if [ $code -eq 200 -o $code -eq 301 -o $code -eq 302 -o $code -eq 405 ];then
            python $dir/zabbix_agent.py $i $j $k ${arr_hostname[$o]}
            [ $? -eq 0 ] && echo "$dt $i $j $k $o create ok" >> $tdir/info || echo "$dt $i $j $k $o create fail" >>$tdir/info
        else
            echo "$i $j $k $o $code" >>$tdir/code_error.txt
            echo "$i $code"
        fi
    else
        echo "hostname参数传递错误"
    fi
done <$dir/list
  • arr_hostname为添加web监控的两个主机,我进行的是交叉互检

最后还需要一个list文件,list文件内容格式如下

csp-web-syndata http://192.168.100.15:8085/csp-web-syndata/shop/synShopInfo/111 3m 0

调用jenkinsAPI批量拷贝视图job

起因:一个视图下有好多job,新建一套微服务的job,完全可以用批量拷贝,然后改一些配置参数

脚本如下:

#!/usr/bin/env python
# coding:utf-8

import jenkins
import sys

src_view = 'jxltz'
dest_view = 'jxyz'
old_branch = 'jxltz-csp-parent'
new_branch = 'jxyz-csp-parent'


def get_server_instance():
    jenkins_url = 'http://192.168.162.175:8080'
    server = jenkins.Jenkins(jenkins_url, username='admin', password='xxx')
    return server


def delete_job():
    server = get_server_instance()
    if server.view_exists(dest_view):
        server.delete_view(dest_view)
    else:
        print("view不存在")
    jobs = server.get_jobs()
    for job in jobs:
        if dest_view in job['name']:
            server.delete_job(job['name'])


def copy_job():
    server = get_server_instance()
    jobs = server.get_jobs(view_name=src_view)
    for job in jobs:
        job_name = job['name']
        newjob_name = job_name.replace(src_view, dest_view)
        if server.job_exists(newjob_name):
            print("job已存在")
        else:
            server.copy_job(job_name, newjob_name)
    configxml = server.get_view_config(src_view)
    newconfigxml = configxml.replace(src_view, dest_view)
    if server.view_exists(dest_view):
        print("view已存在")
    else:
        server.create_view(dest_view, newconfigxml)


def reconfig_job():
    server = get_server_instance()
    jobs = server.get_jobs(view_name=dest_view)
    for job in jobs:
        job_config = server.get_job_config(job['name'])
        new_job_config = job_config.replace(old_branch, new_branch)
        server.reconfig_job(job['name'], new_job_config)


def main():
    if sys.argv[1] == 'delete':
        delete_job()
    elif sys.argv[1] == 'copy':
        copy_job()
    elif sys.argv[1] == 'reconfig':
        reconfig_job()
    else:
        print("Usage: python %s delete|copy|reconfig" % sys.argv[0])


if __name__ == '__main__':
    main()

解析 nginx 配置文件生成 Excel 表格

起因:领导要求统计每个域名下有哪些项目,将域名,端口,后端ip地址统计到Excel表格中。

导入nginx配置文件

scp -r /etc/nginx 192.168.167.24:/etc/

pip安装相关依赖模块

pip3 install xlrd xlwt xlutils

之前在github找到过一个相关的模块,之前用过,现在不用了,讲一下,它怎么导入,命令如下

pip3 install git+https://github.com/fatiherikli/nginxparser.git

编写python脚本

cat nginx_excel.py

#!/usr/bin/python3
# coding:utf-8
import re
import os
import xlrd
import xlwt
from xlutils.copy import copy


# from nginx import NGINX

class Nginx:

    def __init__(self, conf_path):
        self.conf_path = conf_path
        self.backend = list()
        self.serverBlock = list()
        self.servers = list()
        self.tmp_conf = '/tmp/tmp_nginx.conf'
        self.all_conf = '/tmp/nginx.conf'
        self.merge_conf()
        self.parse_backend_ip()
        self.parse_server_block()

    def merge_conf(self):
        conf_dir = os.path.dirname(self.conf_path)
        if len(conf_dir) != 0:
            os.chdir(conf_dir)
        include_regex = '.*include.*'
        fm = open(self.tmp_conf, 'w+')
        with open(self.conf_path, 'r') as f:
            for line in f.readlines():
                r = re.findall(include_regex, line)
                if len(r) > 0:
                    include_line = r[0].split(" ")
                    include_path = include_line[5].split(";")[0]
                    if os.path.exists(include_path):
                        with open(include_path, 'r') as ff:
                            include_con = ff.read()
                            fm.write(include_con)
                else:
                    fm.write(line)
        fm.close()

        # 去掉注释行
        fm = open(self.tmp_conf, 'r')
        with open(self.all_conf, 'w+') as fp:
            for xx in fm.readlines():
                if len(re.findall('^\s*#', xx)) == 0:
                    fp.write(xx)
        fm.close()

        # 删除临时配置文件
        if os.path.exists(self.tmp_conf):
            os.remove(self.tmp_conf)

    def parse_backend_ip(self):
        with open(self.all_conf, 'r') as fp:
            alllines = fp.read()

            # 获取每个upstream块
            regex_1 = 'upstream\s+([^{ ]+)\s*{([^}]*)}'
            upstreams = re.findall(regex_1, alllines)

            for up in upstreams:
                # 获取后端的ip
                regex_2 = 'server\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(?::\d{2,5})?)'
                backend = re.findall(regex_2, up[1])
                # 判断是否有后端的ip设置
                if len(backend) > 0:
                    pool_and_ip = {'poolname': up[0], 'ip': ' '.join(backend)}
                    self.backend.append(pool_and_ip)

    def parse_server_block(self):
        flag = False
        serverblock = ''
        num_of_quote = 0

        with open(self.all_conf, 'r') as fp:
            for line in fp.readlines():
                x = line.replace(' ', '')
                if x.startswith('server{'):
                    num_of_quote += 1
                    flag = True
                    serverblock += line
                    continue
                if flag and '{' in line:
                    num_of_quote += 1

                # 将proxy_pass的别名换成ip
                if flag and 'proxy_pass' in line:
                    r = re.findall('proxy_pass\s+https?://([^;/]*)[^;]*;', line)
                    if len(r) > 0:
                        for pool in self.backend:
                            if r[0] == pool['poolname']:
                                line = line.replace(r[0], pool['ip'])

                if flag and num_of_quote != 0:
                    serverblock += line

                if flag and '}' in line:
                    num_of_quote -= 1

                if flag and num_of_quote == 0:
                    self.serverBlock.append(serverblock)
                    flag = False
                    serverblock = ''
                    num_of_quote = 0

        for singleServer in self.serverBlock:
            # port和server_name均只有一个的情况下
            port = re.findall('listen\s*((?:\d|\s)*)[^;]*;', singleServer)[0]
            r = re.findall('server_name\s+([^;]*);', singleServer)

            if len(r) > 0:
                servername = r[0]
            else:
                continue

            # location可能不止一个
            locations = re.findall('location\s*[\^~\*=]{0,3}\s*([^{ ]*)\s*\{[^}]*proxy_pass\s+https?://([^;/]*)[^;]*;',
                                   singleServer)

            backend_list = list()
            backend_ip = ''

            if len(locations) > 0:
                for location in locations:
                    backend_path = location[0]
                    poolname = location[1]
                    # 如果不是ip的pool name,就取出后端对应的ip
                    if len(re.findall('\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', poolname)) == 0:
                        for backend in self.backend:
                            if poolname == backend['poolname']:
                                backend_ip = backend['ip']
                                break
                    else:
                        backend_ip = poolname

                    backend_list.append({"backend_path": backend_path, "backend_ip": backend_ip})

            server = {
                'port': port,
                'server_name': servername,
                'backend': backend_list
            }

            self.servers.append(server)


class NginxExcel:

    def __init__(self, row, col):
        self.row = row
        self.col = col

    def read_from_txt(self, txt_filename, line):
        file_txt = open(txt_filename)
        lines = file_txt.readlines()
        strlist = lines[line].split(" ")
        self.project_name = strlist[0]
        self.domain = strlist[1]
        self.link = strlist[2]
        self.proxy_pass = strlist[3:]

    def write_to_excel(self, excel_filename):
        if os.path.exists(excel_filename):
            rb = xlrd.open_workbook(excel_filename)
            wb = copy(rb)
            sheet = wb.get_sheet(0)
        else:
            wb = xlwt.Workbook()
            sheet = wb.add_sheet('私有云')
            sheet.write(self.row, self.col, "项目名")
            self.col += 1
            sheet.write(self.row, self.col, "域名")
            self.col += 1
            sheet.write(self.row, self.col, "项目链接")
            self.col += 1
            sheet.write(self.row, self.col, "后端地址")
            self.row += 1
            self.col = 0

        sheet.write(self.row, self.col, self.project_name)
        self.col += 1
        sheet.write(self.row, self.col, self.domain)
        self.col += 1
        sheet.write(self.row, self.col, self.link)
        self.col += 1
        sheet.write(self.row, self.col, ", ".join(self.proxy_pass))
        self.row += 1
        self.col = 0
        wb.save(excel_filename)


def main():
    txt_filename = "私有云项目清单.txt"
    excel_filename = "私有云项目清单.xls"
    s = NginxExcel(0, 0)
    nginx = Nginx('/etc/nginx/nginx.conf')
    nginx_list = nginx.servers
    f = open(txt_filename, "w+")

    for server in nginx_list:
        for backend in server["backend"]:
            old_project_name = backend["backend_path"]
            new_project_name = re.sub(r'/', '', old_project_name)
            if server["port"] == "80":
                domain = "http://" + server["server_name"]
                link = domain + "/" + new_project_name + "/"
            elif server["port"] == "443":
                domain = "https://" + server["server_name"]
                link = domain + "/" + new_project_name + "/"
            else:
                domain = "http://" + server["server_name"]
                link = domain + ":" + server["port"] + "/" + new_project_name + "/"
            f.write("%s %s %s %s\n" % (new_project_name, domain, link, backend["backend_ip"]))

    f.close()

    f = open(txt_filename)
    lines = f.readlines()

    for line in range(len(lines)):
        s.read_from_txt(txt_filename, line)
        s.write_to_excel(excel_filename)

    f.close()


if __name__ == '__main__':
    main()

执行脚本,生成Excel表格

python3 nginx_excel.py

表格生成在/etc/nginx目录下

监控 activemq 集群状态并自动重启

起因:activemq集群,由于私有云底层网路问题,隔一段时间假死一次,集群依赖zookeeper集群,后续将集群方式改为了共享存储的方式

要监控Activemq集群的运行情况,我们就可以通过stomp协议的相关客户端来实现。

stomp.py安装

在此我们使用的是stomp协议的python客户端,来实现集群的监控。

支持stomp协议的python客户端,我们使用的是stomp.py这个软件来实现,而stomp.py我们可以在github上来获得。

stomp.py除了可以连接activemq,还可以连接rabbitmq。

stomp.py仓库的github地址为:

https://github.com/jasonrbriggs/stomp.py.git

安装stomp

git clone https://github.com/jasonrbriggs/stomp.py.git
cd stomp.py
python setup.py install

脚本如下

#!/usr/bin/env python
# coding:utf-8

import time
import paramiko
import stomp

A_IP = "192.168.161.1"
B_IP = "192.168.161.2"
C_IP = "192.168.161.3"
USERNAME = "root"
PASSWROD = "*******"

list = []
now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


class MyParamiko:
    def __init__(self, hostip, username, password, port=22):
        self.hostip = hostip
        self.port = port
        self.username = username
        self.password = password
        self.obj = paramiko.SSHClient()
        self.obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.obj.connect(self.hostip, self.port, self.username, self.password)

    def run_cmd(self, cmd):
        stdin, stdout, stderr = self.obj.exec_command(cmd)
        return stdout.read()

    def close(self):
        self.obj.close()


def check_port():
    for ip in (A_IP, B_IP, C_IP):
        sshobj = MyParamiko(ip, USERNAME, PASSWROD)
        result = sshobj.run_cmd('netstat -ntpl | grep -c 61613')
        list.append(result.decode().strip('\n'))
        sshobj.close()
    print(list)


def activemq_listen(ip):
    class MyListener(object):
        def on_error(self, headers, message):
            print(now + ' received an error %s' % message)

        def on_message(self, headers, message):
            print(now + ' received a message %s' % message)
    conn = stomp.Connection([(ip, 61613)])
    conn.set_listener('', MyListener())
    conn.start()
    conn.connect('admin', 'admin')
    conn.subscribe(destination='/queue/clay.test.mqtest.v1', id=1, ack='auto')
    conn.send(body='clay test', destination='/queue/clay.test.mqtest.v1')
    time.sleep(2)
    conn.disconnect()


def activemq_restart(ip):
    sshobj = MyParamiko(ip, USERNAME, PASSWROD)
    cmd = "ps -ef | grep activemq | grep -v grep | awk '{print $2}'| xargs kill -9;" \
        "export JAVA_HOME=/opt/jdk1.8.0_144;" \
        "/opt/apache-activemq-5.15.9/bin/activemq start"
    result = sshobj.run_cmd(cmd)
    print(now + result)
    sshobj.close()


def activmq_monitor():
    if list == ['1', '0', '0']:
        print(now + " 第一台机器的61613端口处于监听状态,现在尝试往里面发送消息...")
        try:
            activemq_listen(A_IP)
        except stomp.exception.ConnectFailedException:
            print(now + " 发送信息失败,请重启Activemq进程...")
            activemq_restart(A_IP)
    elif list == ['0', '1', '0']:
        print(now + " 第二台机器的61613端口处于监听状态,现在尝试往里面发送消息...")
        try:
            activemq_listen(B_IP)
        except stomp.exception.ConnectFailedException:
            print(now + " 发送信息失败,请重启Activemq进程...")
            activemq_restart(B_IP)
    elif list == ['0', '0', '1']:
        print(now + " 第三台机器的61613端口处于监听状态,现在尝试往里面发送消息...")
        try:
            activemq_listen(C_IP)
        except stomp.exception.ConnectFailedException:
            print(now + " 发送信息失败,请重启Activemq进程...")
            activemq_restart(C_IP)
    else:
        print(now + " activemq的61613端口处于异常,现在重启这三台mq...")
        activemq_restart(A_IP)
        activemq_restart(B_IP)
        activemq_restart(C_IP)


if __name__ == "__main__":
    check_port()
    activmq_monitor()
    print()

创建计划任务

*/2 * * * * /usr/bin/python3 /root/activemq.py >> /var/log/activemq.log