第三十二天- 管道 程序池
1.管道
程序間通訊(IPC)方式二:管道(不推薦使用,瞭解即可),埠易導致資料不安全的情況出現。
1 from multiprocessing import Pipe,Process 2 3 4 def func(conn1,conn2): 5 msg = conn1.recv() # 接收了conn2傳遞的 6 # msg1 = conn2.recv() # 接收了conn1傳遞的 7 print('>>>',msg) 8 # print('>>>',msg1) 9 1011 if __name__ == '__main__': 12 # 拿到管道的兩端,雙工通訊方式,兩端都可以收發訊息 13 conn1,conn2 = Pipe() # 必須在Process之前產生管道 14 p = Process(target=func,args=(conn1,conn2,)) # 管道給子程序 15 p.start() 16 conn1.send('hello') 17 conn1.close() 18 conn2.send('小子') 19 conn2.close() 20 21 print('程序結束') 22 23 # 注意管道不用了就關閉防止異常
2.共享資料
程序之間資料共享的模組之一Manager模組(少用):
程序間資料是獨立的,可以藉助於佇列或管道實現通訊,二者都是基於訊息傳遞的雖然程序間資料獨立,但可以通過Manager實現資料共享:
1 from multiprocessing import Manager,Process,Lock 2 3 4 def func1(dic,loc): 5 # loc.acquire() # 不加鎖易出錯 6 dic['num基於Manager的資料共享'] -= 1 7 # loc.release() 8 9 10 if __name__ == '__main__': 11 m = Manager() 12 loc = Lock() 13 dic = m.dict({'num':100}) 14 p_list = [] 15 for i in range(100): 16 p = Process(target=func1, args=(dic,loc)) 17 p_list.append(p) 18 p.start() 19 20 [pp.join() for pp in p_list] 21 22 print('>>>>>',dic['num']) 23 # 共享時不加鎖,很可能導致同一個資料被多個子程序取用,資料是不安全的,且超多程序消耗大量資源易導致卡死.
多程序共同去處理共享資料的時候,就和我們多程序同時去操作一個檔案中的資料是一樣的,不加鎖就會出現錯誤的結果,程序不安全的,所以也需要加鎖。
總結:程序間應該儘量避免通訊,即便需要通訊,也應該選擇程序安全的工具來避免加鎖帶來的問題。
3.程序池 Pool
建立程序需要消耗時間,銷燬程序(空間,變數,檔案資訊等等的內容)也需要消耗時間。開啟成千上萬的程序,作業系統無法讓他們同時執行,維護一個很大的程序列表的同時,排程的時候,還需要進行切換並且記錄每個程序的執行節點,也就是記錄上下文(各種變數等等亂七八糟的東西,雖然你看不到,但是作業系統都要做),這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序,這就需要用到程序池:
定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果。
建立方法:
Pool([numprocess [,initializer [, initargs]]]):建立程序池
引數介紹:
1 numprocess:要建立的程序數,如果省略,將預設使用cpu_count()的值 2 initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為None 3 initargs:是要傳給initializer的引數組
常用方法:
p.apply(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。 '''需要強調的是:此操作並不會在所有池工作程序中並執行func函式。如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p.apply()函式或者使用p.apply_async()''' p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後返回結果。 '''此方法的結果是AsyncResult類的例項,callback是可呼叫物件,接收輸入引數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他非同步操作中的結果。''' p.close():關閉程序池,防止進一步操作。如果所有操作持續掛起,它們將在工作程序終止前完成 P.jion():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫 主要方法介紹
1 import time 2 from multiprocessing import Process,Pool 3 4 5 def func1(i): 6 num = 0 7 for j in range(5): 8 num += i 9 10 11 if __name__ == '__main__': 12 pool = Pool(6) # 建立程序池 13 14 p_list = [] 15 start_time = time.time() 16 for i in range(500): 17 p = Process(target=func1,args=(i,)) 18 p_list.append(p) 19 p.start() 20 21 [pp.join() for pp in p_list] 22 end_time = time.time() 23 print('耗時:',end_time-start_time) 24 25 s_time = time.time() 26 pool.map(func1,range(500)) # map 27 e_time = time.time() 28 print('耗時:',e_time - s_time) # 耗時遠遠小於直接開500程序程序池 簡單應用
apply同步方法:
1 from multiprocessing import Process,Pool 2 import time 3 4 5 def func1(i): 6 num = 0 7 for j in range(3): 8 num += i 9 time.sleep(1) 10 print(num) 11 return num 12 13 14 if __name__ == '__main__': 15 pool = Pool(6) 16 17 for i in range(10): 18 res = pool.apply(func1,args=(i,)) # apply 程序同步/序列方法 效率低,不常用 19 # print(res)apply 程序同步/序列方法
apply_async非同步方法:
1 from multiprocessing import Process,Pool 2 import time 3 4 5 def func1(i): 6 num = 0 7 for j in range(5): 8 num += i 9 time.sleep(1) 10 # print('>>>>>',num) 11 return num 12 13 14 if __name__ == '__main__': 15 pool = Pool(6) 16 17 red_list = [] 18 for i in range(10): 19 res = pool.apply_async(func1,args=(i,)) 20 red_list.append(res) 21 22 pool.close() # 不是關閉,只是鎖定程序池,告訴主程序不會再新增資料進去 23 pool.join() # 等待子程式執行完 24 25 for ress in red_list: 26 print(ress.get()) # get方法取出返回值num 按新增順序取出已儲存在快取區的結果 所以是順序打印出的View Code
回撥函式:運用時注意一點,回撥函式的形參執行有一個,如果你的執行函式有多個返回值,那麼也可以被回撥函式的這一個形參接收,接收的是一個元祖,包含著你執行函式的所有返回值。
1 from multiprocessing import Pool,Process 2 import time,os 3 4 5 def func1(n): 6 # print('子程序的pid:',os.getpid()) 7 return n*n 8 9 10 def func2(i): 11 res = i**2 12 # print('callback的pid:',os.getpid()) 13 print(res) 14 return res 15 16 17 if __name__ == '__main__': 18 pool = Pool(4) 19 pool.apply_async(func1,args=(3,),callback=func2) # callback把前面的返回值作引數傳給後面 20 # print('主程序的pid:',os.getpid()) # 主程序執行了callback 21 pool.close() 22 pool.join()回撥函式 callback
4.總結
程序之間的通訊:佇列、管道、資料共享也算
訊號量和事件也相當於鎖,也是全域性的,所有程序都能拿到這些鎖的狀態,程序之間這些鎖啊訊號量啊事件啊等等的通訊,其實底層還是socekt,只不過是基於檔案的socket通訊,而不是跟上面的資料共享啊空間共享啊之類的機制,我們之前學的是基於網路的socket通訊,socket的兩個家族,一個檔案的一個網路的,所以如果說這些鎖之類的報錯,可能你看到的就是類似於socket的錯誤。工作中常用的是鎖,訊號量和事件不常用,但是訊號量和事件面試的時候會問到(做了解)