1. 程式人生 > >45_並發編程-進程池

45_並發編程-進程池

print mil call 沒有 def 感知 空閑 lose none

一、為什麽引入進程池 在程序實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能只有零星任務。那麽在成千上萬個任務需要被執行的時候,我們就需要去創建成千上萬個進程麽?首先,創建進程需要消耗時間,銷毀進程(空間,變量,文件信息等等的內容)也需要消耗時間。第二即便開啟了成千上萬的進程,操作系統也不能讓他們同時執行,維護一個很大的進程列表的同時,調度的時候,還需要進行切換並且記錄每個進程的執行節點,也就是記錄上下文(各種變量等等亂七八糟的東西,雖然你看不到,但是操作系統都要做),這樣反而會影響程序的效率。因此我們不能無限制的根據任務開啟或者結束進程。就看我們上面的一些代碼例子,你會發現有些程序是不是執行的時候比較慢才出結果,就是這個原因,那麽我們要怎麽做呢?
在這裏,要給大家介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等到處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。如果有很多任務需要執行,池中的進程數量不夠,任務就要等待之前的進程執行任務完畢歸來,拿到空閑進程才能繼續執行。也就是說,池中進程的數量是固定的,那麽同一時間最多有固定數量的進程在運行。這樣不會增加操作系統的調度難度,還節省了開閉進程的時間,也一定程度上能夠實現並發效果。 二、multiprocessing 引入Pool 模塊
1 1、語法格式:Pool([numprocess [,initializer [, initargs]]]):創建進程池
2 numprocess:要創建的進程數,如果省略,將默認使用cpu_count()的值 3 initializer:是每個工作進程啟動時要執行的可調用對象,默認為None 4 initargs:是要傳給initializer的參數組
創建進程池的類:如果指定numprocess為3,則進程池會從無到有創建三個進程,然後自始至終使用這三個進程去執行所有任務(高級一些的進程池可以根據你的並發量,搞成動態增加或減少進程池中的進程數量的操作),不會開啟其他進程,提高操作系統效率,減少空間的占用等。numprocess不行會選擇cpu分配,1 -> processes = os.cpu_count() or 1
2、主要方法
 1 <1> map()  - 並且自帶close和join,一般約定俗成的是進程池中的進程數量為CPU的數量,工作中要看具體情況來考量。
 2 
 3 # 進程池與進程的時間比較
 4 
 5 import time
 6 from multiprocessing import Process, Pool
 7 
 8 def func(n):
 9     print(n)
10 
11 if __name__ == __main__:
12 
13     # 進程池運行時間
14     time_pool_start = time.time()
15     pool =Pool(4)   # 進程池中只放4個進程
16     pool.map(func, range(100))      # 使用map方法實現,map實現的是異步的。map參數可叠代 range(100),把可叠代的元素放入func中執行
17     time_pool_end = time.time()
18     p_dif = time_pool_end - time_pool_start
19 
20     # 進程運行時間
21     time_pro_start = time.time()
22     lst = []
23     for i in range(100):
24         p = Process(target=func, args=(i,))
25         p.start()
26         lst.append(p)
27     [obj.join() for obj in lst]
28     time_pro_end = time.time()
29     p_dif_pro = time_pro_end - time_pro_start
30 
31     print("進程池>>",p_dif)
32     print("進程>>", p_dif_pro)
 1 <2> apply()       -        apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。同步執行,一個一個出,它會等待任務的執行結果
 2                    需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,必須從不同線程調用p.apply()函數或者使用
 3 
 4 import time
 5 from multiprocessing import Process,Pool
 6 
 7 def fun(i):
 8     time.sleep(0.5)
 9     # print(i)
10     return i**2
11 
12 if __name__ == __main__:
13     p = Pool(4)
14     for i in range(10):
15         res = p.apply(fun,args=(i,))  #同步執行的方法,他會等待你的任務的返回結果,
16         print(res)
 1 <3> apply_async()    -   apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果; 異步的
 2                         此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。
 3 
 4 import time
 5 from multiprocessing import Pool
 6 
 7 def func(n):
 8     time.sleep(0.5)
 9     return n**2
10 
11 if __name__ == __main__:
12 
13     p = Pool(4)
14     lst = []    # 設置一個空列表
15     for i in range(10):
16         ret = p.apply_async(func, args=(i,))  # 異步步執行,獲取返回值
17         lst.append(ret)     # 異步的,返回值放一個列表
18     time.sleep(1)
19     for el in lst:  # 循環列表
20         print(el.get()) # 獲取異步結果,因為get()一次只能取一個值,使用進程池取數據,每4個一組一打
21 
22 ----------------------------------------------------------------------------------------
23 import time
24 from multiprocessing import Pool
25 
26 def func(n):
27     time.sleep(0.5)
28     print(n)
29     return n**2
30 
31 if __name__ == __main__:
32 
33     p = Pool(4)
34     lst = []    # 設置一個空列表
35     for i in range(10):
36         ret = p.apply_async(func, args=(i,))  # 異步步執行,獲取返回值,4個一組
37         lst.append(ret)     # 異步的每4個放一個列表
38 
39     p.close() # 不是關閉進程池,而是不允許再有其他任務來使用進程池
40     p.join()    # 這是感知進程池中任務的方法,等待進程池的任務全部執行完
41 
42     for el in lst:  # 循環列表,這樣把進程池關上,不允許別的任務用,就會全部打印出來
43         print(el.get()) # 獲取異步結果,因為get()一次只能取一個值
44     print(主進程)
3、回調函數 - callback 需要回調函數的場景:進程池中任何一個任務一旦處理完了,就立即告知主進程:我好了額,你可以處理我的結果了。主進程則調用一個函數去處理該結果,該函數即回調函數,這是進程池特有的,普通進程沒有這個機制,但是我們也可以通過進程通信來拿到返回值,進程池的這個回調也是進程通信的機制完成的。 我們可以把耗時間(阻塞)的任務放到進程池中,然後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果
 1 import os
 2 from multiprocessing import Pool
 3 
 4 def func1(n):
 5     print(func1>>,os.getpid())
 6     # print(‘func1‘)
 7     return n*n
 8 
 9 
10 def func2(nn):
11     print(func2>>,os.getpid())
12     # print(‘func2‘)
13     print(nn)
14     # import time
15     # time.sleep(0.5)
16 if __name__ == __main__:
17     print(主進程:,os.getpid())
18     p = Pool(4)
19     p.apply_async(func1,args=(10,),callback=func2)    # func1->子進程運行; func2->主進程運行,把func1返回的值給func2去執行其他操作
20     p.close()
21     p.join()

45_並發編程-進程池