Python 每隔指定時間將日誌資料儲存到 ElasticSearch
阿新 • • 發佈:2019-01-02
主要工作程式碼
import json
import os
import re
import time
import requests
import yaml
# host_ip = ""
def get_log_path_dict(log_root="/home/xxx/logs/xxx"):
    """Map each module directory name to the path of its log file.

    Only the immediate sub-directories of ``log_root`` are considered; the
    log file of a module is expected at ``<log_root>/<module>/xxx.log``.
    The paths are built from the directory names alone, so a returned path
    is not guaranteed to exist.

    Args:
        log_root: Directory that contains one sub-directory per module.

    Returns:
        A dict of ``{directory name: log file path}``. It is empty when
        ``log_root`` does not exist, cannot be read, or has no
        sub-directories.
    """
    # os.walk() yields nothing for a missing/unreadable directory, so fall
    # back to an empty listing instead of implicitly returning None.
    _, dir_names, _ = next(os.walk(log_root), (log_root, [], []))
    return {
        dir_name: os.path.join(log_root, dir_name, "xxx.log")
        for dir_name in dir_names
    }
def time_msg2timestamp(time_msg):
    """Convert a ``YYYY-MM-DD HH:MM:SS`` string to integer epoch seconds.

    The string is interpreted as local time (``time.mktime``).

    Args:
        time_msg: Timestamp text in the format ``%Y-%m-%d %H:%M:%S``.

    Returns:
        The corresponding Unix timestamp, truncated to an ``int``.

    Raises:
        ValueError: If ``time_msg`` does not match the expected format.
    """
    parsed = time.strptime(time_msg, "%Y-%m-%d %H:%M:%S")
    epoch_seconds = time.mktime(parsed)
    return int(epoch_seconds)
def scan_log_path(dir_name, log_path):
    """Re-index one module's log file into Elasticsearch when it has changed.

    The file's current modification time is compared with the value stored
    under ``dir_name`` in the module-level ``yaml_obj`` mapping. When the two
    differ, the module's index is deleted and recreated and every ``[INFO]``
    entry of the file is posted again; afterwards the new modification time
    is stored back into ``yaml_obj``.

    Args:
        dir_name: Module directory name; used as the key in ``yaml_obj`` and
            passed to the index helper functions.
        log_path: Path of the module's log file.
    """
    try:
        modified_now = os.path.getmtime(log_path)
    except OSError:
        # The log path may not exist (yet); skip it rather than letting the
        # exception end the polling loop.
        return

    # .get(): a module with no entry in the state mapping counts as changed
    # instead of raising KeyError.
    if yaml_obj.get(dir_name) == modified_now:
        return

    # Read the file only when it actually needs to be re-indexed.
    with open(log_path, "r", encoding="utf-8") as file_object:
        # Anything before the first "[INFO]" marker is not a log entry.
        entries = re.split(r'\[INFO\]', file_object.read())[1:]

    delete_module_index(dir_name)
    create_module_index(dir_name)

    # Compiled once, outside the loop.
    time_msg_pattern = re.compile(r'\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}')
    for entry in entries:
        # re.split() consumed the marker; put it back.
        log = '[INFO]' + entry
        match = time_msg_pattern.search(log)
        if match is None:
            # Without a timestamp no "@timestamp" value can be produced.
            continue
        log_dict = {
            "@timestamp": time_msg2timestamp(match.group()),
            "log_msg": log,
        }
        print(log_dict)
        put_log_into_elasticsearch(dir_name, log_dict)

    # Record the new modification time only after every entry was posted.
    yaml_obj[dir_name] = modified_now
def put_log_into_elasticsearch(module_name, log_dict):
    """POST one log document to the module's index on the local node.

    The request goes through the module-level ``session``; the response
    status code and body are printed.

    Args:
        module_name: Module name; its lower-cased form is the index name.
        log_dict: Document to send as the JSON request body.
    """
    index_name = module_name.lower()
    endpoint = "http://127.0.0.1:9200/{}/log/".format(index_name)
    response = session.post(endpoint, json=log_dict)
    for detail in (response.status_code, response.text):
        print(detail)
def create_module_index(module_name):
    """Create the module's index using the body in ``./create_index.json``.

    The JSON file is loaded and sent with a PUT request through the
    module-level ``session``; the response status code and body are printed.

    Args:
        module_name: Module name; its lower-cased form is the index name.
    """
    with open("./create_index.json", "r", encoding="utf-8") as mapping_file:
        index_body = json.load(mapping_file)

    index_name = module_name.lower()
    endpoint = "http://127.0.0.1:9200/{}".format(index_name)
    response = session.put(endpoint, json=index_body)
    for detail in (response.status_code, response.text):
        print(detail)
def delete_module_index(module_name):
    """Delete the module's index on the local Elasticsearch node.

    The request goes through the module-level ``session``; the response
    status code and body are printed.

    Args:
        module_name: Module name; its lower-cased form is the index name.
    """
    # Address the exact index only. A trailing "*" wildcard would also delete
    # the indices of other modules whose names merely start with this one
    # (e.g. deleting "bxxx*" removes "bxxxx" as well).
    url = "http://127.0.0.1:9200/{}".format(module_name.lower())
    rep = session.delete(url)
    print(rep.status_code)
    print(rep.text)
if __name__ == "__main__":
    # `session` and `yaml_obj` are deliberately module-level names: the
    # functions in this file read them as globals.
    session = requests.session()
    state_file = "./last_query_log_time.yaml"
    while True:
        # Load the recorded modification time of every module's log file.
        try:
            with open(state_file, "r", encoding="utf-8") as yaml_file:
                # safe_load(): yaml.load() without an explicit Loader is
                # deprecated and rejected by current PyYAML. An empty file
                # parses to None, hence the `or {}`.
                yaml_obj = yaml.safe_load(yaml_file) or {}
        except FileNotFoundError:
            # First run: no state has been recorded yet.
            yaml_obj = {}

        # Visit the log file in each module directory.
        for dir_name, log_path in get_log_path_dict().items():
            # Check whether the log file changed; if so, its records are
            # put into Elasticsearch.
            scan_log_path(dir_name, log_path)

        # Persist the updated modification times.
        with open(state_file, "w", encoding="utf-8") as yaml_file:
            yaml.dump(yaml_obj, yaml_file)

        # Wait before the next scan.
        time.sleep(10)
post 到 ES 的資料格式(即建立索引時使用的 create_index.json mapping)
{
"mappings": {
"log": {
"properties": {
"@timestamp": {
"type": "date",
"format": "epoch_second"
},
"log_msg": {
"type": "text",
"index": "true"
}
}
}
}
}
last_query_log_time.yaml:用於儲存各模組日誌檔案的最後修改時間。
每次掃描時,將檔案目前的修改時間與上次記錄的修改時間進行比對,若未修改則不進行任何操作。
{axxxx: 0, bxxxx: 0, bxxx: 0,
cxxxxxx: 0, dxxxxxr: 0, mxxxxxxx: 0}