python多程序下的生產者和消費者模型
一、生產者消費者模型介紹
1.1 為什麼需要使用生產者消費者模型
生產者是指生產資料的任務,消費者是指消費資料的任務。當生產者的生產能力遠大於消費者的消費能力,生產者就需要等消費者消費完才能繼續生產新的資料,同理,如果消費者的消費能力遠大於生產者的生產能力,消費者就需要等生產者生產完資料才能繼續消費,這種等待會造成效率的低下,為了解決這種問題就引入了生產者消費者模型。
1.2 如何實現生產者消費者模型
程序間引入佇列可以實現生產者消費者模型,通過使用佇列無需考慮鎖的概念,因為程序間的通訊是通過佇列來實現的;
生產者生產的資料往佇列裡面寫,消費者消費資料直接從佇列裡面取,這樣就對實現了生產者和消費者之間的解耦。
生產者 -- > 佇列 <--消費者
二、Queue實現生產者消費者模型
2.1 消費者生產者模型程式碼
from multiprocessing import Process,Queue import time # 消費者方法 def consumer(q,name): while True: res = q.get() # if res is None: break print("%s 吃了 %s" % (name,res)) # 生產者方法 def producer(q,name,food): for i in range(3): time.sleep(1) # 模擬生產西瓜的時間延遲 res = "%s %s" % (food,i) print("%s 生產了 %s" % (name,res)) # 把生產的vegetable放入到佇列中 q.put(res) if __name__ == "__main__": #建立佇列 q = Queue() # 建立生產者 p1 = Process(target=producer,args=(q,"kelly","西瓜")) c1 = Process(target=consumer,"peter",)) p1.start() c1.start() # p1.join() # q.put(None) print("主程序")
2.2 執行結果
2.2.1 直接執行上面的程式碼的結果
直接執行會出現一個問題就是生產者生產完了,沒有向消費者傳送一個停止的訊號,所以消費者一直會一直阻塞在q.get(),導致程式無法退出。
為了解決上面的問題,讓消費者消費完了生產者的資料之後自動退出,就需要在生產者程序介紹的時候往佇列裡面put一個結束訊號,消費者拿到這個訊號,就退出消費程序。
主要是兩個地方修改,把下方程式碼的註釋開啟就可以實現消費者消費完接收到生產者的結束訊號就退出消費者程序了。
def consumer(): if res is None: break if __name__ == "__main__": p1.join() q.put(None)
2.2.2 把註釋開啟後的執行結果
把註釋開啟後,消費者拿到了生產者傳送的結束訊號,可以正常退出程式了。
但如果有n個消費者,就需要傳送n個結束訊號,這種方式就不是那麼簡潔,像下面的程式碼這樣:
from multiprocessing import Process,Queue import time # 消費者方法 def consumer(q,name): while True: res = q.get() if res is None: break print("%s 吃了 %s" % (name,res)) # 生產者方法 def producer(q,res)) # 把生產的vegetable放入到佇列中 q.put(res) if __name__ == "__main__": # 建立佇列 q = Queue() # 建立生產者 p1 = Process(target=producer,"西瓜")) p2 = Process(target=producer,"kelly2","香蕉")) c1 = Process(target=consumer,)) c2 = Process(target=consumer,"peter2",)) c3 = Process(target=consumer,"peter3",)) p1.start() p2.start() c1.start() c2.start() c3.start() p1.join() p2.join() q.put(None) q.put(None) q.put(None) print("主程序")
其實我們現在就是生產者生產完資料之後想往佇列裡面傳送一個結束訊號,python語言提供了另外一種佇列JoinableQueue([maxsize])來解決這種問題
三、JoinableQueue實現生產者消費者模型
3.1 JoinableQueue方法介紹
JoinableQueue([maxsize]) : A queue type which also supports join() and task_done() methods
q.task_done():消費者使用此方法發出訊號,表示q.get()的返回專案已經被處理。
q.join():生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理;阻塞將持續到佇列中的每個專案均呼叫q.task_done()方法為止。
3.2 JoinableQueue實現生產者消費者模型原始碼
from multiprocessing import Process,JoinableQueue import time # 消費者方法 def consumer(q,res)) q.task_done() # 傳送訊號給q.join(),表示已經從佇列中取走一個值並處理完畢了 # 生產者方法 def producer(q,res)) # 把生產的vegetable放入到佇列中 q.put(res) q.join() # 等消費者把自己放入佇列的所有元素取完之後才結束 if __name__ == "__main__": # q = Queue() q = JoinableQueue() # 建立生產者 p1 = Process(target=producer,"藍莓")) # 建立消費者 c1 = Process(target=consumer,)) c1.daemon = True c2.daemon = True c3.daemon = True p_l = [p1,p2,c1,c2,c3] for p in p_l: p.start() p1.join() p2.join() # 1.主程序等待p1,p2程序結束才繼續執行 # 2.由於q.join()的存在,生產者只有等佇列中的元素被消費完才會結束 # 3.生產者結束了,就代表消費者已經消費完了,也可以結束了,所以可以把消費者設定為守護程序(隨著主程序的退出而退出) print("主程序")
3.3 執行結果
通過執行結果可以看出,生產者沒有手動傳送結束訊號給消費者,而是通過JoinableQueue佇列的方式也實現了生產者消費者模型。
到此這篇關於python多程序下的生產者和消費者模型的文章就介紹到這了,更多相關python多程序下的生產者和消費者內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!