非同步程序 multiprocessing 模板
`import os
from multiprocessing import Pool, Manager
def handle_task(arg1, arg2):
lock.acquire()
# get from queuing_task_lst
# enqueue running_task_dt
proc_id = os.getpid()
lock.release()
def async_deal_task_queue():
# query
queuing_task_lst.append(123)
def init(mgr_lock):
global lock
lock = mgr_lock
if name == 'main':
manager = Manager()
lock = manager.Lock()
queuing_task_lst = manager.list()
running_task_dt = manager.dict()
proc_pool = Pool(processes=10, initializer=init, initargs=(lock,)) for i in range(0, 10): try: proc_pool.apply_async(handle_task, args=(queuing_task_lst, running_task_dt)) except KeyboardInterrupt as err: proc_pool.terminate() except Exception as e: proc_pool.terminate() async_deal_task_queue() try: proc_pool.close() proc_pool.join() except Exception: pass finally: pass`
說明:
上述程式碼中,handle_task程序和async_deal_task_queue程序非同步執行
async_deal_task_queue作為生產者,往全域性佇列 queuing_task_lst 放入生產的產品
handle_task 從 queuing_task_lst 中拿到產品進行消費
消費方式是一次性將生產的產品存多個,然後再一個一個進行處理
處理過程中使用了全域性鎖,通過 Pool的引數 initializer 及 initargs 來實現
細節技術:
manager.dict() 不同於一般意義上python 的 dict, 他的取法有如下特殊點:
取key只能用 run_task_dt.keys()
獲取value用 tid_lst = run_task_dt[key]
賦值能用 run_task_dt[queue_name] = value, 可變物件記得deepcopy
manager要使用 multiprocessing下的manager