Queue訊息佇列和Pool程序池
阿新 • • 發佈:2018-11-23
目錄
Queue訊息佇列
在python中,多個程序之間是無法共享全域性變數的。但有時候我們又必須使用同一些資料。這時候就可以利用訊息佇列來實現。
Queue訊息佇列就相當於一個管道,資料從一頭進,另一頭出。
示例程式:
import multiprocessing
import time
# 建立訊息佇列,用來儲存資料
q = multiprocessing.Queue()
def put_in ():
for i in range(10):
q.put(i)
print(i, ' put in')
time.sleep(0.1)
def get_out():
for _ in range(10):
print(q.get(), ' get out')
pp = multiprocessing.Process(target=put_in)
gp = multiprocessing.Process(target=get_out)
pp.start()
gp.start( )
執行結果:
0 put in
0 get out
1 put in
1 get out
2 put in
2 get out
3 put in
3 get out
4 put in
4 get out
5 put in
5 get out
6 put in
6 get out
7 put in
7 get out
8 put in
8 get out
9 put in
9 get out
Queue常見方法
- Queue.qsize():返回當前佇列包含的訊息數量;
- Queue.empty():如果佇列為空,返回True,反之False ;
- Queue.full():如果佇列滿了,返回True,反之False;
- Queue.get():獲取佇列中的一條訊息,然後將其從列隊中移除,可傳參超時時長。
- Queue.get_nowait():相當Queue.get(False),取不到值時觸發異常:Empty;
- Queue.put():將一個值新增進數列,可傳參超時時長。
- Queue.put_nowait():相當於Queue.get(False),當佇列滿了時報錯:Full
其中,
Queue.empty()
方法不可靠,容易出現錯誤判斷,與程式碼執行速度有關,因此可以用Queue.qsize()
來判斷佇列是否為空。
Pool程序池
在你需要多程序的時候,又或是你不清楚需要多少程序合適的時候。我們可以利用程序池來協助建立程序。
程序池的建立需要呼叫multiprocessing.Pool(5)
方法,其中括號裡傳遞的引數是最大可建立程序數量。如果沒有傳參,預設無限量。
程序池建立並呼叫程序程式碼如下:
import multiprocessing
import time
# 拷貝任務
def work():
print("複製中...", multiprocessing.current_process().pid)
time.sleep(0.5)
if __name__ == '__main__':
# 建立程序池
# 3:程序池中程序的最大個數
pool = multiprocessing.Pool(3)
# 模擬大批量的任務,讓程序池去執行
for i in range(5):
# 迴圈讓程序池執行對應的work任務
# 同步執行任務,一個任務執行完成以後另外一個任務才能執行
pool.apply(work)
建立程序池後,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務。
另外,在執行程序時候,選擇pool.apply()
方法表示的是同步程序池。而我們一般用pool.apply_async()
,這叫做非同步程序池。在使用非同步程序池的時候,我們應該在最後記得關閉程序池,用來告訴主程序不會再有更多的程序進來。另外還要加上程序池程序等待。讓主程序等待所有程序池程序執行完之後再退出。
示例程式碼:
# 程序池:池子裡面放的程序,程序池會根據任務執行情況自動建立程序,而且儘量少建立程序,合理利用程序池中的程序完成多工
import multiprocessing
import time
# 拷貝任務
def work():
print("複製中...", multiprocessing.current_process().pid)
# 獲取當前程序的守護狀態
# 提示:使用程序池建立的程序是守護主程序的狀態,預設自己通過Process建立的程序是不是守住主程序的狀態
# print(multiprocessing.current_process().daemon)
time.sleep(0.5)
if __name__ == '__main__':
# 建立程序池
# 3:程序池中程序的最大個數
pool = multiprocessing.Pool(3)
# 模擬大批量的任務,讓程序池去執行
for i in range(5):
# 迴圈讓程序池執行對應的work任務
# 同步執行任務,一個任務執行完成以後另外一個任務才能執行
# pool.apply(work)
# 非同步執行,任務執行不會等待,多個任務一起執行
pool.apply_async(work)
# 關閉程序池,意思告訴主程序以後不會有新的任務新增進來
pool.close()
# 主程序等待程序池執行完成以後程式再退出
pool.join()