1. 程式人生 > 實用技巧 >非同步程序 multiprocessing 模板

非同步程序 multiprocessing 模板

`import os
from multiprocessing import Pool, Manager

def handle_task(arg1, arg2):
lock.acquire()
# get from queuing_task_lst
# enqueue running_task_dt
proc_id = os.getpid()
lock.release()

def async_deal_task_queue():
# query
queuing_task_lst.append(123)

def init(mgr_lock):
global lock
lock = mgr_lock

if name == 'main':
manager = Manager()
lock = manager.Lock()
queuing_task_lst = manager.list()
running_task_dt = manager.dict()

proc_pool = Pool(processes=10, initializer=init, initargs=(lock,))
for i in range(0, 10):
	try:
		proc_pool.apply_async(handle_task, args=(queuing_task_lst, running_task_dt))
	except KeyboardInterrupt as err:
		proc_pool.terminate()
	except Exception as e:
		proc_pool.terminate()
async_deal_task_queue()
try:
	proc_pool.close()
	proc_pool.join()
except Exception:
	pass
finally:
	pass`

說明:
上述程式碼中,handle_task程序和async_deal_task_queue程序非同步執行
async_deal_task_queue作為生產者,往全域性佇列 queuing_task_lst 放入生產的產品
handle_task 從 queuing_task_lst 中拿到產品進行消費
消費方式是一次性將生產的產品存多個,然後再一個一個進行處理
處理過程中使用了全域性鎖,通過 Pool的引數 initializer 及 initargs 來實現

細節技術:
manager.dict() 不同於一般意義上python 的 dict, 他的取法有如下特殊點:
取key只能用 run_task_dt.keys()
獲取value用 tid_lst = run_task_dt[key]
賦值能用 run_task_dt[queue_name] = value, 可變物件記得deepcopy
manager要使用 multiprocessing下的manager