python實現對kafka中日誌資料監控報告(釘釘)
阿新 • • 發佈:2019-01-26
#!/usr/bin/env python
# coding:utf-8
# 該指令碼是作統計
from datetime import datetime, timedelta
import os
from dingtalkchatbot.chatbot import DingtalkChatbot
from kafka import KafkaConsumer
import re
# Alert store: per-line occurrence counters, bucketed under the keys 500 (5XX) and 400 (4XX).
# Each bucket maps a summary string produced by filter() to the number of times it was seen.
messages = {500: {}, 400: {}}
# import logging as log
# log.basicConfig(level=log.DEBUG)
def sort_dict():
    """Return the alert counters in report order without modifying them.

    Buckets are ordered by key descending (500 before 400); inside a bucket
    the entries are ordered by occurrence count, highest first.

    Returns:
        A list of ``(bucket_key, [(summary, count), ...])`` tuples.
    """
    ranked = []
    for bucket_key in sorted(messages, reverse=True):
        # Build a new sorted list instead of writing it back into the global:
        # the counters must stay dicts so later increments keep working even
        # when the report could not be sent and the store is not reset.
        by_count = sorted(
            messages[bucket_key].items(),
            key=lambda entry: entry[1],
            reverse=True,
        )
        ranked.append((bucket_key, by_count))
    return ranked
def send_mail():
    """Format the accumulated counters into a text report and post it via curl.

    The report has one '*******5XX:' / '*******4XX:' header per bucket,
    followed by one '<summary> <count>' row per entry, in the order
    returned by sort_dict().

    Returns:
        0 when curl ran and exited with status 0, -1 otherwise (non-zero
        exit status or any exception). The caller clears the counters only
        on 0, so a failed delivery must not be reported as success.
    """
    # Function-scope import: subprocess is not among the file-level imports.
    import subprocess

    try:
        rows = []
        for bucket_key, entries in sort_dict():
            label = '5XX' if bucket_key == 500 else '4XX'
            rows.append('*******' + label + ':' + '\n')
            for summary, count in entries:
                rows.append(summary + ' ' + str(count) + '\n')
        message = ''.join(rows)

        # DingTalk alert endpoint provided by the developers; the literal
        # 'url' is presumably a placeholder for the real address.
        # The report contains text taken from log lines, so it is passed as a
        # single argv element instead of being spliced into a shell string:
        # quotes, '$(...)' or backticks in a request path can no longer break
        # or hijack the command.
        exit_code = subprocess.call(
            ['curl', 'url', '-d', 'business=gateway&content=%s' % message]
        )
        print(exit_code)
        if exit_code != 0:
            return -1
        # Alternative delivery path: DingtalkChatbot(<webhook url>).send_text(msg=message)
        print('successfully sent!')
        return 0
    except Exception as e:
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('nnnnnnnn %s %s' % (now, e))
        return -1
def tactics(time1, send_time, delta):
    """Decide whether the report is due and, if so, send it and reset the counters.

    Args:
        time1: time at which the current error line was received.
        send_time: time at which the counters were last cleared.
        delta: interval that must be exceeded before a report is sent.

    Returns:
        The time the counters were last cleared: ``time1`` when a report was
        sent successfully just now, otherwise ``send_time`` unchanged.
    """
    global messages
    if time1 - send_time <= delta:
        # Still inside the current reporting window.
        return send_time

    print('time1 - send_time > delta')
    outcome = send_mail()
    print(outcome)
    if outcome != 0:
        # Delivery failed: keep the counters and the old window start so the
        # next error line triggers another attempt.
        return send_time

    # Delivered: start a fresh window with empty counters.
    messages = {500: {}, 400: {}}
    print('clean messages')
    return time1
def filter(msg):
    """Reduce one log line to a single-element list holding its summary string.

    The summary is built as
    ``<upstream_status> <app id> <server_name><second token of request>``.
    When the line carries no ``http_x_app_id`` field, the app-id column is
    the placeholder ``' -- '``.

    Raises:
        IndexError: if ``upstream_status``, ``server_name`` or ``request`` is
            not found, or if the request value has no second
            space-separated token.
    """
    app_ids = re.findall(r'\"http_x_app_id\": \"(.+?)\",', msg)
    app_column = app_ids[0] if app_ids else ' -- '

    upstream_status = re.findall(r'\"upstream_status\": \"(.+?)\",', msg)[0]
    server_name = re.findall(r'\"server_name\": \"(.+?)\",', msg)[0]
    request_value = re.findall(r'\"request\": \"(.+?)\",', msg)[0]
    # Second space-separated token, e.g. the path in 'GET /x HTTP/1.1'.
    request_target = request_value.split(' ')[1]

    summary = upstream_status + ' ' + app_column + ' ' + server_name + request_target
    return [summary]
def kafka_cli(bootstrap_servers, source_topic):
    """Consume log lines from Kafka and count those with a 4XX/5XX upstream status.

    Every line whose ``upstream_status`` is 400 or above is summarised with
    filter(), counted in the ``messages`` store (statuses above 499 under
    key 500, the rest under key 400), and then handed to tactics(), which
    sends the report once the one-hour window has elapsed.

    Lines that cannot be classified (no status, status '-', non-numeric
    status, or missing fields) are printed and skipped. On any other error
    the consumer is closed and a new one is created after a short pause.
    The loop has no exit condition; it runs until the process is interrupted.

    Args:
        bootstrap_servers: broker address(es) passed to KafkaConsumer.
        source_topic: topic to subscribe to.
    """
    # Function-scope import: time is not among the file-level imports.
    import time

    send_time = datetime.now()  # start of the current reporting window
    print('send_time %s' % send_time)
    delta = timedelta(hours=1)  # reporting interval
    retry_pause = 1  # seconds to wait before reconnecting

    while True:
        # Reset per attempt so the cleanup below never touches an unbound
        # name when KafkaConsumer() itself raises.
        consumer = None
        try:
            consumer = KafkaConsumer(source_topic, bootstrap_servers=bootstrap_servers)
            for msg in consumer:
                time1 = datetime.now()  # receive time of this line
                line = msg.value
                if line is None:
                    continue
                if isinstance(line, bytes) and not isinstance(line, str):
                    # Python 3: payloads arrive as bytes; the string handling
                    # below needs text. (On Python 2 bytes is str: no-op.)
                    line = line.decode('utf-8', 'replace')
                # Drop backslashes so the field patterns match escaped lines.
                line = line.replace('\\', '')

                status = re.findall(r'\"upstream_status\": \"(.+?)\",', line)
                if not status or status[0] == '-':
                    print(status)
                    continue
                try:
                    code = int(status[0])
                except ValueError:
                    # Non-numeric status: skip this line only, instead of
                    # tearing down the consumer.
                    print(status)
                    continue

                if code < 400:
                    print('ok %s' % code)
                    continue

                try:
                    summary = filter(line)[0]
                except IndexError:
                    # A field filter() needs is missing from this line.
                    print('%s skipped malformed line' % time1.strftime('%Y-%m-%d %H:%M:%S'))
                    continue
                print(summary)

                bucket = messages[500 if code > 499 else 400]
                bucket[summary] = bucket.get(summary, 0) + 1
                # tactics() returns the (possibly updated) window start.
                send_time = tactics(time1, send_time, delta)
        except Exception as e:
            now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print('%s %s' % (now, e))
        finally:
            if consumer is not None:
                consumer.close()
        # Brief pause so a persistent failure does not become a tight
        # reconnect loop.
        time.sleep(retry_pause)
def main():
    """Start the monitor against the hard-coded broker address and topic."""
    kafka_cli(bootstrap_servers='kafkaip:9092', source_topic='kibana')


# Run the monitor only when the file is executed as a script.
if __name__ == '__main__':
    main()