1. 程式人生 > 其它 >python多執行緒記憶體溢位--ThreadPoolExecutor記憶體溢位

python多執行緒記憶體溢位--ThreadPoolExecutor記憶體溢位

ThreadPoolExecutor記憶體溢位

情景一:
在資料處理中,使用ThreadPoolExecutor(執行緒池)處理大量資料情況下,導致記憶體溢位
機器卡死掛掉;
場景模擬:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from memory_profiler import profile
import queue
from line_profiler import LineProfiler
from functools import wraps


class BoundThreadPoolExecutor(ThreadPoolExecutor):
    """
    對ThreadPoolExecutor 進行重寫,給佇列設定邊界
    """
    def __init__(self, qsize: int = None, *args, **kwargs):
        super(BoundThreadPoolExecutor, self).__init__(*args, **kwargs)
        self._work_queue = queue.Queue(qsize)

def timer(func):
    @wraps(func)
    def decorator(*args, **kwargs):
        func_return = func(*args, **kwargs)
        lp = LineProfiler()
        lp_wrap = lp(func)
        lp_wrap(*args, **kwargs)
        lp.print_stats()
        return func_return

    return decorator

def func(num):
    print(f"the {num} run...")
    time.sleep(0.5)
    return num*num

# @timer
@profile
def main():
    # with ThreadPoolExecutor(max_workers=2) as t:
    #     res = [t.submit(func, i) for i in range(100)]
    # pool = BoundThreadPoolExecutor(qsize=2, max_workers=2)
    pool = ThreadPoolExecutor(max_workers=2)
    for i in range(100):
        # func(i)
        pool.submit(func, i)
        print(pool._work_queue.qsize())
    pool.shutdown()

if __name__ == '__main__':
    main()

未對執行緒佇列限制時,程序將所有物件新增到self._work_queue

重寫ThreadPoolExecutor, 限制self._work_queue = queue.Queue(qsize)佇列大小

結果對比

總結

存在記憶體溢位的情況,原因是ThreadPoolExecutor 執行緒池使用的是無邊界佇列,程序在佇列中
新增物件時沒有對空閒執行緒進行判斷,導致記憶體消耗過多