1. 程式人生 > 其它 >程序間通訊-Queue和程序池Pool

程序間通訊-Queue和程序池Pool

1.Queue的使用

初始化Queue()物件時(例如:q=Queue()),若括號中沒有指定最大可接收的訊息數量,或數量為負值,那麼就代表可接受的訊息數量沒有上限(直到記憶體的盡頭);

Queue.qsize():返回當前佇列包含的訊息數量;

Queue.empty():如果佇列為空,返回True,反之False ;

Queue.full():如果佇列滿了,返回True,反之False;

Queue.get([block[, timeout]]):獲取佇列中的一條訊息,然後將其從列隊中移除,block預設值為True;

1)如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果為空,此時程式將被阻塞(停在讀取狀態),直到從訊息列隊讀

到訊息為止,如果設定了timeout,則會等待timeout秒,若還沒讀取到任何訊息,則丟擲"
Queue.Empty"異常; 2)如果block值為False,訊息列隊如果為空,則會立刻丟擲"Queue.Empty"異常; Queue.get_nowait():相當Queue.get(False); Queue.put(item,[block[, timeout]]):將item訊息寫入佇列,block預設值為True; 1)如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果已經沒有空間可寫入,此時程式將被阻塞(停在寫入狀態),

直到從訊息列隊騰出空間為止,如果設定了timeout,則會等待timeout秒,若還沒空間,則丟擲"Queue.Full"異常;
2)如果block值為False,訊息列隊如果沒有空間可寫入,則會立刻丟擲"Queue.Full"異常; Queue.put_nowait(item):相當Queue.put(item, False);

 

 

 

 1 import multiprocessing
 2 
 3 
 4 def down_from_web(q):
 5     '''模擬從網上下載資料'''
 6     # 模擬從網上下載資料
 7     data = [11, 22, 33, 44]
 8 
 9     # 向列隊中寫入資料
10     for temp in data:
11         q.put(temp)
12 13 print('下載器已經下載完啦並且存入到列隊中') 14 15 16 def analysis_data(q): 17 '''資料處理''' 18 waitting_analysis = list() 19 # 從列隊中獲取資料 20 while True: 21 data = q.get() 22 waitting_analysis.append(data) 23 if q.empty(): 24 break 25 26 print(waitting_analysis) 27 28 def main(): 29 # 1.建立一個列隊 30 q = multiprocessing.Queue() 31 32 # 2.建立多個程序,將佇列的引用當作實參進行傳遞到裡面 33 p1 = multiprocessing.Process(target=down_from_web, args=(q,)) 34 p2 = multiprocessing.Process(target=analysis_data, args=(q,)) 35 p1.start() 36 p2.start() 37 38 39 if __name__ == '__main__': 40 main()

 

2.程序池

  當需要建立的子程序數量不多時,可以直接利用multiprocessing中的Process動態成生多個程序,但如果是上百甚至上千個目標,手

動的去建立程序的工作量巨大,此時就可以用到multiprocessing模組提供的Pool方法。初始化Pool時,可以指定一個最大程序數,當有

新的請求提交到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼

該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務,
multiprocessing.Pool常用函式解析:

apply_async(func[, args[, kwds]]) :使用非阻塞方式呼叫func(並行執行,堵塞方式必須等待上一個程序退出才能執行下一個程序),
args為傳遞給func的引數列表,kwds為傳遞給func的關鍵字引數列表;
close():關閉Pool,使其不再接受新的任務;
terminate():不管任務是否完成,立即終止;
join():主程序阻塞,等待子程序的退出, 必須在close或terminate之後使用;
 1 # -*- coding:utf-8 -*-
 2 from multiprocessing import Pool
 3 import os, time, random
 4 
 5 
 6 def worker(msg):
 7     t_start = time.time()
 8     print("%s開始執行,程序號為%d" % (msg, os.getpid()))
 9     # random.random()隨機生成0~1之間的浮點數
10     time.sleep(random.random() * 2)
11     t_stop = time.time()
12     print(msg, "執行完畢,耗時%0.2f" % (t_stop - t_start))
13 
14 
15 if __name__ == '__main__':
16     po = Pool(3)  # 定義一個程序池,最大程序數3
17     for i in range(0, 10):
18         # Pool().apply_async(要呼叫的目標,(傳遞給目標的引數元祖,))
19         # 每次迴圈將會用空閒出來的子程序去呼叫目標
20         po.apply_async(worker, (i,))
21 
22     print("----start----")
23     po.close()  # 關閉程序池,關閉後po不再接收新的請求
24     po.join()  # 等待po中所有子程序執行完成,必須放在close語句之後
25     print("-----end-----")

 

3.程序池的Queue

如果要使用Pool建立程序,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤資訊:

RuntimeError: Queue objects should only be shared between processes through inheritance.

下面的例項演示了程序池中的程序如何通訊:
 1 # -*- coding:utf-8 -*-
 2 
 3 # 修改import中的Queue為Manager
 4 from multiprocessing import Manager, Pool
 5 import os, time, random
 6 
 7 
 8 def reader(q):
 9     print("reader啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
10     for i in range(q.qsize()):
11         print("reader從Queue獲取到訊息:%s" % q.get(True))
12 
13 
14 def writer(q):
15     print("writer啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
16     for i in "itcast":
17         q.put(i)
18 
19 
20 if __name__ == "__main__":
21     print("(%s) start" % os.getpid())
22     q = Manager().Queue()  # 使用Manager中的Queue
23     po = Pool()
24     po.apply_async(writer, (q,))
25 
26     time.sleep(1)  # 先讓上面的任務向Queue存入資料,然後再讓下面的任務開始從中取資料
27 
28     po.apply_async(reader, (q,))
29     po.close()
30     po.join()
31     print("(%s) End" % os.getpid())