1. 程式人生 > >Queue訊息佇列和Pool程序池

Queue訊息佇列和Pool程序池

目錄

Queue訊息佇列

在python中,多個程序之間是無法共享全域性變數的。但有時候我們又必須使用同一些資料。這時候就可以利用訊息佇列來實現。
Queue訊息佇列就相當於一個管道,資料從一頭進,另一頭出。
示例程式:

import multiprocessing
import time

# 建立訊息佇列,用來儲存資料
q = multiprocessing.Queue()


def put_in
(): for i in range(10): q.put(i) print(i, ' put in') time.sleep(0.1) def get_out(): for _ in range(10): print(q.get(), ' get out') pp = multiprocessing.Process(target=put_in) gp = multiprocessing.Process(target=get_out) pp.start() gp.start(
)

執行結果:

0   put in
0   get out
1   put in
1   get out
2   put in
2   get out
3   put in
3   get out
4   put in
4   get out
5   put in
5   get out
6   put in
6   get out
7   put in
7   get out
8   put in
8   get out
9   put in
9   get out

Queue常見方法

  • Queue.qsize():返回當前佇列包含的訊息數量;
  • Queue.empty():如果佇列為空,返回True,反之False ;
  • Queue.full():如果佇列滿了,返回True,反之False;
  • Queue.get():獲取佇列中的一條訊息,然後將其從列隊中移除,可傳參超時時長。
  • Queue.get_nowait():相當Queue.get(False),取不到值時觸發異常:Empty;
  • Queue.put():將一個值新增進數列,可傳參超時時長。
  • Queue.put_nowait():相當於Queue.get(False),當佇列滿了時報錯:Full

其中,Queue.empty()方法不可靠,容易出現錯誤判斷,與程式碼執行速度有關,因此可以用Queue.qsize()來判斷佇列是否為空。

Pool程序池

在你需要多程序的時候,又或是你不清楚需要多少程序合適的時候。我們可以利用程序池來協助建立程序。
程序池的建立需要呼叫multiprocessing.Pool(5)方法,其中括號裡傳遞的引數是最大可建立程序數量。如果沒有傳參,預設無限量。
程序池建立並呼叫程序程式碼如下:

import multiprocessing
import time


# 拷貝任務
def work():
    print("複製中...", multiprocessing.current_process().pid)
    time.sleep(0.5)

if __name__ == '__main__':
    # 建立程序池
    # 3:程序池中程序的最大個數
    pool = multiprocessing.Pool(3)
    # 模擬大批量的任務,讓程序池去執行
    for i in range(5):
        # 迴圈讓程序池執行對應的work任務
        # 同步執行任務,一個任務執行完成以後另外一個任務才能執行
        pool.apply(work)

建立程序池後,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務。
另外,在執行程序時候,選擇pool.apply()方法表示的是同步程序池。而我們一般用pool.apply_async(),這叫做非同步程序池。在使用非同步程序池的時候,我們應該在最後記得關閉程序池,用來告訴主程序不會再有更多的程序進來。另外還要加上程序池程序等待。讓主程序等待所有程序池程序執行完之後再退出。
示例程式碼:

# 程序池:池子裡面放的程序,程序池會根據任務執行情況自動建立程序,而且儘量少建立程序,合理利用程序池中的程序完成多工
import multiprocessing
import time


# 拷貝任務
def work():
    print("複製中...", multiprocessing.current_process().pid)
    # 獲取當前程序的守護狀態
    # 提示:使用程序池建立的程序是守護主程序的狀態,預設自己通過Process建立的程序是不是守住主程序的狀態
    # print(multiprocessing.current_process().daemon)
    time.sleep(0.5)

if __name__ == '__main__':
    # 建立程序池
    # 3:程序池中程序的最大個數
    pool = multiprocessing.Pool(3)
    # 模擬大批量的任務,讓程序池去執行
    for i in range(5):
        # 迴圈讓程序池執行對應的work任務
        # 同步執行任務,一個任務執行完成以後另外一個任務才能執行
        # pool.apply(work)
        # 非同步執行,任務執行不會等待,多個任務一起執行
        pool.apply_async(work)

    # 關閉程序池,意思告訴主程序以後不會有新的任務新增進來
    pool.close()
    # 主程序等待程序池執行完成以後程式再退出
    pool.join()