Python multithreading memory overflow -- ThreadPoolExecutor memory overflow
阿新 • Published: 2021-12-17
ThreadPoolExecutor memory overflow
Scenario 1:
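During data processing, using ThreadPoolExecutor (a thread pool) to handle a large volume of data caused the process to run out of memory, and the machine froze and crashed.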
Reproducing the scenario:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from memory_profiler import profile
import queue
from line_profiler import LineProfiler
from functools import wraps


class BoundThreadPoolExecutor(ThreadPoolExecutor):
    """
    Override ThreadPoolExecutor to put a bound on its work queue.
    """

    def __init__(self, qsize: int = 0, *args, **kwargs):
        super(BoundThreadPoolExecutor, self).__init__(*args, **kwargs)
        # qsize <= 0 means unbounded; a default of None would make put() raise TypeError
        self._work_queue = queue.Queue(qsize)


def timer(func):
    # Runs func once normally, then a second time under LineProfiler to print per-line timings
    @wraps(func)
    def decorator(*args, **kwargs):
        func_return = func(*args, **kwargs)
        lp = LineProfiler()
        lp_wrap = lp(func)
        lp_wrap(*args, **kwargs)
        lp.print_stats()
        return func_return
    return decorator


def func(num):
    print(f"the {num} run...")
    time.sleep(0.5)
    return num * num


# @timer
@profile  # memory_profiler: prints line-by-line memory usage of main()
def main():
    # with ThreadPoolExecutor(max_workers=2) as t:
    #     res = [t.submit(func, i) for i in range(100)]
    # Swap the next two lines to compare the bounded and unbounded pools
    # pool = BoundThreadPoolExecutor(qsize=2, max_workers=2)
    pool = ThreadPoolExecutor(max_workers=2)
    for i in range(100):
        # func(i)
        pool.submit(func, i)
        print(pool._work_queue.qsize())  # number of work items still waiting
    pool.shutdown()


if __name__ == '__main__':
    main()
```
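Without a limit on the work queue, the program adds every submitted work item to self._work_queue immediately, even though only two worker threads are consuming them.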
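The backlog can be observed with the standard library alone. This is a minimal sketch, assuming CPython, where `_work_queue` is a private implementation detail that may change between versions; `blocked_task`, `measure_backlog` and the `threading.Event` gate are hypothetical helpers introduced only for this illustration. The gate holds both workers inside their first task, so every `submit()` call returns at once and the remaining work items pile up in the queue.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def blocked_task(gate, n):
    # Hypothetical task: each worker waits here until the gate opens,
    # so at most max_workers items can leave the queue during submission.
    gate.wait()
    return n * n


def measure_backlog(num_tasks=100, max_workers=2):
    gate = threading.Event()
    pool = ThreadPoolExecutor(max_workers=max_workers)
    # submit() never blocks: each call enqueues a work item immediately.
    futures = [pool.submit(blocked_task, gate, i) for i in range(num_tasks)]
    backlog = pool._work_queue.qsize()  # CPython private attribute
    gate.set()  # let the workers drain the queue
    pool.shutdown(wait=True)
    return backlog, [f.result() for f in futures]


if __name__ == "__main__":
    backlog, _ = measure_backlog()
    print(f"work items still queued after submitting: {backlog}")
```

With 100 tasks and 2 workers, at least 98 items are still sitting in the queue when the loop ends; with real payloads, each of those items keeps its arguments alive in memory.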
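After overriding ThreadPoolExecutor to bound the queue with self._work_queue = queue.Queue(qsize), submit() blocks whenever the queue is full, so the queue never holds more than qsize items.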
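A stdlib-only variant of the bounded pool, without the profilers, makes the limit measurable. This is a sketch under stated assumptions: it relies on CPython's private `_work_queue` attribute and on `submit()` calling `put()` on it; making `qsize` keyword-only and the `max_backlog` helper are this sketch's own choices, not part of the original code.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


class BoundThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor whose work queue holds at most `qsize` items.

    Assumes CPython internals: submit() calls self._work_queue.put(), so a
    full queue makes submit() block until a worker takes an item.
    """

    def __init__(self, *args, qsize=0, **kwargs):
        super().__init__(*args, **kwargs)
        # queue.Queue treats maxsize <= 0 as unbounded.
        self._work_queue = queue.Queue(qsize)


def square(n):
    return n * n


def max_backlog(num_tasks=100, qsize=2, max_workers=2):
    # Hypothetical helper: records the largest queue size seen after each submit.
    pool = BoundThreadPoolExecutor(max_workers=max_workers, qsize=qsize)
    peak = 0
    futures = []
    for i in range(num_tasks):
        futures.append(pool.submit(square, i))  # blocks while the queue is full
        peak = max(peak, pool._work_queue.qsize())
    pool.shutdown(wait=True)
    return peak, [f.result() for f in futures]


if __name__ == "__main__":
    peak, _ = max_backlog()
    print(f"largest backlog observed: {peak}")
```

Because this loop is the only producer, the queue can only shrink between a `put()` returning and the `qsize()` check, so the observed peak never exceeds `qsize`.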
Comparison of results
Summary
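The memory overflow happens because ThreadPoolExecutor uses an unbounded queue. When the program adds work items to it, submit() does not check whether an idle thread is available; it enqueues the item and returns immediately. If tasks are submitted faster than the workers finish them, pending items accumulate and memory consumption grows without limit.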
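Overriding `_work_queue` depends on a private attribute. A commonly used alternative that stays on the public API is to cap the number of unfinished tasks with a semaphore before calling `submit()`. This is a swapped-in technique, not the author's method; `SemaphoreBoundedExecutor` and `run_demo` are hypothetical names for this sketch.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SemaphoreBoundedExecutor:
    """Hypothetical wrapper: at most `limit` tasks are submitted but unfinished."""

    def __init__(self, max_workers, limit):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._slots = threading.BoundedSemaphore(limit)

    def submit(self, fn, *args, **kwargs):
        self._slots.acquire()  # wait for a free slot before enqueueing
        try:
            future = self._pool.submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()  # submission failed, give the slot back
            raise
        # Release the slot when the task finishes, fails, or is cancelled.
        future.add_done_callback(lambda _: self._slots.release())
        return future

    def shutdown(self, wait=True):
        self._pool.shutdown(wait=wait)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.shutdown(wait=True)
        return False


def run_demo(num_tasks=50, max_workers=2, limit=4):
    # Hypothetical demo: tracks the most unfinished futures seen at once.
    peak = 0
    futures = []
    with SemaphoreBoundedExecutor(max_workers=max_workers, limit=limit) as ex:
        for i in range(num_tasks):
            futures.append(ex.submit(pow, i, 2))
            peak = max(peak, sum(not f.done() for f in futures))
    return peak, [f.result() for f in futures]


if __name__ == "__main__":
    peak, _ = run_demo()
    print(f"most unfinished tasks at once: {peak}")
```

Each unfinished future holds one slot until its done-callback runs, so the number of pending tasks (queued plus running) stays at or below `limit`, and the producer simply waits instead of filling memory.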