Python+MySQL實現web日誌分析
日誌分析在web系統中故障排查、性能分析方面有著非常重要的作用。目前,開源的ELK系統是成熟且功能強大的選擇。但是部署及學習成本亦然不低,這裏我實現了一個方法上相對簡單(但準確度和效率是有保證的)的實現。另外該腳本的側重點不是通常的PV,UV等展示,而是短期內(如三天歷史or一周歷史)提供細粒度的異常和性能分析。
先說一下我想實現這個功能的驅動力(痛點)吧:
我們有不少站點,前邊有CDN,原站前面是F5,走到源站的訪問總量日均PV約5000w。下面是我們經常面臨一些問題:
- CDN回源異常,可能導致我們源站帶寬和負載都面臨較大的壓力。這時需要能快速的定位到是多了哪些回源IP(即CDN節點)或是某個IP的回源量異常,又或是哪些url的回源量異常
- 在排除了CDN回源問題之後,根據zabbix監控對一些異常的流量或者負載波動按異常時段對比正常時段進行分析,定位到具體的某(幾)類url。反饋給開發進行review以及優化
- 有時zabbix會監控到應用服務器和DB或者緩存服務器之間的流量異常,這種問題一般定位起來是比較麻煩的,甚至波動僅僅是在一兩分鐘內,這就需要對日誌有一個非常精細的分析粒度
- 我們希望能所有的應用服務器能過在本機分析日誌(分布式的思想),然後將分析結果匯總到一起(MySQL)以便查看;並且還希望能盡可能的實時(將定時任務間隔設置低一些),以便發現問題後能盡快的通過此平臺進行分析
- 通用和性能:對於不同的日誌格式只需對腳本稍加改動即可分析;因為將日誌分析放在應用服務器本機,所以腳本的性能和效率也要有保證,不能影響業務
再說下原理:
比較簡單,就是利用python的re模塊通過正則表達式對日誌進行分析處理,取得uri
、args
、時間當前
、狀態碼
、響應大小
、響應時間
、用戶IP
、CDN ip
、server name
等信息存儲進數據庫。
當然前提規範也是必須的:
- 各臺server的日誌文件按統一路徑存放
- 日誌格式保持一致
- 每天的0點日誌切割
我的nginx日誌格式如下:
log_format access ‘$remote_addr - [$time_local] "$request" ‘ ‘$status $body_bytes_sent $request_time "$http_referer" ‘ ‘"$http_user_agent" - $http_x_forwarded_for‘;
日誌分析原理:
通過Python的re模塊,按照應用服務器的日誌格式編寫正則,例如按照我的日誌格式,寫出的正則如下(編寫正則時,先不要換行,確保空格或引號等與日誌格式一致,最後考慮美觀可以折行)
log_pattern = r‘^(?P<remote_addr>.*?) - \[(?P<time_local>.*?)\] "(?P<request>.*?)"‘ r‘ (?P<status>.*?) (?P<body_bytes_sent>.*?) (?P<request_time>.*?)‘ r‘ "(?P<http_referer>.*?)" "(?P<http_user_agent>.*?)" - (?P<http_x_forwarded_for>.*)$‘
log_pattern_obj = re.compile(log_pattern)
用以上正則來整體匹配一行日誌記錄,然後各個部分可以通過log_pattern_obj.search(log).group(‘remote_addr‘)
、log_pattern_obj.search(log).group(‘body_bytes_sent‘)
等形式來訪問
對於其他格式的nginx日誌或者Apache日誌,按照如上原則,並對數據庫結構做相應調整,都可以輕松的使用該腳本分析處理。
原理雖簡單但實現起來卻發現有好多坑,主要是按照上述的日誌格式(靠空格或雙引號來分割各段)主要問題是面對各種不規範的記錄時(原因不一而足,而且也是樣式繁多),如何正確的分割及處理日誌的各字段,這也是我用re模塊而不是簡單的split()函數的原因。代碼裏對一些“可以容忍”的異常記錄通過一些判斷邏輯予以處理;對於“無法容忍”的異常記錄則返回空字符串並將日誌記錄於文件。
其實對於上述的這些不規範的請求,最好的辦法是在nginx中定義日誌格式時,用一個特殊字符作為分隔符,例如“|”。這樣都不用Python的re模塊,直接字符串分割就能正確的獲取到各段。
接下來看看使用效果:
先看一行數據庫裏的記錄
*************************** 9. row ***************************
id: 9
server: web6
uri_abs: /chapter/?/?.json
uri_abs_crc32: 443227294
args_abs: channel=ios&version=?
args_abs_crc32: 2972340533
time_local: 2017-02-22 23:59:01
response_code: 200
bytes_sent: 218
request_time: 0.028
user_ip: 210.78.141.185
cdn_ip: 27.221.112.163
request_method: GET
uri: /chapter/14278/28275.json
args: channel=ios&version=2.0.6
referer:
其中uri_abs
和args_abs
是對uri和args進行抽象化(抽象出一個模式出來)處理之後的結果。對uri中個段和args中的value部分除了完全由[a-zA-Z-_]+組成的部分之外的部分都用“?”做替換。uri_abs_crc32
和args_abs_crc32
兩列是對抽象化結果進行crc32計算,這兩列單純只是為了在MySQL中對uri或args進行分類統計匯總時得到更好的性能。
現在還沒有完成統一分析的入口腳本,所以還是以sql語句的形式來查詢(對用戶的sql功底有要求,不友好待改善)
- 查詢某站點日/小時pv(其實這一套東西的關註點並不在類似的基礎的統計上)
select count(*) from www where time_local>=‘2016-12-09 00:00:00‘ and time_local<=‘2016-12-09 23:59:59‘
- 查詢某類型url總量(or指定時間段內該url總量)
依據表中的url_abs_crc32字段mysql> select count(*) from www where uri_abs_crc32=2043925204 and time_local > ‘2016-11-23 10:00:00‘ and time_local <‘2016-11-23 23:59:59‘;
- 平均響應時間排行(可基於總量分析;亦可根據時段對比分析)
mysql> select uri_abs,count(*) as num,sum(request_time) as total_time,sum(request_time)/count(*) as average_time from www group by uri_abs_crc32 order by num desc limit 5; +------------------------------------------+---------+------------+--------------+ | uri_abs | num | total_time | average_time | +------------------------------------------+---------+------------+--------------+ | /comicsum/comicshot.php | 2700716 | 1348.941 | 0.0004995 | | /category/?.html | 284788 | 244809.877 | 0.8596215 | | / | 72429 | 1172.113 | 0.0161829 | | /static/hits/?.json | 27451 | 7.658 | 0.0002790 | | /dynamic/o_search/searchKeyword | 26230 | 3757.661 | 0.1432581 | +------------------------------------------+---------+------------+--------------+ 10 rows in set (40.09 sec)
- 平均響應大小排行
mysql> select uri_abs,count(*) as num,sum(bytes_sent) as total_bytes,sum(bytes_sent)/count(*) as average_bytes from www group by uri_abs_crc32 order by num desc,average_bytes desc limit 10; +------------------------------------------+---------+-------------+---------------+ | uri_abs | num | total_bytes | average_bytes | +------------------------------------------+---------+-------------+---------------+ | /comicsum/comicshot.php | 2700716 | 72889752 | 26.9890 | | /category/?.html | 284788 | 3232744794 | 11351.4080 | | / | 72429 | 1904692759 | 26297.3776 | | /static/hits/?.json | 27451 | 5160560 | 187.9917 | | /dynamic/o_search/searchKeyword | 26230 | 3639846 | 138.7665 | +------------------------------------------+---------+-------------+---------------+
以上只列舉了幾個例子,基本上除了UA部分(代碼中已有捕捉,但是筆者用不到),其他的信息都以包含到表中。因此幾乎可以對網站
流量
,負載
,響應時間
等方面的任何疑問給出數據上的支持。
- 平均響應大小排行
Python外部包依賴:pymysql
MySQL(筆者5.6版本)將innodb_file_format
設置為Barracuda
(這個設置並不對其他庫表產生影響,即使生產數據庫設置也無妨),以便在建表語句中可以通過ROW_FORMAT=COMPRESSED
將innodb表這只為壓縮模式,筆者實驗開啟壓縮模式後,數據文件大小將近減小50%。
接下來請看代碼:
#!/bin/env python3
# coding:utf-8
"""
ljk 20161116(update 20170217)
This script should be put in crontab in every web server.Execute every n minutes.
Collect nginx access log, process it and insert the result into mysql.
"""
import os
import re
import subprocess
import time
import warnings
import pymysql
from sys import argv, exit
from socket import gethostname
from urllib.parse import unquote
from zlib import crc32
from multiprocessing import Pool
##### 自定義部分 #####
# 定義日誌格式,利用非貪婪匹配和分組匹配,需要嚴格參照日誌定義中的分隔符和引號
log_pattern = r‘^(?P<remote_addr>.*?) - \[(?P<time_local>.*?)\] "(?P<request>.*?)"‘ r‘ (?P<status>.*?) (?P<body_bytes_sent>.*?) (?P<request_time>.*?)‘ r‘ "(?P<http_referer>.*?)" "(?P<http_user_agent>.*?)" - (?P<http_x_forwarded_for>.*)$‘
# request的正則,其實是由 "request_method request_uri server_protocol"三部分組成
request_uri_pattern = r‘^(?P<request_method>(GET|POST|HEAD|DELETE)?) (?P<request_uri>.*?) (?P<server_protocol>HTTP.*)$‘
# 日誌目錄
log_dir = ‘/nginx_log/‘
# 日誌文件命名:作者場景是www.access.log格式,只要保證xxx.access即可,後面隨意
# 要處理的站點(可隨需要想list中添加)
todo = [‘www‘, ‘news‘, ‘m.api‘,]
# MySQL相關設置
mysql_host = ‘xxxx‘
mysql_user = ‘xxxx‘
mysql_passwd = ‘xxxx‘
mysql_port = ‘xxxx‘
mysql_database = ‘xxxx‘
# 表結構:所有字段均加了默認值,所以即使日誌格式和我的不同也可以不用修改表結構和插入語句
creat_table = "CREATE TABLE IF NOT EXISTS {} ( id bigint unsigned NOT NULL AUTO_INCREMENT PRIMARY KEY, server char(11) NOT NULL DEFAULT ‘‘, uri_abs varchar(200) NOT NULL DEFAULT ‘‘ COMMENT ‘對$uri做uridecode,然後做抽象化處理‘, uri_abs_crc32 bigint unsigned NOT NULL DEFAULT ‘0‘ COMMENT ‘對上面uri_abs字段計算crc32‘, args_abs varchar(200) NOT NULL DEFAULT ‘‘ COMMENT ‘對$args做uridecode,然後做抽象化處理‘, args_abs_crc32 bigint unsigned NOT NULL DEFAULT ‘0‘ COMMENT ‘對上面args字段計算crc32‘, time_local timestamp NOT NULL DEFAULT ‘0000-00-00 00:00:00‘, response_code smallint NOT NULL DEFAULT ‘0‘, bytes_sent int NOT NULL DEFAULT ‘0‘ COMMENT ‘發送給客戶端的響應大小‘, request_time float(6,3) NOT NULL DEFAULT ‘0.000‘, user_ip varchar(40) NOT NULL DEFAULT ‘‘, cdn_ip varchar(15) NOT NULL DEFAULT ‘‘ COMMENT ‘CDN最後節點的ip:空字串表示沒經過CDN; - 表示沒經過CDN和F5‘, request_method varchar(7) NOT NULL DEFAULT ‘‘, uri varchar(255) NOT NULL DEFAULT ‘‘ COMMENT ‘$uri,已做uridecode‘, args varchar(255) NOT NULL DEFAULT ‘‘ COMMENT ‘$args,已做uridecode‘, referer varchar(255) NOT NULL DEFAULT ‘‘ COMMENT ‘‘, KEY time_local (time_local), KEY uri_abs_crc32 (uri_abs_crc32), KEY args_abs_crc32 (args_abs_crc32) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 row_format=compressed"
##### 自定義部分結束 #####
# 主機名
global server
server = gethostname()
# 今天零點
global today_start
today_start = time.strftime(‘%Y-%m-%d‘, time.localtime()) + ‘ 00:00:00‘
# 將pymysql對於操作中的警告信息轉為可捕捉的異常
warnings.filterwarnings(‘error‘, category=pymysql.err.Warning)
def my_connect():
"""鏈接數據庫"""
global connection, con_cur
try:
connection = pymysql.connect(host=mysql_host, user=mysql_user, password=mysql_passwd,
charset=‘utf8mb4‘, port=mysql_port, autocommit=True, database=mysql_database)
except pymysql.err.MySQLError as err:
print(‘Error: ‘ + str(err))
exit(20)
con_cur = connection.cursor()
def create_table(t_name):
"""創建各站點對應的表"""
my_connect()
try:
con_cur.execute(creat_table.format(t_name))
except pymysql.err.Warning:
pass
def process_line(line_str):
"""
處理每一行記錄
line_str: 該行數據的字符串形式
"""
processed = log_pattern_obj.search(line_str)
if not processed:
‘‘‘如果正則根本就無法匹配該行記錄時‘‘‘
print("Can‘t process this line: {}".format(line_str))
return server, ‘‘, 0, ‘‘, 0, ‘‘, ‘‘, ‘‘, ‘‘, ‘‘, ‘‘
else:
# remote_addr (客戶若不經過代理,則可認為用戶的真實ip)
remote_addr = processed.group(‘remote_addr‘)
# time_local
time_local = processed.group(‘time_local‘)
# 轉換時間為mysql date類型
ori_time = time.strptime(time_local.split()[0], ‘%d/%b/%Y:%H:%M:%S‘)
new_time = time.strftime(‘%Y-%m-%d %H:%M:%S‘, ori_time)
# 處理uri和args
request = processed.group(‘request‘)
request_further = request_uri_pattern_obj.search(request)
if request_further:
request_method = request_further.group(‘request_method‘)
request_uri = request_further.group(‘request_uri‘)
uri_args = request_uri.split(‘?‘, 1)
# 對uri和args進行urldecode
uri = unquote(uri_args[0])
args = ‘‘ if len(uri_args) == 1 else unquote(uri_args[1])
# 對uri和args進行抽象化
uri_abs = text_abstract(uri, ‘uri‘)
args_abs = text_abstract(args, ‘args‘)
# 對庫裏的uri_abs和args_abs字段進行crc32校驗
uri_abs_crc32 = crc32(uri_abs.encode())
args_abs_crc32 = 0 if args_abs == ‘‘ else crc32(args_abs.encode())
else:
print(‘$request abnormal: {}‘.format(line_str))
request_method = ‘‘
uri = request
uri_abs = ‘‘
uri_abs_crc32 = 0
args = ‘‘
args_abs = ‘‘
args_abs_crc32 = 0
# 狀態碼,字節數,響應時間
response_code = processed.group(‘status‘)
bytes_sent = processed.group(‘body_bytes_sent‘)
request_time = processed.group(‘request_time‘)
# user_ip,cdn最後節點ip,以及是否經過F5
http_x_forwarded_for = processed.group(‘http_x_forwarded_for‘)
ips = http_x_forwarded_for.split()
# user_ip:用戶真實ip
# cdn_ip: CDN最後節點的ip,‘‘表示沒經過CDN;‘-‘表示沒經過CDN和F5
if http_x_forwarded_for == ‘-‘:
‘‘‘沒經過CDN和F5‘‘‘
user_ip = remote_addr
cdn_ip = ‘-‘
elif ips[0] == remote_addr:
‘‘‘沒經過CDN,經過F5‘‘‘
user_ip = remote_addr
cdn_ip = ‘‘
else:
‘‘‘經過CDN和F5‘‘‘
user_ip = ips[0].rstrip(‘,‘)
cdn_ip = ips[-1]
return (server, uri_abs, uri_abs_crc32, args_abs, args_abs_crc32, new_time, response_code, bytes_sent,
request_time, user_ip, cdn_ip, request_method, uri, args)
def text_abstract(text, what):
"""進一步處理uri和args,將其做抽象化,方便對其進行歸類
如uri: /article/10.html 抽象為 /article/?.html
如args: s=你好&type=0 抽象為 s=?&type=?
規則:待處理部分由[a-zA-Z\-_]組成的,則保留,其他情況值轉為‘?‘
"""
tmp_abs = ‘‘
if what == ‘uri‘:
uri_list = [tmp for tmp in text.split(‘/‘) if tmp != ‘‘]
if len(uri_list) == 0:
‘‘‘uri為"/"的情況‘‘‘
tmp_abs = ‘/‘
else:
for i in range(len(uri_list)):
if not re.match(r‘[a-zA-Z\-_]+?(\..*)?$‘, uri_list[i]):
‘‘‘uri不符合規則時,進行轉換‘‘‘
if ‘.‘ in uri_list[i]:
if not re.match(r‘[a-zA-Z\-_]+$‘, uri_list[i].split(‘.‘)[0]):
uri_list[i] = ‘?.‘ + uri_list[i].split(‘.‘)[1]
else:
uri_list[i] = ‘?‘
for v in uri_list:
tmp_abs += ‘/{}‘.format(v)
if text.endswith(‘/‘):
‘‘‘如果原uri後面有"/",要保留‘‘‘
tmp_abs += ‘/‘
elif what == ‘args‘:
if text == ‘‘:
tmp_abs = ‘‘
else:
try:
tmp_dict = OrderedDict((tmp.split(‘=‘) for tmp in text.split(‘&‘)))
for k, v in tmp_dict.items():
if not re.match(r‘[a-zA-Z\-_]+$‘, v):
‘‘‘除了value值為全字母的情況,都進行轉換‘‘‘
tmp_dict[k] = ‘?‘
for k, v in tmp_dict.items():
if tmp_abs == ‘‘:
tmp_abs += ‘{}={}‘.format(k, v)
else:
tmp_abs += ‘&{}={}‘.format(k, v)
except ValueError:
‘‘‘參數中沒有= 或者 即沒&也沒= 會拋出ValueError‘‘‘
tmp_abs = ‘?‘
return tmp_abs
def insert_data(line_data, cursor, results, limit, t_name, l_name):
"""
記錄處理之後的數據,累積limit條執行一次插入
line_data:每行處理之前的字符串數據;
limit:每limit行執行一次數據插入;
t_name:對應的表名;
l_name:日誌文件名
"""
line_result = process_line(line_data)
results.append(line_result)
# print(‘len(result):{}‘.format(len(result))) #debug
if len(results) == limit:
insert_correct(cursor, results, t_name, l_name)
results.clear()
print(‘{} {} 處理至 {}‘.format(time.strftime(‘%H:%M:%S‘, time.localtime()), l_name, line_result[5]))
def insert_correct(cursor, results, t_name, l_name):
"""在插入數據過程中處理異常"""
insert_sql = ‘insert into {} (server,uri_abs,uri_abs_crc32,args_abs,args_abs_crc32,time_local,response_code,‘ ‘bytes_sent,request_time,user_ip,cdn_ip,request_method,uri,args) ‘ ‘values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)‘.format(t_name)
try:
cursor.executemany(insert_sql, results)
except pymysql.err.Warning as err:
print(‘\n{} Warning: {}‘.format(l_name, err))
except pymysql.err.MySQLError as err:
print(‘\n{} Error: {}‘.format(l_name, err))
print(‘插入數據時出錯...\n‘)
connection.close()
exit(10)
def get_prev_num(t_name, l_name):
"""取得今天已入庫的行數 t_name:表名 l_name:日誌文件名"""
try:
con_cur.execute(‘select min(id) from {0} where time_local=(‘
‘select min(time_local) from {0} where time_local>="{1}")‘.format(t_name, today_start))
min_id = con_cur.fetchone()[0]
if min_id is not None: # 假如有今天的數據
con_cur.execute(‘select max(id) from {}‘.format(t_name))
max_id = con_cur.fetchone()[0]
con_cur.execute(
‘select count(*) from {} where id>={} and id<={} and server="{}"‘.format(t_name, min_id, max_id, server))
prev_num = con_cur.fetchone()[0]
else:
prev_num = 0
return prev_num
except pymysql.err.MySQLError as err:
print(‘Error: {}‘.format(err))
print(‘Error:未取得已入庫的行數,本次跳過{}\n‘.format(l_name))
return
def del_old_data(t_name, l_name, n=3):
"""刪除n天前的數據,n默認為3"""
# n天前的日期間
three_days_ago = time.strftime(‘%Y-%m-%d %H:%M:%S‘, time.localtime(time.time() - 3600 * 24 * n))
try:
con_cur.execute(‘select max(id) from {0} where time_local=(‘
‘select max(time_local) from {0} where time_local!="0000-00-00 00:00:00" and time_local<="{1}")‘.format(
t_name, three_days_ago))
max_id = con_cur.fetchone()[0]
if max_id is not None:
con_cur.execute(‘delete from {} where id<={}‘.format(t_name, max_id))
except pymysql.err.MySQLError as err:
print(‘\n{} Error: {}‘.format(l_name, err))
print(‘未能刪除表{}天前的數據...\n‘.format(n))
def main_loop(log_name):
"""主邏輯 log_name:日誌文件名"""
table_name = log_name.split(‘.access‘)[0].replace(‘.‘, ‘_‘) # 將域名例如m.api轉換成m_api,因為表名中不能包含‘.‘
results = []
# 創建表
create_table(table_name)
# 當前日誌文件總行數
num = int(subprocess.run(‘wc -l {}‘.format(log_dir + log_name), shell=True, stdout=subprocess.PIPE,
universal_newlines=True).stdout.split()[0])
print(‘num: {}‘.format(num)) # debug
# 上一次處理到的行數
prev_num = get_prev_num(table_name, log_name)
if prev_num is not None:
# 根據當前行數和上次處理之後記錄的行數對比,來決定本次要處理的行數範圍
i = 0
with open(log_name) as fp:
for line in fp:
i += 1
if i <= prev_num:
continue
elif prev_num < i <= num:
insert_data(line, con_cur, results, 1000, table_name, log_name)
else:
break
# 插入不足1000行的results
if len(results) > 0:
insert_correct(con_cur, results, table_name, log_name)
del_old_data(table_name, log_name)
if __name__ == "__main__":
# 檢測如果當前已經有該腳本在運行,則退出
if_run = subprocess.run(‘ps -ef|grep {}|grep -v grep|grep -v "/bin/sh"|wc -l‘.format(argv[0]), shell=True,
stdout=subprocess.PIPE).stdout
if if_run.decode().strip(‘\n‘) == ‘1‘:
os.chdir(log_dir)
logs_list = os.listdir(log_dir)
logs_list = [i for i in logs_list if ‘access‘ in i and os.path.isfile(i) and i.split(‘.access‘)[0] in todo]
if len(logs_list) > 0:
# 並行
with Pool(len(logs_list)) as p:
p.map(main_loop, logs_list)
最後按照我們期望的間隔設置計劃任務即可*/30 * * * * export LANG=zh_CN.UTF-8;python3 /root/log_analyse_parall.py &> /tmp/log_analyse.py3
關於這樣一個對不確定格式的大量文本進行分析的腳本來說,通用性和執行效率兩個因素非常重要。通用性上文中已大致說明了原理;性能方面,經筆者在一臺4核虛擬機上進行測試結果如下
# 4個日誌文件共80w(每個20w)行記錄,利用多進程並發處理,主進程派生出4個子進程來處理
# 處理時間
[ljk@git-svn ~]# time python3 shells/log_analyse_parall.py > /tmp/new_log.txt
real 0m24.057s
user 1m5.417s
sys 0m0.595s
# 4個進程平均每秒鐘處理約3.3w行數據
Python+MySQL實現web日誌分析