python 多程序下按天分割日誌handler。ConcurrentDayRotatingFileHandler
阿新 • • 發佈:2021-08-19
這個多程序切片安全的python按時間切割檔案。
官方的 TimedRotatingFileHandler 在多程序下瘋狂報錯,
不信的話可以試試官方 TimedRotatingFileHandler 多程序寫入檔案日誌,設定成每秒換一個新的檔案寫(主要是按天來切割要耽誤很長的時間才能觀察錯誤)
此日誌handler採用批量聚合每隔1秒寫入,在超高速寫入時候,寫入速度遠超官方。
import os import time from pathlib import Path import queue import re import atexit from threading import Lock, Thread反對極端面向過程程式設計思維方式,喜歡面向物件和設計模式的解讀,喜歡對比極端面向過程程式設計和oop程式設計消耗程式碼程式碼行數的區別和原因。致力於使用oop和36種設計模式寫出最高可複用的框架級程式碼和使用最少的程式碼行數完成任務,致力於使用oop和設計模式來使部分程式碼減少90%行,使絕大部分py檔案最低減少50%-80%行的寫法。from nb_filelock import FileLock import logging # noinspection PyUnresolvedReferences from logging import LogRecord, FileHandler # noinspection PyPep8Naming class ConcurrentDayRotatingFileHandler(logging.Handler): """ 這個多程序切片安全的。 官方的 TimedRotatingFileHandler 在多程序下瘋狂報錯, 不信的話可以試試官方 TimedRotatingFileHandler 多程序寫入檔案日誌,設定成每秒換一個新的檔案寫(主要是按天來切割要耽誤很長的時間才能觀察錯誤)""" file_handler_list = [] has_start_emit_all_file_handler_process_id_set = set() # 這個linux和windwos都相容,windwos是多程序每個程序的變數has_start_emit_all_file_handler是獨立的。linux是共享的。 __lock_for_rotate = Lock() @classmethod def _emit_all_file_handler(cls): while True: for hr incls.file_handler_list: # very_nb_print(hr.buffer_msgs_queue.qsize()) # noinspection PyProtectedMember hr._write_to_file() time.sleep(1) # 每隔一秒鐘批量寫入一次,效能好了很多。 @classmethod def _start_emit_all_file_handler(cls): pass Thread(target=cls._emit_all_file_handler, daemon=True).start() # noinspection PyMissingConstructor def __init__(self, file_name: str, file_path: str, back_count=10): super().__init__() self.file_name = file_name self.file_path = file_path self.backupCount = back_count self.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}(\.\w+)?$", re.ASCII) self.extMatch2 = re.compile(r"^\d{2}-\d{2}-\d{2}(\.\w+)?$", re.ASCII) self.buffer_msgs_queue = queue.Queue() atexit.register(self._write_to_file) # 如果程式屬於立馬就能結束的,需要在程式結束前執行這個鉤子,防止不到最後一秒的日誌沒記錄到。 self.file_handler_list.append(self) if os.getpid() not in self.has_start_emit_all_file_handler_process_id_set: self._start_emit_all_file_handler() self.__class__.has_start_emit_all_file_handler_process_id_set.add(os.getpid()) def emit(self, record: LogRecord): """ emit已經在logger的handle方法中加了鎖,所以這裡的重置上次寫入時間和清除buffer_msgs不需要加鎖了。 :param record: :return: """ # noinspection PyBroadException try: msg = self.format(record) self.buffer_msgs_queue.put(msg) except Exception: self.handleError(record) def _write_to_file(self): buffer_msgs = '' while True: try: msg = self.buffer_msgs_queue.get(block=False) buffer_msgs += msg + '\n' except queue.Empty: break if buffer_msgs: time_str = time.strftime('%Y-%m-%d') # time_str = time.strftime('%H-%M-%S') # 方便測試用的,方便觀察。 new_file_name = self.file_name + '.' + time_str path_obj = Path(self.file_path) / Path(new_file_name) path_obj.touch(exist_ok=True) with path_obj.open(mode='a') as f: f.write(buffer_msgs) with FileLock(self.file_path / Path(f'_delete_{self.file_name}.lock')): self._find_and_delete_files() def _find_and_delete_files(self): """ 這一段命名不規範是複製原來的官方舊程式碼。 Determine the files to delete when rolling over. More specific than the earlier method, which just used glob.glob(). """ dirName = self.file_path baseName = self.file_name fileNames = os.listdir(dirName) result = [] prefix = baseName + "." plen = len(prefix) for fileName in fileNames: if fileName[:plen] == prefix: suffix = fileName[plen:] # print(fileName, prefix,suffix) if self.extMatch.match(suffix) or self.extMatch2.match(suffix): result.append(os.path.join(dirName, fileName)) if len(result) < self.backupCount: result = [] else: result.sort() result = result[:len(result) - self.backupCount] # print(result) for r in result: Path(r).unlink() from concurrent.futures import ProcessPoolExecutor from logging.handlers import TimedRotatingFileHandler logger = logging.getLogger('lala') file_handler = ConcurrentDayRotatingFileHandler('test_my_cd.log', file_path='/pythonlogs') # file_handler = FileHandler('/pythonlogs/test_fhh.log') # 對比官方測試效能 # file_handler = TimedRotatingFileHandler('/pythonlogs/test_fhh.log') # 對比官方測試效能 file_handler.setFormatter(logging.Formatter('[%(asctime)s] - %(filename)s] - %(levelname)s: %(message)s')) logger.addHandler(file_handler) pool = ProcessPoolExecutor(10) def fun(x): for i in range(100000): # time.sleep(0.2) logger.warning(f"{x} {i}") if __name__ == '__main__': print('開始', time.strftime('%H_%M_%S')) for j in range(10): pool.submit(fun, j) pool.shutdown() print('結束', time.strftime('%H_%M_%S'))