1. 程式人生 > 其它 >zabbix 線路質量監控自定義python模組(queuel版),自定義閾值聯動mtr儲存線路故障日誌

zabbix 線路質量監控自定義python模組(queue版),自定義閾值聯動mtr儲存線路故障日誌

前段時間使用Mysql實現了這個功能,缺點是佔用太多系統資源,且指令碼繁重

後續改用BaseManager提供的queue API來儲存和共享資料,實現低能耗與輕量,也可以通過開放網際網路API來做分散式監控

server端指令碼

  1 #!/usr/bin/env python3
  2 #-*-coding:utf-8-*-
  3 import queue
  4 from multiprocessing.managers import BaseManager
  5 import threading,time,subprocess,re,sys,logging,os
  6 #需要探測的ip列表
7 ipli=[['127.0.0.1','127.0.0.1']] 8 #log函式 9 def logger(): 10 dir = os.path.dirname(os.path.realpath(sys.argv[0])) 11 log_name = dir+'/log' 12 logger = logging.getLogger() 13 fh = logging.FileHandler(log_name) 14 formater = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s
") 15 fh.setFormatter(formater) 16 logger.setLevel(logging.DEBUG) 17 logger.addHandler(fh) 18 return logger 19 #建立iplist queue 20 def craete_iplist(): 21 global local_ipqueue 22 local_ipqueue = queue.Queue() 23 iplist = BaseManager(address=('127.0.0.1',5000),authkey=b'
123') 24 iplist.register('iplist_queue',callable=lambda:local_ipqueue) 25 #如果埠被佔用就說明server端已啟動,退出程式 26 try: 27 iplist.start() 28 except: 29 sys.exit(0) 30 return iplist.iplist_queue() 31 32 #ping探測函式,使用系統原生ping命令,許可權問題沒用使用ping3 33 def dark(ip,leng,ret_manage,iplist_queue): 34 sip,tip=ip 35 restime_name,pkloss_name = 'restime'+''.join(ip).replace('.',''),'pkloss'+''.join(ip).replace('.','') 36 retqueue,pkloss_queue = eval('ret_manage.%s()'%restime_name),eval('ret_manage.%s()'%pkloss_name) 37 while True: 38 count,restime,pkloss,pkcount =0,0,0,20 39 cmd = 'ping -c 1 -W 1 -I %s %s'%(sip,tip) 40 #如果佇列滿就從列表裡刪除該組ip並結束該執行緒,對應的動作是前端禁用或刪除item 41 if retqueue.full() or pkloss_queue.full(): 42 ipli.remove(ip) 43 return 44 while count<pkcount: 45 #當第一次請求資料時佇列為空,把sleep和pkcount設定為1,快速put資料防止zabbix item超時 46 #前端新增item時把新ip組put進iplist佇列,偵聽佇列長度,如有變化做完該次探測後立即退出該執行緒 47 inv = 1 if not retqueue.empty() else 0 48 pkcount = 1 if retqueue.empty() or pkloss_queue.empty() or iplist_queue.qsize() != leng else 20 49 start_time = time.time() 50 ret = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].decode('utf8') 51 # print(ret) 52 try: 53 ret =re.findall('\d+\.?\d*' ,(re.findall('time=\d+\.?\d*',ret)[0]))[0] 54 restime += float(ret) 55 except Exception as a : 56 pkloss += 1 57 inv =0 58 count+=1 59 run_time = time.time() - start_time 60 sleep_time = (inv - run_time ) if inv > run_time else 0 61 time.sleep(sleep_time) 62 try: 63 restime=round(restime/(count-pkloss),2) 64 except: 65 restime=0 66 pkloss = round(pkloss/count,2) 67 retqueue.put(restime) 68 pkloss_queue.put(pkloss) 69 if iplist_queue.qsize() != leng: 70 return 71 #queue管理函式 72 def run(): 73 log = logger() 74 #獲取ip列表佇列 75 iplist_queue = craete_iplist() 76 while ipli: 77 #當ip列表佇列不為空,且佇列裡的ip不存在於ipli,就新增進去 78 while not iplist_queue.empty(): 79 new_ip = iplist_queue.get() 80 if new_ip not in ipli: 81 ipli.append(new_ip) 82 ret_manage=BaseManager(address=('127.0.0.1',5001),authkey=b'123') 83 
log.debug(ipli) 84 #跟進ip列表裡的值建立不同的restime,pkloss,history佇列 85 for ip in ipli: 86 l = locals() 87 ipformat = ''.join(ip).replace('.','') 88 restime_queue,restime_qname = 'restime'+ipformat ,'q'+ipformat 89 pkloss_queue,pkloss_qname = 'pkloss'+ipformat,'pklossq'+ipformat 90 his_ipqueue,history_qname = 'his_ip'+ipformat,'historyq'+ipformat 91 #使用回撥函式而不使用lambda,後者建立的不同佇列對應的ID是一樣的 92 #注意註冊佇列所使用的queue必須是全域性queue,作用域不能是local 93 globals()[restime_qname]=queue.LifoQueue(maxsize=60) 94 __callable = '''def func(): 95 global %s 96 return %s'''%(restime_qname,restime_qname) 97 exec(__callable) 98 func = l['func'] 99 ret_manage.register(restime_queue,callable=func) 100 globals()[pkloss_qname]=queue.LifoQueue(maxsize=60) 101 _pkloss__callable = '''def pkloss_func(): 102 global %s 103 return %s'''%(pkloss_qname,pkloss_qname) 104 exec(_pkloss__callable) 105 pkloss_func = l['pkloss_func'] 106 ret_manage.register(pkloss_queue,callable=pkloss_func) 107 globals()[history_qname]=queue.LifoQueue(maxsize=60) 108 __his_callable = '''def his_func(): 109 global %s 110 return %s'''%(history_qname,history_qname) 111 exec(__his_callable) 112 his_func = l['his_func'] 113 ret_manage.register(his_ipqueue,callable=his_func) 114 try: 115 ret_manage.start() 116 except: 117 sys.exit(0) 118 leng = iplist_queue.qsize() 119 thli=[] 120 #建立多執行緒,每一對ip對應一個執行緒 121 for x in ipli: 122 t=threading.Thread(target=dark,args=(x,leng,ret_manage,iplist_queue)) 123 t.start() 124 thli.append(t) 125 for x in thli: 126 x.join() 127 ret_manage.shutdown() 128 run()

client端

  1 #!/usr/bin/env python3
  2 #-*-coding:utf-8-*-
  3 from multiprocessing.managers import BaseManager
  4 import threading
  5 import queue,time,subprocess,argparse,re,sys,os,random
  6 
  7 def darkping(ip,item):
  8   
  9     sip,tip=ip
 10     f_ip = ''.join(ip)
 11     dir = os.path.dirname(os.path.realpath(sys.argv[0]))
 12     restime_ip = 'restime'+f_ip.replace('.','')
 13     his_ip = 'his_ip'+f_ip.replace('.','')
 14     pkloss_ip = 'pkloss'+f_ip.replace('.','')
 15     server = '127.0.0.1'
 16     cmd = 'python3 %s/pingd.py'%dir
 17     ipli_manager=BaseManager(address=(server,5000),authkey=b'123')
 18     m = BaseManager(address=(server,5001),authkey=b'123')
 19     while True:
 20         try:
 21             ipli_manager.connect() 
 22             m.connect()
 23             break
 24         except:
 25             time.sleep(random.random())
 26             subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
 27             try:
 28                 ipli_manager.connect()
 29                 m.connect()
 30                 break
 31             except:
 32                 pass
 33     ipli_manager.register('iplist_queue')       
 34     iplist = ipli_manager.iplist_queue()
 35     m.register(restime_ip)
 36     m.register(his_ip)
 37     m.register(pkloss_ip)
 38     waittime = True
 39     while waittime:
 40         try:
 41             restime_queue = eval('m.%s()'%restime_ip)
 42             his_queue = eval('m.'+his_ip+'()')
 43             pkloss_queue = eval('m.%s()'%pkloss_ip)
 44             break
 45         except:
 46             iplist.put(ip)
 47             time.sleep(0.2)
 48             try:
 49                 restime_queue = eval('m.%s()'%restime_ip)
 50                 his_queue = eval('m.%s()'%his_ip)
 51                 pkloss_queue = eval('m.%s()'%pkloss_ip)
 52                 break
 53             except Exception as a:
 54                 pass
 55     mtr_dir = dir+'/mtr_log/'+tip+'-'+time.strftime('%Y-%m-%d',time.localtime()) + '.log'
 56     mtr_cmd = dir + '/mtr.sh'+' '+tip+' '+mtr_dir
 57     if item =='restime':
 58         try:
 59             restime = restime_queue.get(timeout=1)
 60             print(eval(item))
 61         except:
 62             print(0)
 63             sys.exit(0)
 64         if not his_queue.empty() and item =='restime':
 65             his_restime = his_queue.get()
 66             if int(restime) - int(his_restime) >20:
 67                 subprocess.Popen(mtr_cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
 68         his_queue.put(restime)
 69     elif item =='pkloss':
 70         try:
 71             pkloss = pkloss_queue.get(timeout=1)
 72             print(eval(item))
 73         except:
 74             print(0)
 75             sys.exit(0)
 76         if int(pkloss) > 10:
 77             subprocess.Popen(mtr_cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
 78     if restime_queue.qsize() >30  or pkloss_queue.qsize()>30 or his_queue.full()>30:
 79         iplist.put(tip)
 80     
 81 if __name__ == '__main__':
 82     parser = argparse.ArgumentParser(description='icmp for monitor')
 83     parser.add_argument('-t',action = 'store',dest='tip')
 84     parser.add_argument('-s',action = 'store',dest='sip')
 85     parser.add_argument('-I',action='store',dest='item')
 86     args= parser.parse_args()
 87     try:
 88         tip = args.tip
 89         sip = args.sip
 90         item = args.item
 91         ip=[sip,tip]
 92     except:
 93         print('ip or item error')
 94         print('-------example-----------')
 95         print('------- darkping -s 10.0.0.2 -t 10.0.0.1 -I restime -----------')
 96     try:
 97         A_tIP,B_tIP,C_tIP,D_tIP = tip.split('.')
 98         tipformat = re.findall(r'^\d+\.\d+\.\d+\.\d+$', tip)
 99         if len(sip) > 1:
100             A_sIP,B_sIP,C_sIP,D_sIP = sip.split('.')
101             sipformat = re.findall(r'^\d+\.\d+\.\d+\.\d+$', sip)
102         else:
103             A_sIP,B_sIP,C_sIP,D_sIP=1,1,1,1
104             sipformat=True
105     except:
106         print('ip or item error')
107         print('-------example-----------')
108         print('------- darkping -s 10.0.0.2 -t 10.0.0.1 -I restime -----------')
109         sys.exit(0)
110     itemli=['restime','pkloss']
111     if int(A_tIP) > 255 or int(B_tIP) > 255 or int(C_tIP)> 255 or int(D_tIP)> 255 or \
112         int(A_sIP) > 255 or int(B_sIP) > 255 or int(C_sIP)> 255 or int(D_sIP)> 255 or\
113         item not in itemli or not tipformat or not sipformat:
114         print('ip or item error')
115         sys.exit(0)
116     darkping(ip,item)

效果

Mysql 版

https://www.cnblogs.com/darkchen/p/15524856.html

以驅魔為理想,為生計而奔波