Python 併發程式設計(管道,事件,訊號量,程序池)
管道
Conn1,conn2 = Pipe()
Conn1.recv()
Conn1.send()
資料接收一次就沒有了
from multiprocessing import Process,Pipe def f1(conn): from_zhujincheng = conn.recv() print('子程序') print('來自主程序的訊息:',from_zhujincheng) if __name__ == '__main__': conn1,conn2 = Pipe() #建立一個管道物件,全雙工,返回管道的兩端,但是一端傳送的訊息,只能另外一端接收,自己這一端是不能接收的p1 = Process(target=f1,args=(conn2,)) p1.start() conn1.send('出來吧') print('主程序')
事件
E = Event() #初識狀態是false
E.wait() 當事件物件e的狀態為false的時候,在wait的地方會阻塞程式,當物件狀態為true的時候,直接在這個wait地方繼續往下執行
E.set() 將事件物件的狀態改為true,
E.is_set() 檢視狀態
E.clear() 將事件物件的狀態改為false
frommultiprocessing import Process,Event e = Event() #建立事件物件,這個物件的初識狀態為False print('e的狀態是:',e.is_set()) # False print('程序執行到這裡了') e.set() #將e的狀態改為True print('e的狀態是:',e.is_set()) # True e.clear() #將e的狀態改為False e.wait() #e這個事件物件如果值為False,就在我加wait的地方等待 print('程序過了wait')
訊號量
S = semphore(數字),內部維護了一個計數器,acquire-1,release+1,為0的時候,其他的程序都要在acquire之前等待
S.acquire()
需要鎖住的程式碼
S.release()
import time,random from multiprocessing import Process,Semaphore def f1(i,s): s.acquire() print('%s男嘉賓到了'%i) time.sleep(random.randint(1,3)) s.release() if __name__ == '__main__': s = Semaphore(4) #計數器4,acquire一次減一,為0 ,其他人等待,release加1 for i in range(10): p = Process(target=f1,args=(i,s)) p.start()
程序池
程序的建立和銷燬是很有消耗的,影響程式碼執行效率
在有程序池的程式碼中,主程序執行結束,程序池裡面的任務全部停止,不會等待程序池裡面的任務
pl = Pool(數字) 這個數字一般是電腦的cpu數
pl的方法:
Map:非同步提交任務,並且傳參需要可迭代型別的資料,自帶close和join功能
import time from multiprocessing import Process,Pool #對比多程序和程序池的效率 def f1(n): for i in range(5): n = n + i if __name__ == '__main__': #統計程序池執行100個任務的時間 s_time = time.time() pool = Pool(4) pool.map(f1,range(100)) e_time = time.time() dif_time = e_time - s_time #統計100個程序,來執行100個任務的執行時間 p_s_t = time.time() #多程序起始時間 p_list = [] for i in range(100): p = Process(target=f1,args=(i,)) p.start() p_list.append(p) [pp.join() for pp in p_list] p_e_t = time.time() p_dif_t = p_e_t - p_s_t print('程序池的時間:',dif_time) print('多程序的執行時間:',p_dif_t) # 結果: 程序池的時間: 0.40102291107177734 多程序的執行時間: 9.247529029846191 # 可以看出程序池執行效率遠遠大於建立多程序
Close : 鎖住程序池,防止有其他的新的任務在提交給程序池
Join : 等待著程序池將自己裡面的任務都執行完
Res = Apply(f1,args=(i,)) #同步執行任務,必須等任務執行結束才能給程序池提交下一個任務,可以直接拿到返回結果res
import time from multiprocessing import Process,Pool def f1(n): time.sleep(1) return n*n if __name__ == '__main__': pool = Pool(4) for i in range(10): res = pool.apply(f1,args=(i,)) print(res)
Res_obj = Apply_async(f1,args=(i,)) #非同步提交任務,可以直接拿到結果物件,從結果物件裡面拿結果,要用get方法,get方法會阻塞程式,沒有拿到結果會一直等待
import time from multiprocessing import Process,Pool def f1(n): time.sleep(0.5) return n*n if __name__ == '__main__': pool = Pool(4) res_list = [] for i in range(10): res = pool.apply_async(f1,args=(i,)) # 不能直接列印返回值,因為直接返回結果物件,程序還沒執行完,結果物件裡沒有資料 res_list.append(res) pool.close() pool.join() #列印結果,非同步提交之後的結果物件 for i in res_list: print(i.get())
回撥函式:
Apply_async(f1,args=(i,),callback=function) #將前面f1這個任務的返回結果作為引數傳給callback指定的那個function函式
import os from multiprocessing import Pool,Process def f1(n): print('傳入的函式',n) return n*n def call_back_func(asdf): print('回撥函式',asdf) if __name__ == '__main__': pool = Pool(4) res = pool.apply_async(f1,args=(5,),callback=call_back_func) pool.close() pool.join()