Python 生產者消費者模式
生產者消費者模式
在並發編程中使用生產者和消費者模式能夠解決絕大多數並發問題,
該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度
生產者和消費者模式來源
在線程世界裏, 生產者就是生產數據的線程,消費者就是消費數據的線程。
在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,
那麽生產者就必須等待消費者處理完,才能繼續生產數據。
同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。
為了解決這個問題於是引用了生產者和消費者模式。
生產者消費者模式詳解
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。
生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,
直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,
平衡了生產者和消費者的處理能力。
基於隊列實現生產者消費者模式
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。 q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常. q.get_nowait():同q.get(False) q.put_nowait():同q.put(False) q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。 q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。 q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
1 from multiprocessing import Process, Queue 2 import time, random, os 3 4 5 def consumer(q): 6 while True: 7 res = q.get() 8 if res is None: break # 收到結束信號則結束 9 time.sleep(random.randint(1, 3)) 10 print(‘\033[45m%s 吃 %s\033[0m‘ % (os.getpid(), res))11 12 13 def producer(name, q): 14 for i in range(2): 15 time.sleep(random.randint(1, 3)) 16 res = ‘%s%s‘ % (name, i) 17 q.put(res) 18 print(‘\033[44m%s 生產了 %s\033[0m‘ % (os.getpid(), res)) 19 20 21 if __name__ == ‘__main__‘: 22 q = Queue() 23 # 生產者們:即廚師們 24 p1 = Process(target=producer, args=(‘包子‘, q)) 25 p2 = Process(target=producer, args=(‘骨頭‘, q)) 26 p3 = Process(target=producer, args=(‘泔水‘, q)) 27 28 # 消費者們:即吃貨們 29 c1 = Process(target=consumer, args=(q,)) 30 c2 = Process(target=consumer, args=(q,)) 31 32 # 開始 33 p1.start() 34 p2.start() 35 p3.start() 36 c1.start() 37 38 p1.join() # 必須保證生產者全部生產完畢,才應該發送結束信號 39 p2.join() 40 p3.join() 41 q.put(None) # 有幾個消費者就應該發送幾次結束信號None 42 q.put(None) # 發送結束信號 43 print(‘主‘) 44 #有幾個消費者就需要發送幾次結束信號:相當low
另一種隊列也提供了這種機制
#JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。 #參數介紹: maxsize是隊列中允許最大項數,省略則無大小限制。 #方法介紹: JoinableQueue的實例p除了與Queue對象相同的方法之外還具有: q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。如果調用此方法的次數大於從隊列中刪除項目的數量,將引發ValueError異常 q.join():生產者調用此方法進行阻塞,直到隊列中所有的項目均被處理。阻塞將持續到隊列中的每個項目均調用q.task_done()方法為止
1 from multiprocessing import Process,JoinableQueue 2 import time,random,os 3 def consumer(q): 4 while True: 5 res=q.get() 6 time.sleep(random.randint(1,3)) 7 print(‘\033[45m%s 吃 %s\033[0m‘ %(os.getpid(),res)) 8 9 q.task_done() #向q.join()發送一次信號,證明一個數據已經被取走了 10 11 def producer(name,q): 12 for i in range(10): 13 time.sleep(random.randint(1,3)) 14 res=‘%s%s‘ %(name,i) 15 q.put(res) 16 print(‘\033[44m%s 生產了 %s\033[0m‘ %(os.getpid(),res)) 17 q.join() 18 19 20 if __name__ == ‘__main__‘: 21 q=JoinableQueue() 22 #生產者們:即廚師們 23 p1=Process(target=producer,args=(‘包子‘,q)) 24 p2=Process(target=producer,args=(‘骨頭‘,q)) 25 p3=Process(target=producer,args=(‘泔水‘,q)) 26 27 #消費者們:即吃貨們 28 c1=Process(target=consumer,args=(q,)) 29 c2=Process(target=consumer,args=(q,)) 30 c1.daemon=True 31 c2.daemon=True 32 33 #開始 34 p_l=[p1,p2,p3,c1,c2] 35 for p in p_l: 36 p.start() 37 38 p1.join() 39 p2.join() 40 p3.join() 41 print(‘主‘) 42 43 #主進程等--->p1,p2,p3等---->c1,c2 44 #p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的數據 45 #因而c1,c2也沒有存在的價值了,應該隨著主進程的結束而結束,所以設置成守護進程
生產者消費者模式總結
程序中有兩類角色:
一類負責生產數據(生產者)
一類負責處理數據(消費者)
引入生產者消費者模式為了解決的問題是:
平衡生產者與消費者之間的工作能力,從而提高程序整體處理數據的速度
如何實現:
生產者 -------- 隊列 -------- 消費者
生產者消費者模式實現程序的解耦合
管道(了解)
#創建管道的類: Pipe([duplex]):在進程之間創建一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強調一點:必須在產生Process對象之前產生管道 #參數介紹: dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用於接收,conn2只能用於發送。 #主要方法: conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那麽recv方法會拋出EOFError。 conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象 #其他方法: conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回連接使用的整數文件描述符 conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發IOError異常,並且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,然後調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大於可用的緩沖區空間,將引發BufferTooShort異常。 介紹
1 from multiprocessing import Process,Pipe 2 3 import time,os 4 def consumer(p,name): 5 left,right=p 6 left.close() 7 while True: 8 try: 9 baozi=right.recv() 10 print(‘%s 收到包子:%s‘ %(name,baozi)) 11 except EOFError: 12 right.close() 13 break 14 def producer(seq,p): 15 left,right=p 16 right.close() 17 for i in seq: 18 left.send(i) 19 # time.sleep(1) 20 else: 21 left.close() 22 if __name__ == ‘__main__‘: 23 left,right=Pipe() 24 25 c1=Process(target=consumer,args=((left,right),‘c1‘)) 26 c1.start() 27 28 29 seq=(i for i in range(10)) 30 producer(seq,(left,right)) 31 32 right.close() 33 left.close() 34 35 c1.join() 36 print(‘主進程‘) 37 38 基於管道實現進程間通信(與隊列的方式是類似的,隊列就是管道加鎖實現的)
註意:
生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端。
如果忘記執行這些步驟,程序可能在消費者中的recv()操作上掛起。管道是由操作系統進行引用計算的,必須在所有進程中關閉管道後才能產生EOFError異常。因此在生產者中關閉管道不會有任何效果,付費消費者中也關閉了相同的管道端點。
1 from multiprocessing import Process,Pipe 2 3 import time,os 4 def adder(p,name): 5 server,client=p 6 client.close() 7 while True: 8 try: 9 x,y=server.recv() 10 except EOFError: 11 server.close() 12 break 13 res=x+y 14 server.send(res) 15 print(‘server done‘) 16 if __name__ == ‘__main__‘: 17 server,client=Pipe() 18 19 c1=Process(target=adder,args=((server,client),‘c1‘)) 20 c1.start() 21 22 server.close() 23 24 client.send((10,20)) 25 print(client.recv()) 26 client.close() 27 28 c1.join() 29 print(‘主進程‘) 30 #註意:send()和recv()方法使用pickle模塊對對象進行序列化。 31 32 管道可以用於雙向通信,利用通常在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可以使用管道編寫與進程交互的程序
Python 生產者消費者模式