1. 程式人生 > 其它 >多方式實現生產者與消費者

多方式實現生產者與消費者

生產者(producer)與消費者(consumer)

1.通過Queue佇列實現
from multiprocessing import Queue
import time
import random
from multiprocessing import Process, Queue


def producer(q: Queue, name: str, food: str):
    for i in range(1, 10 + 1):
        time.sleep(random.uniform(0.1, 0.8))
        print(f'{name}生產了{i}份{food}!')
        q.put((i, food))


def consumer(q: Queue, name: str):
    while True:
        res = q.get()
        if res is None:
            break
        time.sleep(random.uniform(0.1, 0.5))
        print(f'{name}吃了{res[0]}份{res[1]}!')


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q, '小日', '果凍'))
    p1.start()
    p2 = Process(target=producer, args=(q, '小月', '糖果'))
    p2.start()
    c = Process(target=consumer, args=(q, '小華'))
    c.start()
    p1.join()
    p2.join()
    q.put(None)
2.通過JoinableQueue實現
from multiprocessing import JoinableQueue
  • task_done()方法

    比原先佇列多了一個方法task_done(),當佇列中的資料處理完會反饋給佇列!

  • join()方法:

    阻塞方法,當佇列中的資料都task_done()完成後,join()就結束了,

    這樣可以讓生產者與消費者更完美的處理完資料!

import time
import random
from multiprocessing import Process, JoinableQueue


def producer(q, name: str, food: str):
    for i in range(1, 10 + 1):
        time.sleep(random.uniform(0.1, 0.8))
        print(f'{name}生產了{i}份{food}!')
        q.put((i, food))


def consumer(q, name: str):
    while True:
        res = q.get()
        time.sleep(random.uniform(0.1, 0.5))
        print(f'{name}吃了{res[0]}份{res[1]}!')
        q.task_done()


if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=(q, 'big', '果凍'))
    p1.start()
    p2 = Process(target=producer, args=(q, 'small', '棒冰'))
    p2.start()
    c = Process(target=consumer, args=(q, '小華'))
    c.daemon = True
    c.start()
    p1.join() # p1和p2的阻塞是為了讓資料都生產結束,這樣佇列才能反饋處理結果
    p2.join()
    q.join()# 當佇列資料反饋都資料結束後,阻塞才會結束
3.通過管道Pipe實現
from multiprocessing import Pipe

在管道中有2端,在例題中我們取left,right,管道在子程序中傳遞了進去後,不使用的端我們要關閉,比如left,而在主程序中我們不使用right端,所以在left端傳送完資訊後,也要關閉right埠。
當管道中就剩下一個埠的時候,如果資料取完在繼續取資料,就會出現EOFError
我們通過對於異常的捕獲,讓消費者實現迴圈的關閉,從而實現管道之間的資料生產和消費。

注:管道是基於socket實現的所以收的時候會處於阻塞狀態

from multiprocessing import Pipe, Process
import time
import random


def producer(left, right, name, food):
    right.close()
    for i in range(20):
        time.sleep(random.uniform(0.1, 0.5))
        print(f'{name}生產了{i}份{food}')
        left.send((i, food))
    left.close()


def consumer(left, right, name):
    left.close()
    while True:
        try:
            res = right.recv()
            time.sleep(random.uniform(0.1, 0.5))
            print(f'{name}吃了{res[0]}份{res[1]}')
        except EOFError:
            break


if __name__ == '__main__':
    left, right = Pipe()
    p1 = Process(target=producer, args=(left, right, 'Tom', '水果'))
    p1.start()
    c1 = Process(target=consumer, args=(left, right, '小明'))
    c1.start()
    p1.join()
    left.close()
    right.close()
    c1.join()
    print('主程序結束')