1. 程式人生 > python實現對kafka中日誌資料監控報告(釘釘)

python實現對kafka中日誌資料監控報告(釘釘)

#!/usr/bin/env python
# coding:utf-8
# 該指令碼是作統計

from datetime import datetime, timedelta
import os
from dingtalkchatbot.chatbot import DingtalkChatbot
from kafka import KafkaConsumer
import re

# Alert buckets: for each status class (500 -> 5XX, 400 -> 4XX) a mapping of
# report line -> number of occurrences since the last successful alert.
messages = {500: {}, 400: {}}


# import logging as log
# log.basicConfig(level=log.DEBUG)

# Field extractors, compiled once instead of on every record.  Each one
# captures the quoted value that follows `"<name>": ` and is terminated by `",`.
_FIELD_PATTERNS = {
    name: re.compile(r'"%s": "(.+?)",' % name)
    for name in ('upstream_status', 'http_x_app_id', 'server_name', 'request')
}


def sort_dict():
    """Return the alert buckets in report order without modifying them.

    The result is a list of ``(status_class, entries)`` tuples ordered 5XX
    first, then 4XX; ``entries`` is a list of ``(report_line, count)`` tuples
    ordered by descending count.

    The global ``messages`` is only read.  Replacing its buckets with sorted
    lists (as an in-place sort would) breaks the counting code whenever an
    alert attempt fails and the buckets are not reset afterwards.
    """
    ranked = {}
    for status_class, counts in messages.items():
        ranked[status_class] = sorted(
            counts.items(), key=lambda item: item[1], reverse=True)
    return sorted(ranked.items(), key=lambda item: item[0], reverse=True)


def _build_report(ranked):
    """Render the output of ``sort_dict()`` as the alert text."""
    parts = []
    for status_class, entries in ranked:
        label = '5XX' if status_class == 500 else '4XX'
        parts.append('*******' + label + ':' + '\n')
        for line, count in entries:
            parts.append(line + ' ' + str(count) + '\n')
    return ''.join(parts)


def send_mail():
    """Send the current alert report.

    Returns:
        0 when the report was delivered, -1 when delivery failed (curl exited
        non-zero or an exception was raised).  Callers use the 0 result as the
        signal that the counters may be cleared.
    """
    # Imported here because the file's import header does not provide it.
    import subprocess

    try:
        message = _build_report(sort_dict())
        # Alerting endpoint supplied by the developers ('url' is the
        # placeholder used in the original script).  The command is passed as
        # an argument list so log-derived text never reaches a shell, and the
        # report is URL-encoded so '&' / '%' in request URIs cannot corrupt
        # the form payload.
        exit_code = subprocess.call([
            'curl', 'url',
            '-d', 'business=gateway',
            '--data-urlencode', 'content=' + message,
        ])
        print(exit_code)
        # Alternatively, call the DingTalk robot through the chatbot module:
        # posturl = "https://oapi.dingtalk.com/robot/send?access_token=" \
        #           "378cc60b9306b89e53d71cecccfe70**********************"
        # xiaoding = DingtalkChatbot(posturl)
        # xiaoding.send_text(msg=message)
        if exit_code != 0:
            # A failed delivery must not be reported as success, otherwise the
            # caller would discard counters that were never sent.
            return -1
        print('successfully sent!')
        return 0  # marker value: the alert went out
    except Exception as e:
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('nnnnnnnn %s %s' % (now, e))
        return -1


def tactics(time1, send_time, delta):
    """Apply the alerting policy for an error observed at ``time1``.

    An alert is sent when more than ``delta`` has passed since ``send_time``
    (the moment the counters were last cleared).  On a successful send the
    global ``messages`` is reset.

    Args:
        time1: datetime at which the current error record was received.
        send_time: datetime of the last counter reset.
        delta: timedelta that must be exceeded before alerting.

    Returns:
        The datetime of the last counter reset: ``time1`` if an alert was sent
        successfully, otherwise ``send_time`` unchanged.
    """
    global messages
    if time1 - send_time <= delta:
        return send_time

    print('time1 - send_time > delta')
    result = send_mail()
    print(result)
    if result != 0:
        # Keep counters and reset time so the next error retries the alert.
        return send_time

    messages = {500: {}, 400: {}}
    print('clean messages')
    return time1


# Field filtering for a single log record.
def filter(msg):
    """Build the report line for one log record.

    The line is ``"<upstream_status> <app id> <server_name><request path>"``;
    when the record carries no ``http_x_app_id`` the app id is rendered as
    ``' -- '``.

    Args:
        msg: log record text (backslashes already stripped by the caller).

    Returns:
        A single-element list containing the report line.

    Raises:
        IndexError: if ``upstream_status``, ``server_name`` or ``request`` is
            missing, or the request has no second space-separated token.
    """
    def first(name):
        return _FIELD_PATTERNS[name].findall(msg)[0]

    app_ids = _FIELD_PATTERNS['http_x_app_id'].findall(msg)
    app_id = app_ids[0] if app_ids else ' -- '
    return [
        first('upstream_status') + ' ' + app_id + ' '
        + first('server_name') + first('request').split(' ')[1]
    ]


def _process_record(raw, send_time, delta):
    """Count one Kafka record if it is a 4XX/5XX and run the alert policy.

    Returns the (possibly updated) time of the last counter reset.
    """
    received_at = datetime.now()  # time this record was fetched
    if isinstance(raw, bytes) and not isinstance(raw, str):
        # Python 3 delivers bytes; Python 2 `str` is left untouched.
        raw = raw.decode('utf-8', 'replace')
    text = raw.replace('\\', '')  # drop escaping so the patterns match

    status = _FIELD_PATTERNS['upstream_status'].findall(text)
    if not status:
        # Not a record with an upstream status; skip it rather than letting
        # an IndexError tear down the consumer.
        return send_time
    if status[0] == '-':
        print(status)
        return send_time
    try:
        code = int(status[0])
    except ValueError:
        print(status)
        return send_time

    if code < 400:
        print('ok %s' % code)
        return send_time

    try:
        line = filter(text)[0]
    except IndexError:
        print('skip record with missing fields: %s' % status[0])
        return send_time

    print(line)
    bucket = messages[500] if code > 499 else messages[400]
    bucket[line] = bucket.get(line, 0) + 1
    return tactics(received_at, send_time, delta)


def kafka_cli(bootstrap_servers, source_topic):
    """Consume ``source_topic`` forever, counting error records and alerting.

    On any consumer error the exception is printed, the consumer is closed
    and a new one is created.

    Args:
        bootstrap_servers: broker address(es) passed to ``KafkaConsumer``.
        source_topic: topic to read log records from.
    """
    send_time = datetime.now()  # initial "last reset" time
    print('send_time %s' % send_time)
    # delta = timedelta(minutes=30)
    delta = timedelta(hours=1)  # minimum interval between alerts
    # delta = timedelta(seconds=5)
    while True:
        consumer = None
        try:
            consumer = KafkaConsumer(
                source_topic, bootstrap_servers=bootstrap_servers)
            for msg in consumer:
                send_time = _process_record(msg.value, send_time, delta)
        except Exception as e:
            now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print('%s %s' % (now, e))
        finally:
            # The constructor itself may have failed, in which case there is
            # nothing to close.
            if consumer is not None:
                consumer.close()


def main():
    """Run the monitor against the configured broker and topic."""
    bootstrap_servers = 'kafkaip:9092'
    source_topic = 'kibana'
    kafka_cli(bootstrap_servers, source_topic)


if __name__ == '__main__':
    main()