生產者消費者模型(重要)
生產者消費者模型介紹
生產者指的是生產資料的任務,消費者指的是處理資料的任務。
在併發程式設計中,如果生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題,引入了生產者和消費者模式。
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊。所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。
這個阻塞佇列就是用來給生產者和消費者解耦的
生產者消費者模型實現
from multiprocessing import Process,Queue import time def producer(q): for i in range(5): res = "包子%s" % i time.sleep(0.5) print("生產者生產了%s" % res) q.put(res) def consumer(q): while True: res = q.get() time.sleep(1) print("消費者吃了%s" % res) if __name__ == "__main__": # 容器 q = Queue() # 生產者們 p1 = Process(target=producer, args=(q, )) # 消費者們 c1 = Process(target=consumer, args=(q, )) p1.start() c1.start() p1.join() # 保證生產者已經生產完 print("主程序")
執行結果
生產者生產了包子0
生產者生產了包子1
消費者吃了包子0
生產者生產了包子2
生產者生產了包子3
消費者吃了包子1
生產者生產了包子4
主程序
消費者吃了包子2
消費者吃了包子3
消費者吃了包子4
此時的問題是主程序永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死迴圈中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢後,往佇列中再發一個結束訊號,這樣消費者在接收到結束訊號後就可以break出死迴圈
from multiprocessing import Process,Queue import time def producer(q): for i in range(5): res = "包子%s" % i time.sleep(0.5) print("生產者生產了%s" % res) q.put(res) def consumer(q): while True: res = q.get() if res is None: break time.sleep(1) print("消費者吃了%s" % res) if __name__ == "__main__": # 容器 q = Queue() # 生產者們 p1 = Process(target=producer, args=(q, )) # 消費者們 c1 = Process(target=consumer, args=(q, )) p1.start() c1.start() p1.join() # 保證生產者已經生產完 q.put(None) print("主程序")
但上述解決方式,在有多個生產者和多個消費者時,我們則需要用一個很low的方式去解決,有幾個消費者就需要傳送幾次結束訊號:相當low,例如
from multiprocessing import Process,Queue import time def producer(q): for i in range(5): res = "包子%s" % i time.sleep(0.5) print("生產者生產了%s" % res) q.put(res) def consumer(q): while True: res = q.get() if res is None: break time.sleep(1) print("消費者吃了%s" % res) if __name__ == "__main__": # 容器 q = Queue() # 生產者們 p1 = Process(target=producer, args=(q, )) p2 = Process(target=producer, args=(q,)) p3 = Process(target=producer, args=(q,)) # 消費者們 c1 = Process(target=consumer, args=(q, )) c2 = Process(target=consumer, args=(q,)) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() # 保證生產者已經生產完 p2.join() p3.join() q.put(None) q.put(None) print("主程序")
其實我們的思路無非是傳送結束訊號而已,有另外一種佇列提供了這種機制
JoinableQueue([maxsize])
這就像是一個Queue物件,但佇列允許專案的使用者通知生成者專案已經被成功處理。通知程序是使用共享的訊號和條件變數來實現的。
引數介紹
maxsize是佇列中允許最大項數,省略則無大小限制。
方法介紹
JoinableQueue的例項p除了與Queue物件相同的方法之外還具有:
q.task_done():使用者使用此方法發出訊號,表示q.get()的返回專案已經被處理。如果呼叫此方法的次數大於從佇列中刪除專案的數量,將引發ValueError異常
q.join():生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理。阻塞將持續到佇列中的每個專案均呼叫q.task_done()方法為止
基於JoinableQueue實現生產者消費者模型
from multiprocessing import Process, JoinableQueue import time def producer(q): for i in range(3): res = "包子%s" % i time.sleep(0.5) print("生產者生產了%s" % res) q.put(res) q.join() # 等到消費者把自己放入佇列中的所有的資料都取走之後,生產者才結束 def consumer(q): while True: res = q.get() if res is None: break time.sleep(1) print("消費者吃了%s" % res) q.task_done() # 傳送訊號給q.join(),說明已經從佇列中取走一個數據並處理完畢了 if __name__ == "__main__": # 容器 q = JoinableQueue() # 生產者們 p1 = Process(target=producer, args=(q, )) p2 = Process(target=producer, args=(q,)) p3 = Process(target=producer, args=(q,)) # 消費者們 c1 = Process(target=consumer, args=(q, )) c2 = Process(target=consumer, args=(q,)) c1.daemon = True c2.daemon = True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() print("主程序") # 執行完了,消費者也該結束了 # 1、主程序等生產者p1、p2、p3結束 # 2、而p1、p2、p3是在消費者把所有資料都取乾淨之後才會結束 # 3、所以一旦p1、p2、p3結束了,證明消費者也沒必要存在了,應該隨著主程序一塊死掉,因而需要將生產者們設定成守護程序
生產者消費者模型總結
1、程式中有兩類角色
一類負責生產資料(生產者)
一類負責處理資料(消費者)
2、引入生產者消費者模型為了解決的問題是(好處)
平衡生產者與消費者之間的速度差
程式解開耦合
3、如何實現生產者消費者模型
生產者<--->佇列<--->消費者