多方式實現生產者與消費者
阿新 • • 發佈:2021-08-10
生產者(producer)與消費者(consumer)
1.通過Queue佇列實現
from multiprocessing import Queue
import time import random from multiprocessing import Process, Queue def producer(q: Queue, name: str, food: str): for i in range(1, 10 + 1): time.sleep(random.uniform(0.1, 0.8)) print(f'{name}生產了{i}份{food}!') q.put((i, food)) def consumer(q: Queue, name: str): while True: res = q.get() if res is None: break time.sleep(random.uniform(0.1, 0.5)) print(f'{name}吃了{res[0]}份{res[1]}!') if __name__ == '__main__': q = Queue() p1 = Process(target=producer, args=(q, '小日', '果凍')) p1.start() p2 = Process(target=producer, args=(q, '小月', '糖果')) p2.start() c = Process(target=consumer, args=(q, '小華')) c.start() p1.join() p2.join() q.put(None)
2.通過JoinableQueue實現
from multiprocessing import JoinableQueue
-
task_done()方法
比原先佇列多了一個方法task_done(),當佇列中的資料處理完會反饋給佇列!
-
join()方法:
阻塞方法,當佇列中的資料都task_done()完成後,join()就結束了,
這樣可以讓生產者與消費者更完美的處理完資料!
import time import random from multiprocessing import Process, JoinableQueue def producer(q, name: str, food: str): for i in range(1, 10 + 1): time.sleep(random.uniform(0.1, 0.8)) print(f'{name}生產了{i}份{food}!') q.put((i, food)) def consumer(q, name: str): while True: res = q.get() time.sleep(random.uniform(0.1, 0.5)) print(f'{name}吃了{res[0]}份{res[1]}!') q.task_done() if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer, args=(q, 'big', '果凍')) p1.start() p2 = Process(target=producer, args=(q, 'small', '棒冰')) p2.start() c = Process(target=consumer, args=(q, '小華')) c.daemon = True c.start() p1.join() # p1和p2的阻塞是為了讓資料都生產結束,這樣佇列才能反饋處理結果 p2.join() q.join()# 當佇列資料反饋都資料結束後,阻塞才會結束
3.通過管道Pipe實現
from multiprocessing import Pipe
在管道中有2端,在例題中我們取left,right,管道在子程序中傳遞了進去後,不使用的端我們要關閉,比如left,而在主程序中我們不使用right端,所以在left端傳送完資訊後,也要關閉right埠。
當管道中就剩下一個埠的時候,如果資料取完在繼續取資料,就會出現EOFError
我們通過對於異常的捕獲,讓消費者實現迴圈的關閉,從而實現管道之間的資料生產和消費。注:管道是基於socket實現的所以收的時候會處於阻塞狀態
from multiprocessing import Pipe, Process import time import random def producer(left, right, name, food): right.close() for i in range(20): time.sleep(random.uniform(0.1, 0.5)) print(f'{name}生產了{i}份{food}') left.send((i, food)) left.close() def consumer(left, right, name): left.close() while True: try: res = right.recv() time.sleep(random.uniform(0.1, 0.5)) print(f'{name}吃了{res[0]}份{res[1]}') except EOFError: break if __name__ == '__main__': left, right = Pipe() p1 = Process(target=producer, args=(left, right, 'Tom', '水果')) p1.start() c1 = Process(target=consumer, args=(left, right, '小明')) c1.start() p1.join() left.close() right.close() c1.join() print('主程序結束')