Zabbix line-quality monitoring with a custom Python module (queue version): custom thresholds trigger mtr to save line-fault logs
阿新 • Published: 2021-11-08
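Some time ago I implemented this feature with MySQL. The drawbacks were that it used too many system resources and the script was heavy.
I have since switched to BaseManager, using the queue API to store and share data. This keeps resource usage low and the scripts lightweight, and it can also be used for distributed monitoring by exposing the API over the Internet.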
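To make the idea concrete, here is a minimal sketch of sharing a `queue.Queue` through `BaseManager`. It is not the script from this post: the class names `QueueServer` and `QueueClient`, the typeid `get_queue`, the authkey, and the use of an ephemeral port are all assumptions chosen for illustration, and the server runs in a background thread of the same process only so the example is self-contained.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

# The queue that lives on the server side and is shared with clients.
shared_queue = queue.Queue()


def get_shared_queue():
    """Callable the manager invokes when a client asks for the queue."""
    return shared_queue


class QueueServer(BaseManager):
    """Server-side manager class (hypothetical name)."""


class QueueClient(BaseManager):
    """Client-side manager class (hypothetical name)."""


# register() is a classmethod, so separate classes keep the registries apart.
QueueServer.register('get_queue', callable=get_shared_queue)
QueueClient.register('get_queue')


def start_server(authkey=b'demo'):
    """Serve the queue from a background thread and return the bound address."""
    manager = QueueServer(address=('127.0.0.1', 0), authkey=authkey)
    server = manager.get_server()  # binds the listening socket (port 0 = any free port)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


def connect_client(address, authkey=b'demo'):
    """Connect to a running server and return a proxy for the shared queue."""
    client = QueueClient(address=address, authkey=authkey)
    client.connect()
    return client.get_queue()


if __name__ == '__main__':
    proxy = connect_client(start_server())
    proxy.put(12.34)            # sent over the socket to the server side
    print(shared_queue.get())   # read directly on the server side
```

The proxy returned by `get_queue()` forwards `put`, `get`, `qsize`, `empty` and `full` to the real queue, which is what lets a short-lived client read values that a long-running server produced.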
Server-side script
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import queue
from multiprocessing.managers import BaseManager
import threading, time, subprocess, re, sys, logging, os

# List of [source_ip, target_ip] pairs to probe
ipli = [['127.0.0.1', '127.0.0.1']]

# Logging: write to a file named "log" next to this script
def logger():
    base_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
    log_name = base_dir + '/log'
    log = logging.getLogger()
    fh = logging.FileHandler(log_name)
    formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
    fh.setFormatter(formatter)
    log.setLevel(logging.DEBUG)
    log.addHandler(fh)
    return log

# Create the iplist queue
def create_iplist():
    global local_ipqueue
    local_ipqueue = queue.Queue()
    iplist = BaseManager(address=('127.0.0.1', 5000), authkey=b'123')
    iplist.register('iplist_queue', callable=lambda: local_ipqueue)
    # If the port is already in use, the server is already running: exit
    try:
        iplist.start()
    except Exception:
        sys.exit(0)
    return iplist.iplist_queue()

# Ping probe. Uses the system ping command; ping3 was not used because of permission issues
def dark(ip, leng, ret_manage, iplist_queue):
    sip, tip = ip
    ipformat = ''.join(ip).replace('.', '')
    retqueue = getattr(ret_manage, 'restime' + ipformat)()
    pkloss_queue = getattr(ret_manage, 'pkloss' + ipformat)()
    while True:
        count, restime, pkloss, pkcount = 0, 0, 0, 20
        cmd = 'ping -c 1 -W 1 -I %s %s' % (sip, tip)
        # If a queue is full, drop this IP pair from the list and end the thread;
        # this corresponds to the item being disabled or deleted in the frontend
        if retqueue.full() or pkloss_queue.full():
            ipli.remove(ip)
            return
        while count < pkcount:
            # On the first request the queues are empty, so skip the sleep and set pkcount
            # to 1 to put data quickly and keep the Zabbix item from timing out.
            # When an item is added in the frontend, the new IP pair is put into the iplist
            # queue; watch its length and, if it changed, exit the thread after this probe.
            inv = 1 if not retqueue.empty() else 0
            pkcount = 1 if retqueue.empty() or pkloss_queue.empty() or iplist_queue.qsize() != leng else 20
            start_time = time.time()
            ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE).communicate()[0].decode('utf8')
            try:
                # Extract the round-trip time in ms from "time=0.045 ms"
                restime += float(re.findall(r'time=(\d+\.?\d*)', ret)[0])
            except Exception:
                # No reply: count a lost packet; ping already waited 1 s, so do not sleep
                pkloss += 1
                inv = 0
            count += 1
            run_time = time.time() - start_time
            sleep_time = (inv - run_time) if inv > run_time else 0
            time.sleep(sleep_time)
        try:
            restime = round(restime / (count - pkloss), 2)
        except ZeroDivisionError:
            restime = 0
        pkloss = round(pkloss / count, 2)
        retqueue.put(restime)
        pkloss_queue.put(pkloss)
        if iplist_queue.qsize() != leng:
            return

# Queue management
def run():
    log = logger()
    # Get the IP list queue
    iplist_queue = create_iplist()
    while ipli:
        # While the IP list queue is not empty, add any pair that is not yet in ipli
        while not iplist_queue.empty():
            new_ip = iplist_queue.get()
            if new_ip not in ipli:
                ipli.append(new_ip)
        ret_manage = BaseManager(address=('127.0.0.1', 5001), authkey=b'123')
        log.debug(ipli)
        # Create separate restime, pkloss and history queues for each pair in the list
        for ip in ipli:
            ipformat = ''.join(ip).replace('.', '')
            # Use a generated callback function rather than a lambda; with lambdas the
            # different queues ended up with the same ID.
            # Note: the queue being registered must be a global, not a local.
            for typeid_prefix, qname_prefix in (('restime', 'q'), ('pkloss', 'pklossq'), ('his_ip', 'historyq')):
                typeid, qname = typeid_prefix + ipformat, qname_prefix + ipformat
                globals()[qname] = queue.LifoQueue(maxsize=60)
                ns = {}
                exec('def func():\n    global %s\n    return %s' % (qname, qname), globals(), ns)
                ret_manage.register(typeid, callable=ns['func'])
        try:
            ret_manage.start()
        except Exception:
            sys.exit(0)
        leng = iplist_queue.qsize()
        thli = []
        # One thread per IP pair
        for x in ipli:
            t = threading.Thread(target=dark, args=(x, leng, ret_manage, iplist_queue))
            t.start()
            thli.append(t)
        for x in thli:
            x.join()
        ret_manage.shutdown()

if __name__ == '__main__':
    run()
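The server's remark that lambdas produced queues with the same ID is most likely Python's late-binding closure behaviour: a lambda created in a loop looks up its variable when it is called, not when it is defined. The sketch below reproduces that pitfall and shows one common alternative, a small factory function. The name `make_getter`, the helper `key_for`, and the sample IP pairs are hypothetical, and the sketch does not test the factory against `BaseManager` itself; the post notes that registered queues had to be globals.

```python
import queue


def make_getter(q):
    """Return a callable bound to this particular queue object."""
    def getter():
        return q
    return getter


def key_for(pair):
    """Build the per-pair suffix the way the server does: IPs joined, dots removed."""
    return ''.join(pair).replace('.', '')


# Hypothetical probe pairs, [source_ip, target_ip].
pairs = [['10.0.0.2', '10.0.0.1'], ['10.0.0.2', '10.0.0.3']]

# Pitfall: each lambda looks up `q` only when it is called,
# so after the loop every getter returns the last queue.
late = {}
for pair in pairs:
    q = queue.LifoQueue(maxsize=60)
    late[key_for(pair)] = lambda: q

# Alternative: the factory captures the queue object itself at creation time.
bound = {key_for(pair): make_getter(queue.LifoQueue(maxsize=60)) for pair in pairs}

if __name__ == '__main__':
    print(len({id(g()) for g in late.values()}))   # distinct queues behind the lambdas
    print(len({id(g()) for g in bound.values()}))  # distinct queues behind the factory getters
```

With two pairs, the lambdas all resolve to one queue while the factory getters resolve to two, which matches the symptom described in the server's comment.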
Client side
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from multiprocessing.managers import BaseManager
import time, subprocess, argparse, re, sys, os, random

USAGE = '------- darkping -s 10.0.0.2 -t 10.0.0.1 -I restime -----------'

def darkping(ip, item):
    sip, tip = ip
    ipformat = ''.join(ip).replace('.', '')
    base_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
    restime_ip = 'restime' + ipformat
    his_ip = 'his_ip' + ipformat
    pkloss_ip = 'pkloss' + ipformat
    server = '127.0.0.1'
    # The server script is expected as pingd.py in the same directory as this file
    cmd = 'python3 %s/pingd.py' % base_dir
    ipli_manager = BaseManager(address=(server, 5000), authkey=b'123')
    m = BaseManager(address=(server, 5001), authkey=b'123')
    # Connect to both managers; if that fails, start the server and try again
    while True:
        try:
            ipli_manager.connect()
            m.connect()
            break
        except Exception:
            time.sleep(random.random())
            subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            try:
                ipli_manager.connect()
                m.connect()
                break
            except Exception:
                pass
    ipli_manager.register('iplist_queue')
    iplist = ipli_manager.iplist_queue()
    m.register(restime_ip)
    m.register(his_ip)
    m.register(pkloss_ip)
    # Fetch the queues for this pair; if the server does not know the pair yet,
    # submit it through the iplist queue and try again
    while True:
        try:
            restime_queue = getattr(m, restime_ip)()
            his_queue = getattr(m, his_ip)()
            pkloss_queue = getattr(m, pkloss_ip)()
            break
        except Exception:
            iplist.put(ip)
            time.sleep(0.2)
            try:
                restime_queue = getattr(m, restime_ip)()
                his_queue = getattr(m, his_ip)()
                pkloss_queue = getattr(m, pkloss_ip)()
                break
            except Exception:
                pass
    mtr_log = base_dir + '/mtr_log/' + tip + '-' + time.strftime('%Y-%m-%d', time.localtime()) + '.log'
    mtr_cmd = base_dir + '/mtr.sh' + ' ' + tip + ' ' + mtr_log
    if item == 'restime':
        try:
            restime = restime_queue.get(timeout=1)
            print(restime)
        except Exception:
            print(0)
            sys.exit(0)
        # Run mtr when the response time rose by more than 20 ms since the previous sample
        if not his_queue.empty():
            his_restime = his_queue.get()
            if int(restime) - int(his_restime) > 20:
                subprocess.Popen(mtr_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        his_queue.put(restime)
    elif item == 'pkloss':
        try:
            pkloss = pkloss_queue.get(timeout=1)
            print(pkloss)
        except Exception:
            print(0)
            sys.exit(0)
        # Run mtr when packet loss exceeds the threshold.
        # Note: the server reports loss as a fraction (0 to 1), so the limit of 10 may need adjusting.
        if int(pkloss) > 10:
            subprocess.Popen(mtr_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # If a queue has built up a backlog, change the iplist queue length so the server rebuilds its queues
    if restime_queue.qsize() > 30 or pkloss_queue.qsize() > 30 or his_queue.qsize() > 30:
        iplist.put(ip)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='icmp for monitor')
    parser.add_argument('-t', action='store', dest='tip')
    parser.add_argument('-s', action='store', dest='sip')
    parser.add_argument('-I', action='store', dest='item')
    args = parser.parse_args()
    tip, sip, item = args.tip, args.sip, args.item
    ip = [sip, tip]
    try:
        A_tIP, B_tIP, C_tIP, D_tIP = tip.split('.')
        tipformat = re.findall(r'^\d+\.\d+\.\d+\.\d+$', tip)
        if len(sip) > 1:
            A_sIP, B_sIP, C_sIP, D_sIP = sip.split('.')
            sipformat = re.findall(r'^\d+\.\d+\.\d+\.\d+$', sip)
        else:
            A_sIP, B_sIP, C_sIP, D_sIP = 1, 1, 1, 1
            sipformat = True
    except Exception:
        print('ip or item error')
        print('-------example-----------')
        print(USAGE)
        sys.exit(0)
    itemli = ['restime', 'pkloss']
    octets = [A_tIP, B_tIP, C_tIP, D_tIP, A_sIP, B_sIP, C_sIP, D_sIP]
    # Check the format first so int() only ever sees digit strings
    if not tipformat or not sipformat or item not in itemli or any(int(x) > 255 for x in octets):
        print('ip or item error')
        sys.exit(0)
    darkping(ip, item)
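The client's threshold logic can be isolated into a small pure function, which makes the trigger conditions easier to read and to tune. This is a sketch, not the code above: `should_run_mtr` and its parameters are hypothetical names, it compares raw floats rather than truncated integers, and because the server computes loss as `round(pkloss/count, 2)` (a fraction between 0 and 1) the loss limit is expressed as a fraction, with 0.1 as an assumed default.

```python
def should_run_mtr(item, value, previous=None, rtt_jump_ms=20, loss_limit=0.1):
    """Decide whether a sample should trigger an mtr trace.

    item: 'restime' (average RTT in ms) or 'pkloss' (loss fraction, 0 to 1).
    previous: the prior RTT sample, or None when there is no history yet.
    """
    if item == 'restime':
        # Trace only when RTT rose by more than rtt_jump_ms since the last sample.
        return previous is not None and value - previous > rtt_jump_ms
    if item == 'pkloss':
        # Trace when the fraction of lost packets exceeds the limit.
        return value > loss_limit
    raise ValueError('unknown item: %r' % item)


if __name__ == '__main__':
    print(should_run_mtr('restime', 48.2, previous=21.5))
    print(should_run_mtr('pkloss', 0.05))
```

Keeping the decision separate from the `subprocess.Popen` call also means the thresholds can be checked without a running server or an `mtr.sh` script.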
Result
MySQL version
https://www.cnblogs.com/darkchen/p/15524856.html