程序間通訊-Queue和程序池Pool
阿新 • • 發佈:2022-04-18
1.Queue的使用
初始化Queue()物件時(例如:q=Queue()),若括號中沒有指定最大可接收的訊息數量,或數量為負值,那麼就代表可接受的訊息數量沒有上限(直到記憶體的盡頭);
Queue.qsize():返回當前佇列包含的訊息數量;
Queue.empty():如果佇列為空,返回True,反之False ;
Queue.full():如果佇列滿了,返回True,反之False;
Queue.get([block[, timeout]]):獲取佇列中的一條訊息,然後將其從列隊中移除,block預設值為True;
1)如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果為空,此時程式將被阻塞(停在讀取狀態),直到從訊息列隊讀
到訊息為止,如果設定了timeout,則會等待timeout秒,若還沒讀取到任何訊息,則丟擲" Queue.Empty"異常;
2)如果block值為False,訊息列隊如果為空,則會立刻丟擲"Queue.Empty"異常;
Queue.get_nowait():相當Queue.get(False);
Queue.put(item,[block[, timeout]]):將item訊息寫入佇列,block預設值為True;
1)如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果已經沒有空間可寫入,此時程式將被阻塞(停在寫入狀態),
直到從訊息列隊騰出空間為止,如果設定了timeout,則會等待timeout秒,若還沒空間,則丟擲"Queue.Full"異常;
2)如果block值為False,訊息列隊如果沒有空間可寫入,則會立刻丟擲"Queue.Full"異常;
Queue.put_nowait(item):相當Queue.put(item, False);
1 import multiprocessing
2
3
4 def down_from_web(q):
5 '''模擬從網上下載資料'''
6 # 模擬從網上下載資料
7 data = [11, 22, 33, 44]
8
9 # 向列隊中寫入資料
10 for temp in data:
11 q.put(temp)
12
13 print('下載器已經下載完啦並且存入到列隊中')
14
15
16 def analysis_data(q):
17 '''資料處理'''
18 waitting_analysis = list()
19 # 從列隊中獲取資料
20 while True:
21 data = q.get()
22 waitting_analysis.append(data)
23 if q.empty():
24 break
25
26 print(waitting_analysis)
27
28 def main():
29 # 1.建立一個列隊
30 q = multiprocessing.Queue()
31
32 # 2.建立多個程序,將佇列的引用當作實參進行傳遞到裡面
33 p1 = multiprocessing.Process(target=down_from_web, args=(q,))
34 p2 = multiprocessing.Process(target=analysis_data, args=(q,))
35 p1.start()
36 p2.start()
37
38
39 if __name__ == '__main__':
40 main()
2.程序池
當需要建立的子程序數量不多時,可以直接利用multiprocessing中的Process動態成生多個程序,但如果是上百甚至上千個目標,手
動的去建立程序的工作量巨大,此時就可以用到multiprocessing模組提供的Pool方法。初始化Pool時,可以指定一個最大程序數,當有
新的請求提交到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼
該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務,
multiprocessing.Pool常用函式解析:
apply_async(func[, args[, kwds]]) :使用非阻塞方式呼叫func(並行執行,堵塞方式必須等待上一個程序退出才能執行下一個程序),
args為傳遞給func的引數列表,kwds為傳遞給func的關鍵字引數列表;
close():關閉Pool,使其不再接受新的任務;
terminate():不管任務是否完成,立即終止;
join():主程序阻塞,等待子程序的退出, 必須在close或terminate之後使用;
1 # -*- coding:utf-8 -*-
2 from multiprocessing import Pool
3 import os, time, random
4
5
6 def worker(msg):
7 t_start = time.time()
8 print("%s開始執行,程序號為%d" % (msg, os.getpid()))
9 # random.random()隨機生成0~1之間的浮點數
10 time.sleep(random.random() * 2)
11 t_stop = time.time()
12 print(msg, "執行完畢,耗時%0.2f" % (t_stop - t_start))
13
14
15 if __name__ == '__main__':
16 po = Pool(3) # 定義一個程序池,最大程序數3
17 for i in range(0, 10):
18 # Pool().apply_async(要呼叫的目標,(傳遞給目標的引數元祖,))
19 # 每次迴圈將會用空閒出來的子程序去呼叫目標
20 po.apply_async(worker, (i,))
21
22 print("----start----")
23 po.close() # 關閉程序池,關閉後po不再接收新的請求
24 po.join() # 等待po中所有子程序執行完成,必須放在close語句之後
25 print("-----end-----")
3.程序池的Queue
如果要使用Pool建立程序,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤資訊:
RuntimeError: Queue objects should only be shared between processes through inheritance.
下面的例項演示了程序池中的程序如何通訊:
1 # -*- coding:utf-8 -*-
2
3 # 修改import中的Queue為Manager
4 from multiprocessing import Manager, Pool
5 import os, time, random
6
7
8 def reader(q):
9 print("reader啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
10 for i in range(q.qsize()):
11 print("reader從Queue獲取到訊息:%s" % q.get(True))
12
13
14 def writer(q):
15 print("writer啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
16 for i in "itcast":
17 q.put(i)
18
19
20 if __name__ == "__main__":
21 print("(%s) start" % os.getpid())
22 q = Manager().Queue() # 使用Manager中的Queue
23 po = Pool()
24 po.apply_async(writer, (q,))
25
26 time.sleep(1) # 先讓上面的任務向Queue存入資料,然後再讓下面的任務開始從中取資料
27
28 po.apply_async(reader, (q,))
29 po.close()
30 po.join()
31 print("(%s) End" % os.getpid())