45_並發編程-進程池
阿新 • • 發佈:2018-10-25
print mil call 沒有 def 感知 空閑 lose none 一、為什麽引入進程池
在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那麽在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程麽?首先,創建進程需要消耗時間,銷毀進程(空間,變量,文件信息等等的內容)也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,維護一個很大的進程列表的同時,調度的時候,還需要進行切換並且記錄每個進程的執行節點,也就是記錄上下文(各種變量等等亂七八糟的東西,雖然你看不到,但是操作系統都要做),這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。就看我們上面的一些代碼例子,你會發現有些程序是不是執行的時候比較慢才出結果,就是這個原因,那麽我們要怎麽做呢?
在這裏,要給大家介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那麽同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果。
二、multiprocessing 引入Pool 模塊
2、主要方法
1 1、語法格式:Pool([numprocess [,initializer [, initargs]]]):創建進程池創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然後自始至終使用這三個進程去執行所有任務(高級一些的進程池可以根據你的並發量,搞成動態增加或減少進程池中的進程數量的操作),不會開啟其他進程,提高操作系統效率,減少空間的占用等。numprocess不行會選擇cpu分配,1 -> processes = os.cpu_count() or 12 numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值 3 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None 4 initargs:是要傳給initializer的參數組
1 <1> map() - 並且自帶close和join,一般約定俗成的是進程池中的進程數量為CPU的數量,工作中要看具體情況來考量。 2 3 # 進程池與進程的時間比較 4 5 import time 6 from multiprocessing import Process, Pool 7 8 def func(n): 9 print(n) 10 11 if __name__ == ‘__main__‘: 12 13 # 進程池運行時間 14 time_pool_start = time.time() 15 pool =Pool(4) # 進程池中只放4個進程 16 pool.map(func, range(100)) # 使用map方法實現,map實現的是異步的。map參數可叠代 range(100),把可叠代的元素放入func中執行 17 time_pool_end = time.time() 18 p_dif = time_pool_end - time_pool_start 19 20 # 進程運行時間 21 time_pro_start = time.time() 22 lst = [] 23 for i in range(100): 24 p = Process(target=func, args=(i,)) 25 p.start() 26 lst.append(p) 27 [obj.join() for obj in lst] 28 time_pro_end = time.time() 29 p_dif_pro = time_pro_end - time_pro_start 30 31 print("進程池>>",p_dif) 32 print("進程>>", p_dif_pro)
1 <2> apply() - apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。同步執行,一個一個出,它會等待任務的執行結果 2 需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,必須從不同線程調用p.apply()函數或者使用 3 4 import time 5 from multiprocessing import Process,Pool 6 7 def fun(i): 8 time.sleep(0.5) 9 # print(i) 10 return i**2 11 12 if __name__ == ‘__main__‘: 13 p = Pool(4) 14 for i in range(10): 15 res = p.apply(fun,args=(i,)) #同步執行的方法,他會等待你的任務的返回結果, 16 print(res)
1 <3> apply_async() - apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果; 異步的 2 此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。 3 4 import time 5 from multiprocessing import Pool 6 7 def func(n): 8 time.sleep(0.5) 9 return n**2 10 11 if __name__ == ‘__main__‘: 12 13 p = Pool(4) 14 lst = [] # 設置一個空列表 15 for i in range(10): 16 ret = p.apply_async(func, args=(i,)) # 異步步執行,獲取返回值 17 lst.append(ret) # 異步的,返回值放一個列表 18 time.sleep(1) 19 for el in lst: # 循環列表 20 print(el.get()) # 獲取異步結果,因為get()一次只能取一個值,使用進程池取數據,每4個一組一打 21 22 ---------------------------------------------------------------------------------------- 23 import time 24 from multiprocessing import Pool 25 26 def func(n): 27 time.sleep(0.5) 28 print(n) 29 return n**2 30 31 if __name__ == ‘__main__‘: 32 33 p = Pool(4) 34 lst = [] # 設置一個空列表 35 for i in range(10): 36 ret = p.apply_async(func, args=(i,)) # 異步步執行,獲取返回值,4個一組 37 lst.append(ret) # 異步的每4個放一個列表 38 39 p.close() # 不是關閉進程池,而是不允許再有其他任務來使用進程池 40 p.join() # 這是感知進程池中任務的方法,等待進程池的任務全部執行完 41 42 for el in lst: # 循環列表,這樣把進程池關上,不允許別的任務用,就會全部打印出來 43 print(el.get()) # 獲取異步結果,因為get()一次只能取一個值 44 print(‘主進程‘)3、回調函數 - callback 需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數,這是進程池特有的,普通進程沒有這個機制,但是我們也可以通過進程通信來拿到返回值,進程池的這個回調也是進程通信的機制完成的。 我們可以把耗時間(阻塞)的任務放到進程池中,然後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果
1 import os 2 from multiprocessing import Pool 3 4 def func1(n): 5 print(‘func1>>‘,os.getpid()) 6 # print(‘func1‘) 7 return n*n 8 9 10 def func2(nn): 11 print(‘func2>>‘,os.getpid()) 12 # print(‘func2‘) 13 print(nn) 14 # import time 15 # time.sleep(0.5) 16 if __name__ == ‘__main__‘: 17 print(‘主進程:‘,os.getpid()) 18 p = Pool(4) 19 p.apply_async(func1,args=(10,),callback=func2) # func1->子進程運行; func2->主進程運行,把func1返回的值給func2去執行其他操作 20 p.close() 21 p.join()
45_並發編程-進程池