程式人生 > Python 將日誌資料儲存到 ElasticSearch 間隔指定時間

Python 將日誌資料儲存到 ElasticSearch 間隔指定時間

主要工作程式碼

import json
import os
import re
import time

import requests
import yaml

# host_ip = ""

# Matches the "YYYY-MM-DD HH:MM:SS" timestamp carried by each log record.
# Compiled once instead of on every record.
_TIME_MSG_PATTERN = re.compile(r'\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}')


def get_log_path_dict(avira_log_path="/home/xxx/logs/xxx", log_file_name="xxx.log"):
    """Map each module directory directly under *avira_log_path* to its log file.

    Only the immediate sub-directories are considered (no recursion).

    Args:
        avira_log_path: Directory whose sub-directories are the modules.
        log_file_name: Name of the log file inside each module directory.

    Returns:
        dict of module (sub-directory) name -> log file path. Empty when
        *avira_log_path* does not exist or cannot be listed (previously this
        returned None and the caller crashed on ``.items()``).
    """
    # os.walk yields the top directory first; its `dirs` are the immediate
    # sub-directories. It yields nothing at all for a missing/unreadable path.
    top_level = next(os.walk(avira_log_path), None)
    if top_level is None:
        return {}
    root, dirs, _files = top_level
    return {dir_name: os.path.join(root, dir_name, log_file_name) for dir_name in dirs}


def time_msg2timestamp(time_msg):
    """Convert a "YYYY-MM-DD HH:MM:SS" string to integer epoch seconds.

    The string is interpreted as local time (``time.mktime``).
    """
    time_list = time.strptime(time_msg, "%Y-%m-%d %H:%M:%S")
    return int(time.mktime(time_list))


def scan_log_path(dir_name, log_path):
    """Re-index *log_path* into the *dir_name* index if the file changed.

    The file's mtime is compared with the value stored under *dir_name* in the
    module-level ``yaml_obj`` (loaded by the main loop). When it differs, the
    module's index is dropped and recreated, every ``[INFO]`` record is
    uploaded again, and the new mtime is stored back into ``yaml_obj``.
    """
    try:
        # Take the mtime BEFORE reading the content: lines appended while we
        # read then yield a newer mtime and are picked up on the next scan,
        # instead of being masked by an mtime that already includes them.
        log_modify_time_now = os.path.getmtime(log_path)
    except OSError:
        print("skip {}: cannot stat {}".format(dir_name, log_path))
        return
    # .get(): a module directory created after the yaml file was written has
    # no entry yet; yaml_obj[dir_name] would raise KeyError.
    if yaml_obj.get(dir_name) == log_modify_time_now:
        return  # unchanged: do not even read the file
    # errors="replace": a single undecodable byte must not kill the daemon.
    with open(log_path, "r", encoding="utf-8", errors="replace") as file_object:
        # Records are delimited by the "[INFO]" marker; text before the first
        # marker is discarded.
        log_list = re.split(r'\[INFO\]', file_object.read())[1:]
    delete_module_index(dir_name)
    create_module_index(dir_name)
    for log in log_list:
        log = '[INFO]' + log
        time_match = _TIME_MSG_PATTERN.search(log)
        if time_match is None:
            print("skip record without timestamp in {}: {!r}".format(log_path, log[:80]))
            continue
        log_dict = {
            "@timestamp": time_msg2timestamp(time_match.group()),
            "log_msg": log,
        }
        print(log_dict)
        put_log_into_elasticsearch(dir_name, log_dict)
    # Record the mtime only after every record was uploaded, so a failed
    # upload (exception) leaves the module to be re-indexed on the next pass.
    yaml_obj[dir_name] = log_modify_time_now


def put_log_into_elasticsearch(module_name, log_dict):
    """POST one log document into the index named after *module_name* (lower-cased)."""
    url = "http://127.0.0.1:9200/{}/log/".format(module_name.lower())
    rep = session.post(url, json=log_dict)
    print(rep.status_code)
    print(rep.text)


def create_module_index(module_name):
    """Create the *module_name* index using the mapping in ./create_index.json."""
    url = "http://127.0.0.1:9200/{}".format(module_name.lower())
    with open("./create_index.json", "r", encoding="utf-8") as file_object:
        json_obj = json.load(file_object)
    rep = session.put(url, json=json_obj)
    print(rep.status_code)
    print(rep.text)


def delete_module_index(module_name):
    """Delete exactly the *module_name* index.

    The previous ``"{name}*"`` wildcard also deleted every other module whose
    name starts with this one (e.g. deleting "bxxx*" removed "bxxxx").
    """
    url = "http://127.0.0.1:9200/{}".format(module_name.lower())
    rep = session.delete(url)
    print(rep.status_code)
    print(rep.text)


if __name__ == "__main__":
    session = requests.session()
    while True:
        # Load the last-seen modification time of each module's log file.
        # safe_load: yaml.load() without a Loader is unsafe and raises
        # TypeError on PyYAML >= 6. A missing/empty file means "nothing seen".
        try:
            with open("./last_query_log_time.yaml", "r", encoding="utf-8") as yaml_file:
                yaml_obj = yaml.safe_load(yaml_file) or {}
        except FileNotFoundError:
            yaml_obj = {}
        # Walk every module's log file; if it changed, re-index it in ElasticSearch.
        for dir_name, log_path in get_log_path_dict().items():
            try:
                scan_log_path(dir_name, log_path)
            except requests.RequestException as err:
                # ElasticSearch unreachable: the stored mtime is left untouched,
                # so this module is retried on the next pass.
                print("failed to index {}: {}".format(dir_name, err))
        # Persist the updated modification times.
        with open("./last_query_log_time.yaml", "w", encoding="utf-8") as yaml_file:
            yaml.dump(yaml_obj, yaml_file)
        time.sleep(10)

建立索引時 PUT 到 ElasticSearch 的 mapping 設定(即 ./create_index.json 的內容)

{
  "mappings": {
    "log": {
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "epoch_second"
        },
        "log_msg": {
          "type": "text",
          "index": "true"
        }
      }
    }
  }
}

last_query_log_time.yaml:用於儲存各模組日誌檔案的最後修改時間。
每次掃描時與上次記錄的修改時間比對,若未修改則不進行任何操作。

{axxxx: 0, bxxxx: 0, bxxx: 0,
  cxxxxxx: 0, dxxxxxr: 0, mxxxxxxx: 0}