python 管道 事件 訊號量 程序池(map/同步/非同步)回撥函式
阿新 • • 發佈:2019-01-11
####################總結########################
管道:是程序間通訊的第二種方式,但是不推薦使用,因為管道會導致資料不安全的情況出現
事件:當我執行主程序的時候 需要子執行某個程序後 需要的返回值時 可以使用
訊號量:互斥鎖同時只允許一個執行緒更改資料,而訊號量Semaphore是同時允許一定數量的執行緒更改資料 。
內部維護了一個計數器,acquire-1,release+1,為0的時候,其他的程序都要在acquire之前等待
程序池:
程序的建立和銷燬是很有消耗的,影響程式碼執行效率
程序池:定義一個池子,池中程序的數量是固定的,那麼同一時間有固定數量的程序在執行。
這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果
管道
###管道 from multiprocessing import Process,Pipe def f1(conn): zhu=conn.recv() print('我是子程序') print('來自主程序訊息',zhu) if __name__ == '__main__': conn1,conn2=Pipe() p1=Process(target=f1,args=(conn2,)) p1.start() conn1.send('我是conn1') print('我是主程序') ####################### 我是主程序 我是子程序 來自主程序訊息 我是conn1
事件
from multiprocessing import Process,Event e=Event() print('e的狀態是',e.is_set()) print('程序執行到這裡了') e.set()#將e的狀態改為True print('e的狀態是',e.is_set()) e.clear()#將e的狀態改為False e.wait()#e這個事件物件如果值為False,就在我加wait的地方等待 print('程序過了wait') ################## e的狀態是 False 程序執行到這裡了 e的狀態是 True
訊號量
import time import random from multiprocessing import Process,Semaphore def f1(i,s): s.acquire() #此時進來的人獲取房間 print('%s男嘉賓到了'%i) time.sleep(random.randint(1,3)) s.release() #出房間,此時下個人可以進入 if __name__ == '__main__': s = Semaphore(4) #計數器4,acquire一次減一,為0 ,其他人等待,release加1, for i in range(10): p = Process(target=f1,args=(i,s)) p.start() ############### 6男嘉賓到了 4男嘉賓到了 7男嘉賓到了 3男嘉賓到了 #前面的人出去了,出去一個就往後上加 2男嘉賓到了 1男嘉賓到了 0男嘉賓到了 5男嘉賓到了 9男嘉賓到了 8男嘉賓到了
程序池 map
import time from multiprocessing import Pool,Process #計算下開多程序和程序池的執行效率 def func(n): for i in range(5): n = n + i # print(n) if __name__ == '__main__': #程序池模式時間統計 pool_s_time = time.time() pool = Pool(4) #建立含有4個程序的程序池 ****一般約定俗成的是程序池中的程序數量為CPU的數量,工作中要看具體情況來考量 pool.map(func,range(100)) #引數資料必須是可迭代的,非同步提交任務,自帶join功能 #注意,map目前只限於接收一個可迭代的資料型別引數, #多程序模式 pool_end_time = time.time() pool_dif_time = pool_end_time - pool_s_time #############程序時間統計##################################### # 統計100個程序,來執行100個任務的執行時間 p_s_time = time.time() p_lst= [] #因為這個是非同步的,我子程序執行他自己的,如果直接在for迴圈join會每個程序等待 #所以要放到程序直接 記憶體中 然後在join for i in range(100): p = Process(target= func,args = (i,)) p.start() p_lst.append(p) [p.join() for p in p_lst] p_e_time = time.time() p_dif_time = p_e_time - p_s_time print("程序池的執行時間",pool_dif_time) #0.176239013671875 程序池 的效率高 print("建立程序的執行時間",p_dif_time) #7.825609922409058
程序池同步 把程序池 變序列執行的方式
import time from multiprocessing import Process,Pool def f1(n): time.sleep(1) # print(n) return n*n if __name__ == '__main__': pool = Pool(4) for i in range(10): print('xxxxxx') res = pool.apply(f1,args=(i,)) print(res) ############ xxxxxx 0 xxxxxx 1 xxxxxx 4 xxxxxx 9 xxxxxx 16
程序池 非同步執行
import time from multiprocessing import Pool def f1(i): time.sleep(1) # print(i) return i if __name__ == '__main__': p=Pool(4) res_lst=[]#迴圈 for i in range(10): # print('xxxx') res=p.apply_async(f1,args=(i,)) # print(res)#10個空結果物件 res_lst.append(res)#把結果值列印大記憶體中 #主程序執行結果,程序池裡面的任務全部停止,不會等待程序池裡面的任務 # p.close()#鎖住程序池,意思就是不讓其他的程式再往這個程序池裡面提交任務了 # p.join()#它的作用是等到程序池的任務全部執行完,然後結束主程序 #預設會 4個一列印 有了這2個引數後就會 一次性獲取出來 for r in res_lst: print(r.get()) print('等待所有任務執行完')
#########################################
D:\python_work_s18\venv\Scripts\python.exe D:/python_work_s18/day31/1111.py
0
1
2
3
--
4
5
6
7
---
8
9
等待所有任務執行完
回撥函式:
Apply_async(f1,args=(i,),callback=function) #將前面f1這個任務的返回結果作為引數傳給callback指定的那個function函式
import os from multiprocessing import Pool,Process def f1(n): print('程序池裡面的程序id',os.getpid()) print('>>>>',n) return n*n def call_back_func(asdf): print('>>>>>>>',os.getpid()) print('回撥函式中的結果:', asdf) if __name__ == '__main__': pool=Pool(4) res = pool.apply_async(f1,args=(5,),callback=call_back_func) pool.close() pool.join() print('主程序id',os.getpid()) ################################ 程序池裡面的程序id 7872 >>>> 5 >>>>>>> 8640 回撥函式中的結果: 25 主程序id 8640