1. 程式人生 > >程序間通訊(佇列、管道)、消費者模型和程序池(apply,apply_async,map)

程序間通訊(佇列、管道)、消費者模型和程序池(apply,apply_async,map)

複製程式碼
一、佇列(先進先出)
程序間通訊:IPC(Inter-Process Communication)

佇列是使用管道和鎖定實現,所以Queue是多程序安全的佇列,使用Queue可以實現多程序之間的資料傳遞。 


1、Queue([maxsize]) 
建立共享的程序佇列。maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。

Queue的例項q具有以下方法:
q.get( [ block [ ,timeout ] ] ) 
返回q中的一個專案。如果q為空,此方法將阻塞,直到佇列中有專案可用為止。block用於控制阻塞行為,預設為True阻塞. 如果設定為False,丟擲queue.Empty異常(定義在queue模組中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有專案變為可用,將引發queue.Empty異常。

q.get_nowait( ) 
相當於q.get(False)方法,沒有取到值也不會阻塞,直接丟擲queue.Empty異常。

q.put(item [, block [,timeout ] ] ) 
將item放入佇列。如果佇列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,預設為True阻塞。如果設定為False,將引發queue.Full異常(定義在queue庫模組中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發queue.Full異常。

q.put_nowait()
相當於q.put(item,False)方法,佇列已滿,不會阻塞,直接丟擲queue.Full異常。

q.qsize() 
返回佇列中目前專案的正確數量。此函式的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,佇列中可能新增或刪除了專案。在某些系統上,此方法可能引發NotImplementedError異常。

q.empty() 
如果呼叫此方法時 q為空,返回True。如果其他程序或執行緒正在往佇列中新增專案,結果是不可靠的。也就是說,在返回和使用結果之間,佇列中可能已經加入新的專案。

q.full() 
如果q已滿,返回為True. 由於執行緒的存在,結果也可能是不可靠的(參考q.empty()方法)。


其他方法(不常用):
q.close() 
關閉佇列,防止佇列中加入更多資料。呼叫此方法時,後臺執行緒將繼續寫入那些已入佇列但尚未寫入的資料,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動呼叫此方法。關閉佇列不會在佇列使用者中生成任何型別的資料結束訊號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的佇列不會導致get()方法返回錯誤。

q.cancel_join_thread() 
不會再程序退出時自動連線後臺執行緒。這可以防止join_thread()方法阻塞。

q.join_thread() 
連線佇列的後臺執行緒。此方法用於在呼叫q.close()方法後,等待所有佇列項被消耗。預設情況下,此方法由不是q的原始建立者的所有程序呼叫。呼叫q.cancel_join_thread()方法可以禁止這種行為。



2、方法使用例子: from multiprocessing import Queue import queue (1) q = Queue(2) q.put(1) q.put(2) print(q.get()) # 1 print(q.get()) # 2 (2) q = Queue(2) q.put(1) q.put(2) q.put(3) # 一直阻塞在這裡 print(q.get()) print(q.get()) (3) q = Queue(2) q.put(1) q.put(2) print(q.get()) #
1 print(q.get()) # 2 print(q.get()) # 一直阻塞在這裡 (4) q = Queue(2) q.put(1) q.put(2) q.put_nowait(3) # 異常:queue.Full # q.put(3,False) # 異常:queue.Full # q.put(3,timeout=2) # 2秒後異常:queue.Full (5) q = Queue(2) q.put(1) q.put(2) print(q.get()) # 1 print(q.get()) # 2 print(q.get_nowait()) #
異常:queue.Empty # print(q.get(False)) # 異常:queue.Empty # print(q.get(timeout=2)) # 2秒後異常:queue.Empty 3、小例子 from multiprocessing import Process,Queue def consume(q): print('子程序:',q.get()) q.put('hi') if __name__ == '__main__': q = Queue() p = Process(target=consume,args=(q,)) p.start() q.put('hello') p.join() print('主程序:',q.get()) 4、生產者消費者模型 場景: 為什麼要使用生產者和消費者模式 線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢, 那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。 什麼是生產者消費者模式 生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊, 所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。 import time import random from multiprocessing import Process,Queue def consumer(q,name): # 消費者 while True: food = q.get() # 在佇列中取值 if food is None:break time.sleep(random.uniform(0.3,1)) # 模擬吃消耗的時間 print('%s偷吃了%s,快打死他' %(name,food)) def producter(q,name,food): # 生產者 for i in range(10): time.sleep(random.uniform(0.5,0.9)) # 模擬生產時間 print('%s生產了%s,序號:%s' %(name,food,i)) q.put(food+str(i)) # 把值存入佇列中 if __name__ == '__main__': q = Queue() # Queue佇列物件 c1 = Process(target=consumer,args=(q,'小明')) c2 = Process(target=consumer,args=(q,'小東')) c1.start() c2.start() p1 = Process(target=producter,args=(q,'張三','麵包')) p2 = Process(target=producter,args=(q,'李四','可樂')) p1.start() p2.start() p1.join() p2.join() q.put(None) # 有幾個consumer程序就需要放幾個None,表示生產完畢(這就有點low了) q.put(None) 二、JoinableQueue JoinableQueue和Queue幾乎一樣,不同的是JoinableQueue佇列允許使用者告訴佇列某個資料已經處理了。通知程序是使用共享的訊號和條件變數來實現的。 task_done():使用者使用此方法發出訊號,表示q.get()返回的專案已經被處理 join():當佇列中有資料的時候,使用此方法會進入阻塞,直到放入佇列中所有的資料都被處理掉才轉換成不阻塞(每處理一個數據就使用一次taskdone) 解決剛才生產者消費者模型low的問題: import time import random from multiprocessing import Process,JoinableQueue def consumer(jq,name): # 消費者 while True: food = jq.get() # 在佇列中取值 # if food is None:break time.sleep(random.uniform(0.3,1)) # 模擬吃消耗的時間 print('%s偷吃了%s,快打死他' %(name,food)) jq.task_done() # 向jq.join()傳送一次訊號,證明這個資料已經處理了 def producter(jq,name,food): # 生產者 for i in range(10): time.sleep(random.uniform(0.5,0.9)) # 模擬生產時間 print('%s生產了%s,序號:%s' %(name,food,i)) jq.put(food+str(i)) # 把值存入佇列中 if __name__ == '__main__': jq = JoinableQueue() c1 = Process(target=consumer,args=(jq,'小明')) c2 = Process(target=consumer,args=(jq,'小東')) c1.daemon = True # 把消費者設定為守護程序 c2.daemon = True c1.start() c2.start() p1 = Process(target=producter,args=(jq,'張三','麵包')) p2 = Process(target=producter,args=(jq,'李四','可樂')) p1.start() p2.start() p1.join() p2.join() jq.join() # 資料全部被task_done後才不阻塞 三、管道 #建立管道的類: Pipe([duplex]):在程序之間建立一條管道,並返回元組(left,right),其中left,right表示管道兩端的連線物件,強調一點:必須在產生Process物件之前產生管道 #引數介紹: duplex:預設管道是全雙工的,如果將duplex改成False,left只能用於接收,right只能用於傳送。 #主要方法: right.recv():接收left.send()傳送的內容。如果沒有訊息可接收,recv方法會一直阻塞。如果連線的另外一端已經關閉,那麼recv方法會丟擲EOFError。 letf.send():通過連線傳送內容。 #其他方法: close():關閉連線。 fileno():返回連線使用的整數檔案描述符 poll([timeout]):如果連線上的資料可用,返回True。timeout指定等待的最長時限。如果省略此引數,方法將立即返回結果。如果將timeout設定成None,操作將無限期地等待資料到達。 recv_bytes([maxlength]):接收c.send_bytes()方法傳送的一條完整的位元組訊息。maxlength指定要接收的最大位元組數。如果進入的訊息,超過了這個最大值,將引發IOError異常,並且在連線上無法進行進一步讀取。如果連線的另外一端已經關閉,再也不存在任何資料,將引發EOFError異常。 send_bytes(buffer [, offset [, size]]):通過連線傳送位元組資料緩衝區,buffer是支援緩衝區介面的任意物件,offset是緩衝區中的位元組偏移量,而size是要傳送位元組數。結果資料以單條訊息的形式發出,然後呼叫c.recv_bytes()函式進行接收 recv_bytes_into(buffer [, offset]):接收一條完整的位元組訊息,並把它儲存在buffer物件中,該物件支援可寫入的緩衝區介面(即bytearray物件或類似的物件)。offset指定緩衝區中放置訊息處的位元組位移。返回值是收到的位元組數。如果訊息長度大於可用的緩衝區空間,將引發BufferTooShort異常。 注意: # 佇列是基於管道實現的,使用管道 + 鎖的模式形成的IPC方式,使得程序之間資料安全 # 管道是基於socket實現的,使用socket + pickle 的模式形成的IPC方式,使得程序之間資料是不安全的且存取資料複雜(同一時間不同程序對同一個資料進行處理,造成資料的不安全) 例子:(左邊傳送,右邊接收) from multiprocessing import Process,Pipe def test(right): print(right.recv()) right.close() if __name__ == '__main__': left,right = Pipe() p = Process(target=test,args=(right,)) p.start() left.send('hello') (右邊傳送,左邊接收) from multiprocessing import Process,Pipe def test(left): left.send('hi') left.close() if __name__ == '__main__': left,right = Pipe() p = Process(target=test,args=(left,)) p.start() print(right.recv()) # pipe的埠管理不會隨著某一個程序的關閉就關閉 # 作業系統來管理程序對這些埠的使用,不使用的埠應該關閉它 # 一條管道,兩個程序,就有4個埠 每關閉一個埠計數-1,直到只剩下一個埠的時候 recv就會報錯 # 如果不關閉不使用的埠,在已經把資料傳送完畢的情況下,那麼接收端的recv就會一直掛起,等待接收資料,這個程序就一直不能關閉 # 因此不使用的埠就應該關閉它,讓recv丟擲異常後對這個程序進行處理 from multiprocessing import Process,Pipe def consumer(left,right): left.close() # 若這裡不close,則不會異常EOFError,資料接收完畢後,下面的right.recv()就會一直掛起 while True: try: print(right.recv()) except EOFError: break if __name__ == '__main__': left,right = Pipe() Process(target=consumer,args=(left,right)).start() right.close() for i in range(10): left.send('Apple%s'%i) left.close() 四、程序池(不能傳佇列作為子程序的引數,只能傳管道) 1、為什麼要有程序池? 在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閒時可能只有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序麼? 首先,建立程序需要消耗時間,銷燬程序也需要消耗時間。第二即便開啟了成千上萬的程序,作業系統也不能讓他們同時執行,這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序。 2、程序池的概念 定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。 如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。 這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果。 # 訊號量 # 500件衣服 有500個任務 # 500個人 建立了500個程序 # 只有4臺機器 但是隻有4個CPU # 輪流去做,一個人做完了就走,機器留給另一個人做 # 多程序 # 500件衣服 有500個任務 # 500個人 建立了500個程序 # 搶4臺機器 但是隻有4個CPU # 大家搶著機器去做,搶到的就做 # 程序池 # 500件衣服 有500個任務 # 4個人 建立了4個程序 # 4臺機器 有4個CUP # 4個人拿著機器做,做完一個繼續做下一個,直到500個做完 3、建立程序池 p = Pool([numprocess [,initializer [, initargs]]]) 引數介紹: numprocess:要建立的程序數,如果省略,將預設使用os.cpu_count()的值,即你電腦CPU的個數 initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為None initargs:是要傳給initializer的引數組 主要方法: 同步提交 p.apply(func [, args [, kwargs]]) 返回值 : 子程序對應函式的返回值(子程序return的返回值) 一個一個順序執行的,並沒有任何併發效果 非同步提交 p.apply_async(func [, args [, kwargs]]) 沒有返回值,要想所有任務能夠順利的執行完畢,需要執行: p.close() p.join() # 必須先close再join,阻塞直到程序池中的所有任務都執行完畢 有返回值(res接收返回值)的情況下 res.get() # get不能在提交任務之後立刻執行,應該是先提交所有的任務再通過get獲取結果 p.map() 非同步提交的簡化版本 自帶close和join方法 p.close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成 p.jion():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫 其他方法(不常用) 方法apply_async()和map_async()的返回值是AsyncResul的例項obj。例項具有以下方法 obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠端操作中引發了異常,它將在呼叫此方法時再次被引發。 obj.ready():如果呼叫完成,返回True obj.successful():如果呼叫完成且沒有引發異常,返回True,如果在結果就緒之前呼叫此方法,引發異常 obj.wait([timeout]):等待結果變為可用。 obj.terminate():立即終止所有工作程序,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動呼叫此函式 4、例子 4-1、apply import os import time from multiprocessing import Pool def test(num): time.sleep(1) print('%s:%s' %(num,os.getpid())) return num*2 if __name__ == '__main__': p = Pool() for i in range(20): res = p.apply(test,args=(i,)) # 提交任務的方法 同步提交 print('-->',res) # res就是test的return的值 4-2、apply_async import time from multiprocessing import Pool def func(num): time.sleep(1) print('做了%s件衣服'%num) if __name__ == '__main__': p = Pool(4) # 程序池中建立4個程序,不寫的話,預設值為你電腦的CUP數量 for i in range(50): p.apply_async(func,args=(i,)) # 非同步提交func到一個子程序中執行,沒有返回值的情況 p.close() # 關閉程序池,使用者不能再向這個池中提交任務了 p.join() # 阻塞,直到程序池中所有的任務都被執行完 注意: 非同步提交且沒有返回值接收的情況下必須要用close()和join() 因為如果沒有close()和join(),主程序執行完畢後會立刻把子程序回收了,相當於子程序還沒來得及開啟 所以要join,讓子程序結束後再結束父程序,但是程序池中要使用join就必須先進行close import time import os from multiprocessing import Pool def test(num): time.sleep(1) print('%s:%s' %(num,os.getpid())) return num*2 if __name__ == '__main__': p = Pool() res_lst = [] for i in range(20): res = p.apply_async(test,args=(i,)) # 提交任務的方法 非同步提交 res_lst.append(res) for res in res_lst: print(res.get()) 注意: 非同步提交有返回值的情況下,res是一個物件代表的是這個任務的編號,需要用res.get()方法讓任務執行且把返回值返回給你。 get有阻塞效果,拿到子程序的返回值後才不阻塞,所以並不需要再使用close和join。 4-3、map import time import os from multiprocessing import Pool def test(num): time.sleep(1) print('%s:%s'%(num,os.getpid())) return num*2 if __name__ == '__main__': p = Pool() p.map(test,range(20)) 注意: map接收一個函式和一個可迭代物件,是非同步提交的簡化版本,自帶close和join方法
可迭代物件的每一個值就是函式接收的實參,可迭代物件的長度就是建立的任務數量
map可以直接拿到返回值的可迭代物件(列表),迴圈就可以獲取返回值
複製程式碼
import time
from multiprocessing import Pool

def func(num):
    print('子程序:',num)
    # time.sleep(1)
    return num

if __name__ == '__main__':
    p = Pool()
    ret = p.map(func,range(10))   # ret是列表
    for i in ret:
        print('返回值:',i)
複製程式碼
 
  

 

 
複製程式碼

 

複製程式碼
一、佇列(先進先出)
程序間通訊:IPC(Inter-Process Communication)

佇列是使用管道和鎖定實現,所以Queue是多程序安全的佇列,使用Queue可以實現多程序之間的資料傳遞。 


1、Queue([maxsize]) 
建立共享的程序佇列。maxsize是佇列中允許的最大項數。如果省略此引數,則無大小限制。

Queue的例項q具有以下方法:
q.get( [ block [ ,timeout ] ] ) 
返回q中的一個專案。如果q為空,此方法將阻塞,直到佇列中有專案可用為止。block用於控制阻塞行為,預設為True阻塞. 如果設定為False,丟擲queue.Empty異常(定義在queue模組中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有專案變為可用,將引發queue.Empty異常。

q.get_nowait( ) 
相當於q.get(False)方法,沒有取到值也不會阻塞,直接丟擲queue.Empty異常。

q.put(item [, block [,timeout ] ] ) 
將item放入佇列。如果佇列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,預設為True阻塞。如果設定為False,將引發queue.Full異常(定義在queue庫模組中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發queue.Full異常。

q.put_nowait()
相當於q.put(item,False)方法,佇列已滿,不會阻塞,直接丟擲queue.Full異常。

q.qsize() 
返回佇列中目前專案的正確數量。此函式的結果並不可靠,因為在返回結果和在稍後程式中使用結果之間,佇列中可能新增或刪除了專案。在某些系統上,此方法可能引發NotImplementedError異常。

q.empty() 
如果呼叫此方法時 q為空,返回True。如果其他程序或執行緒正在往佇列中新增專案,結果是不可靠的。也就是說,在返回和使用結果之間,佇列中可能已經加入新的專案。

q.full() 
如果q已滿,返回為True. 由於執行緒的存在,結果也可能是不可靠的(參考q.empty()方法)。


其他方法(不常用):
q.close() 
關閉佇列,防止佇列中加入更多資料。呼叫此方法時,後臺執行緒將繼續寫入那些已入佇列但尚未寫入的資料,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動呼叫此方法。關閉佇列不會在佇列使用者中生成任何型別的資料結束訊號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的佇列不會導致get()方法返回錯誤。

q.cancel_join_thread() 
不會再程序退出時自動連線後臺執行緒。這可以防止join_thread()方法阻塞。

q.join_thread() 
連線佇列的後臺執行緒。此方法用於在呼叫q.close()方法後,等待所有佇列項被消耗。預設情況下,此方法由不是q的原始建立者的所有程序呼叫。呼叫q.cancel_join_thread()方法可以禁止這種行為。



2、方法使用例子:
from multiprocessing import Queue
import queue

(1)
q = Queue(2)
q.put(1)
q.put(2)

print(q.get())   # 1
print(q.get())   # 2


(2)
q = Queue(2)
q.put(1)
q.put(2)
q.put(3)   # 一直阻塞在這裡

print(q.get()) 
print(q.get()) 


(3)
q = Queue(2)
q.put(1)
q.put(2)

print(q.get())   # 1
print(q.get())   # 2
print(q.get())   # 一直阻塞在這裡

(4)
q = Queue(2)
q.put(1)
q.put(2)
q.put_nowait(3)   # 異常:queue.Full
# q.put(3,False)  # 異常:queue.Full
# q.put(3,timeout=2)  # 2秒後異常:queue.Full


(5)
q = Queue(2)
q.put(1)
q.put(2)

print(q.get())  # 1
print(q.get())  # 2
print(q.get_nowait())   # 異常:queue.Empty
# print(q.get(False))   # 異常:queue.Empty
# print(q.get(timeout=2))   # 2秒後異常:queue.Empty


3、小例子
from multiprocessing import Process,Queue

def consume(q):
    print('子程序:',q.get())
    q.put('hi')
    
if __name__ == '__main__':
    q = Queue()
    p = Process(target=consume,args=(q,))
    p.start()
    q.put('hello')
    p.join()
    print('主程序:',q.get())



4、生產者消費者模型
場景:
為什麼要使用生產者和消費者模式
線上程世界裡,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒。在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,
那麼生產者就必須等待消費者處理完,才能繼續生產資料。同樣的道理,如果消費者的處理能力大於生產者,那麼消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

什麼是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,
所以生產者生產完資料之後不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,平衡了生產者和消費者的處理能力。


import time
import random
from multiprocessing import Process,Queue

def consumer(q,name):
    # 消費者
    while True:
        food = q.get()  # 在佇列中取值
        if food is None:break
        time.sleep(random.uniform(0.3,1))  # 模擬吃消耗的時間
        print('%s偷吃了%s,快打死他' %(name,food))

def producter(q,name,food):
    # 生產者
    for i in range(10):
        time.sleep(random.uniform(0.5,0.9))  # 模擬生產時間
        print('%s生產了%s,序號:%s' %(name,food,i))
        q.put(food+str(i))  # 把值存入佇列中

if __name__ == '__main__':
    q = Queue()  # Queue佇列物件
    c1 = Process(target=consumer,args=(q,'小明'))
    c2 = Process(target=consumer,args=(q,'小東'))
    c1.start()
    c2.start()

    p1 = Process(target=producter,args=(q,'張三','麵包'))
    p2 = Process(target=producter,args=(q,'李四','可樂'))
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    q.put(None) # 有幾個consumer程序就需要放幾個None,表示生產完畢(這就有點low了)
    q.put(None)


    
二、JoinableQueue
JoinableQueue和Queue幾乎一樣,不同的是JoinableQueue佇列允許使用者告訴佇列某個資料已經處理了。通知程序是使用共享的訊號和條件變數來實現的。 
task_done():使用者使用此方法發出訊號,表示q.get()返回的專案已經被處理
join():當佇列中有資料的時候,使用此方法會進入阻塞,直到放入佇列中所有的資料都被處理掉才轉換成不阻塞(每處理一個數據就使用一次taskdone)

解決剛才生產者消費者模型low的問題:
import time
import random
from multiprocessing import Process,JoinableQueue

def consumer(jq,name):
    # 消費者
    while True:
        food = jq.get()  # 在佇列中取值
        # if food is None:break
        time.sleep(random.uniform(0.3,1))  # 模擬吃消耗的時間
        print('%s偷吃了%s,快打死他' %(name,food))
        jq.task_done() # 向jq.join()傳送一次訊號,證明這個資料已經處理了

def producter(jq,name,food):
    # 生產者
    for i in range(10):
        time.sleep(random.uniform(0.5,0.9))  # 模擬生產時間
        print('%s生產了%s,序號:%s' %(name,food,i))
        jq.put(food+str(i))  # 把值存入佇列中


if __name__ == '__main__':
    jq = JoinableQueue()
    c1 = Process(target=consumer,args=(jq,'小明'))
    c2 = Process(target=consumer,args=(jq,'小東'))

    c1.daemon = True  # 把消費者設定為守護程序
    c2.daemon = True
    
    c1.start()
    c2.start()

    p1 = Process(target=producter,args=(jq,'張三','麵包'))
    p2 = Process(target=producter,args=(jq,'李四','可樂'))
    p1.start()
    p2.start()

    p1.join()
    p2.join()
    
    jq.join()   # 資料全部被task_done後才不阻塞
   





三、管道
#建立管道的類:
Pipe([duplex]):在程序之間建立一條管道,並返回元組(left,right),其中left,right表示管道兩端的連線物件,強調一點:必須在產生Process物件之前產生管道

#引數介紹:
duplex:預設管道是全雙工的,如果將duplex改成False,left只能用於接收,right只能用於傳送。


#主要方法:
    right.recv():接收left.send()傳送的內容。如果沒有訊息可接收,recv方法會一直阻塞。如果連線的另外一端已經關閉,那麼recv方法會丟擲EOFError。
    letf.send():通過連線傳送內容。
 
 
#其他方法:
close():關閉連線。
fileno():返回連線使用的整數檔案描述符
poll([timeout]):如果連線上的資料可用,返回True。timeout指定等待的最長時限。如果省略此引數,方法將立即返回結果。如果將timeout設定成None,操作將無限期地等待資料到達。
 
recv_bytes([maxlength]):接收c.send_bytes()方法傳送的一條完整的位元組訊息。maxlength指定要接收的最大位元組數。如果進入的訊息,超過了這個最大值,將引發IOError異常,並且在連線上無法進行進一步讀取。如果連線的另外一端已經關閉,再也不存在任何資料,將引發EOFError異常。
send_bytes(buffer [, offset [, size]]):通過連線傳送位元組資料緩衝區,buffer是支援緩衝區介面的任意物件,offset是緩衝區中的位元組偏移量,而size是要傳送位元組數。結果資料以單條訊息的形式發出,然後呼叫c.recv_bytes()函式進行接收    
 
recv_bytes_into(buffer [, offset]):接收一條完整的位元組訊息,並把它儲存在buffer物件中,該物件支援可寫入的緩衝區介面(即bytearray物件或類似的物件)。offset指定緩衝區中放置訊息處的位元組位移。返回值是收到的位元組數。如果訊息長度大於可用的緩衝區空間,將引發BufferTooShort異常。


注意:
# 佇列是基於管道實現的,使用管道 + 鎖的模式形成的IPC方式,使得程序之間資料安全
# 管道是基於socket實現的,使用socket + pickle 的模式形成的IPC方式,使得程序之間資料是不安全的且存取資料複雜(同一時間不同程序對同一個資料進行處理,造成資料的不安全)


例子:(左邊傳送,右邊接收)
from multiprocessing import Process,Pipe

def test(right):
    print(right.recv())
    right.close()

if __name__ == '__main__':
    left,right = Pipe()
    p = Process(target=test,args=(right,))
    p.start()
    left.send('hello')

    
(右邊傳送,左邊接收)
from multiprocessing import Process,Pipe

def test(left):
    left.send('hi')
    left.close()

if __name__ == '__main__':
    left,right = Pipe()
    p = Process(target=test,args=(left,))
    p.start()
    print(right.recv())
    

# pipe的埠管理不會隨著某一個程序的關閉就關閉
# 作業系統來管理程序對這些埠的使用,不使用的埠應該關閉它
# 一條管道,兩個程序,就有4個埠  每關閉一個埠計數-1,直到只剩下一個埠的時候 recv就會報錯
# 如果不關閉不使用的埠,在已經把資料傳送完畢的情況下,那麼接收端的recv就會一直掛起,等待接收資料,這個程序就一直不能關閉
# 因此不使用的埠就應該關閉它,讓recv丟擲異常後對這個程序進行處理

from multiprocessing import Process,Pipe
def consumer(left,right):
    left.close()  # 若這裡不close,則不會異常EOFError,資料接收完畢後,下面的right.recv()就會一直掛起
    while True:
        try:
            print(right.recv())
        except EOFError:
            break

if __name__ == '__main__':
    left,right = Pipe()
    Process(target=consumer,args=(left,right)).start()
    right.close()
    for i in range(10):
        left.send('Apple%s'%i)
    left.close()





四、程序池(不能傳佇列作為子程序的引數,只能傳管道)
1、為什麼要有程序池?
在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閒時可能只有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序麼?
首先,建立程序需要消耗時間,銷燬程序也需要消耗時間。第二即便開啟了成千上萬的程序,作業系統也不能讓他們同時執行,這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序。


2、程序池的概念
定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。
如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。
這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果。


# 訊號量
    # 500件衣服          有500個任務
    # 500個人           建立了500個程序
    # 只有4臺機器        但是隻有4個CPU
    # 輪流去做,一個人做完了就走,機器留給另一個人做

# 多程序
    # 500件衣服          有500個任務
    # 500個人           建立了500個程序
    # 搶4臺機器          但是隻有4個CPU
    # 大家搶著機器去做,搶到的就做

# 程序池
    # 500件衣服          有500個任務
    # 4個人             建立了4個程序
    # 4臺機器           有4個CUP
    # 4個人拿著機器做,做完一個繼續做下一個,直到500個做完
    

    
3、建立程序池
p = Pool([numprocess  [,initializer [, initargs]]])

引數介紹:
numprocess:要建立的程序數,如果省略,將預設使用os.cpu_count()的值,即你電腦CPU的個數
initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為None
initargs:是要傳給initializer的引數組


主要方法:
同步提交 p.apply(func [, args [, kwargs]])
    返回值 : 子程序對應函式的返回值(子程序return的返回值)
    一個一個順序執行的,並沒有任何併發效果

非同步提交 p.apply_async(func [, args [, kwargs]])
    沒有返回值,要想所有任務能夠順利的執行完畢,需要執行:
        p.close()
        p.join()  # 必須先close再join,阻塞直到程序池中的所有任務都執行完畢
    有返回值(res接收返回值)的情況下
        res.get()  # get不能在提交任務之後立刻執行,應該是先提交所有的任務再通過get獲取結果

p.map()
    非同步提交的簡化版本
    自帶close和join方法

p.close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成

p.jion():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫



其他方法(不常用)
方法apply_async()和map_async()的返回值是AsyncResul的例項obj。例項具有以下方法
obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠端操作中引發了異常,它將在呼叫此方法時再次被引發。
obj.ready():如果呼叫完成,返回True
obj.successful():如果呼叫完成且沒有引發異常,返回True,如果在結果就緒之前呼叫此方法,引發異常
obj.wait([timeout]):等待結果變為可用。
obj.terminate():立即終止所有工作程序,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動呼叫此函式



4、例子
4-1、apply
import os
import time
from multiprocessing import Pool

def test(num):
    time.sleep(1)
    print('%s:%s' %(num,os.getpid()))
    return num*2

if __name__ == '__main__':
    p = Pool()
    for i in range(20):
        res = p.apply(test,args=(i,)) # 提交任務的方法 同步提交
        print('-->',res)  # res就是test的return的值



        
4-2、apply_async
import time
from multiprocessing import Pool

def func(num):
    time.sleep(1)
    print('做了%s件衣服'%num)

if __name__ == '__main__':
    p = Pool(4)  # 程序池中建立4個程序,不寫的話,預設值為你電腦的CUP數量
    for i in range(50):
        p.apply_async(func,args=(i,)) # 非同步提交func到一個子程序中執行,沒有返回值的情況
    p.close() # 關閉程序池,使用者不能再向這個池中提交任務了
    p.join()  # 阻塞,直到程序池中所有的任務都被執行完

注意:
非同步提交且沒有返回值接收的情況下必須要用close()和join()
因為如果沒有close()和join(),主程序執行完畢後會立刻把子程序回收了,相當於子程序還沒來得及開啟
所以要join,讓子程序結束後再結束父程序,但是程序池中要使用join就必須先進行close
    

import time
import os
from multiprocessing import Pool

def test(num):
    time.sleep(1)
    print('%s:%s' %(num,os.getpid()))
    return num*2

if __name__ == '__main__':
    p = Pool()
    res_lst = []
    for i in range(20):
        res = p.apply_async(test,args=(i,))   # 提交任務的方法 非同步提交
        res_lst.append(res)
    for res in res_lst:
        print(res.get())

注意:
非同步提交有返回值的情況下,res是一個物件代表的是這個任務的編號,需要用res.get()方法讓任務執行且把返回值返回給你。
get有阻塞效果,拿到子程序的返回值後才不阻塞,所以並不需要再使用close和join。



4-3、map
import time
import os
from multiprocessing import Pool

def test(num):
    time.sleep(1)
    print('%s:%s'%(num,os.getpid()))
    return num*2

if __name__ == '__main__':
    p = Pool()
    p.map(test,range(20))

注意:
map接收一個函式和一個可迭代物件,是非同步提交的簡化版本,自帶close和join方法
可迭代物件的每一個值就是函式接收的實參,可迭代物件的長度就是建立的任務數量
map可以直接拿到返回值的可迭代物件(列表),迴圈就可以獲取返回值
複製程式碼
import time
from multiprocessing import Pool

def func(num):
    print('子程序:',num)
    # time.sleep(1)
    return num

if __name__ == '__main__':
    p = Pool()
    ret = p.map(func,range(10))   # ret是列表
    for i in ret:
        print('返回值:',i)
複製程式碼
 
 

 

 
複製程式碼