1. 程式人生 > >程序與執行緒(2)- python實現多程序

程序與執行緒(2)- python實現多程序

python 實現多程序


 參考連結: https://morvanzhou.github.io/tutorials/python-basic/multiprocessing/


python中實現多程序的模組:multiprocessing

注意:在windows系統下,要想啟動一個子程序,必須把程序相關的內容寫在”if __name__ == “__main__” ”,這句話下面。


具體實現模組

1、Process模組

  • 實現功能:

建立子程序

  • 構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])


group: 執行緒組,目前還沒有實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 程序名;
args/kwargs: 要傳入方法的引數。

  • 例項方法:

is_alive():返回程序是否在執行。
join([timeout]):阻塞當前上下文環境的程序程,直到呼叫此方法的程序終止或到達指定的timeout(可選引數)。
start():程序準備就緒,等待CPU排程。
run():strat()呼叫run方法,如果例項程序時未制定傳入target,這star執行t預設run()方法。
terminate():不管任務是否完成,立即停止工作程序。

  • 屬性:

authkey
daemon:和執行緒的setDeamon功能一樣(將父程序設定為守護程序,當父程序結束時,子程序也結束)。
exitcode(程序在執行時為None、如果為–N,表示被訊號N結束)。
name:程序名字。
pid:程序號。

  • 例子:

1 import multiprocessing
2 
3 def job(a,d):
4   print('aaaaa')
5 
6 if __name__ == “__main__”:
7   p1 = multiprocessing.Process(target=job,args=(1,2))
8   p1.start()
9   p1.join()

 

2、Pool模組

  • 實現功能:

建立管理程序池。提供指定數量的程序供使用者呼叫,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序來執行它。在共享資源時,只能使用Multiprocessing.Manager類,而不能使用Queue或者Array。

  • 構造方法:

Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

processes :使用的工作程序的數量,如果processes是None那麼使用 os.cpu_count()返回的數量。(Pool預設大小是CPU的核數,我們也可以通過在Pool中傳入processes引數即可自定義需要的核數量)
initializer: 如果initializer是None,那麼每一個工作程序在開始的時候會呼叫initializer(*initargs)。
maxtasksperchild:工作程序退出之前可以完成的任務數,完成後用一個新的工作程序來替代原程序,來讓閒置的資源被釋放。maxtasksperchild預設是None,意味著只要Pool存在工作程序就會一直存活。
context: 用在制定工作程序啟動時的上下文,一般使用 multiprocessing.Pool() 或者一個context物件的Pool()方法來建立一個池,兩種方法都適當的設定了context。

  • 例項方法:

apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞。
apply(func[, args[, kwds]])是阻塞的。
close() 關閉pool,使其不在接受新的任務。
terminate() 關閉pool,結束工作程序,不在處理未完成的任務。
join() 主程序阻塞,等待子程序的退出, join方法要在close或terminate之後使用。

  • Pool使用方法:

1、Pool+map函式

  • 用map()獲取結果,在map()中需要放入函式和需要迭代運算的值,然後它會自動分配給CPU核,返回結果。
  • 說明:此寫法缺點在於只能通過map向函式傳遞一個引數。
 1 from multiprocessing import Pool
 2 
 3 def test(i):
 4     print i
 5 
 6 if __name__=="__main__":
 7     lists=[1,2,3]
 8     pool=Pool(processes=2)   #定義最大的程序數
 9     pool.map(test,lists)   #lists必須是一個可迭代變數。
10     pool.close()
11     pool.join()

2、非同步程序池(非阻塞)

  • apply_async()只能放入一組引數,並返回一個結果,如果想得到map()的效果需要通過迭代。
 1 from multiprocessing import Pool
 2 
 3 def test(i):
 4     print i
 5 
 6 if __name__=="__main__":
 7     pool = Pool(processes=10)
 8     for i in xrange(500):
 9         '''
10          For迴圈中執行步驟:
11         (1)迴圈遍歷,將500個子程序新增到程序池(相對父程序會阻塞)
12         (2)每次執行10個子程序,等一個子程序執行完後,立馬啟動新的子程序。(相對父程序不阻塞)
13         apply_async為非同步程序池寫法。
14         非同步指的是啟動子程序的過程,與父程序本身的執行(print)是非同步的,而For迴圈中往程序池新增子程序的過程,與父程序本身的執行卻是同步的。
15         '''
16         pool.apply_async(test, args=(i,)) #維持執行的程序總數為10,當一個程序執行完後啟動一個新程序. 
17     print“test”
18     pool.close()
19     pool.join()

程式碼說明:

執行順序:For迴圈內執行了2個步驟,第一步:將500個物件放入程序池(阻塞)。第二步:同時執行10個子程序(非阻塞),有結束的就立即新增,維持10個子程序執行。(apply_async方法的會在執行完for迴圈的新增步驟後,直接執行後面的print語句,而apply方法會等所有程序池中的子程序執行完以後再執行後面的print語句)

注意:呼叫join之前,先呼叫close或者terminate方法,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束。

3、同步程序池(阻塞)

 1 from multiprocessing import Pool
 2 
 3 def test(p):
 4     print p
 5     time.sleep(3)
 6 
 7 if __name__=="__main__":
 8     pool = Pool(processes=10)
 9     for i in xrange(500):
10         '''
11         實際測試發現,for迴圈內部執行步驟:
12         (1)遍歷500個可迭代物件,往程序池放一個子程序
13         (2)執行這個子程序,等子程序執行完畢,再往程序池放一個子程序,再執行。(同時只執行一個子程序)
14         for迴圈執行完畢,再執行print函式。
15         '''
16         pool.apply(test, args=(i,)) #維持執行的程序總數為10,當一個程序執行完後啟動一個新程序.
17     print“test”
18     pool.close()
19     pool.join()

程式碼說明:

for迴圈內執行的步驟順序,往程序池中新增一個子程序,執行子程序,等待執行完畢再新增一個子程序…..等500個子程序都執行完了,再執行print “test”。(從結果來看,並沒有多程序併發

 

3、Queue模組

  • 實現功能:

將每個核或執行緒的運算結果放在佇列中,等到每個執行緒或核執行完畢後再從佇列中取出結果, 繼續載入運算。原因很簡單, 多執行緒呼叫的函式不能有返回值, 所以使用Queue儲存多個執行緒運算的結果

  • 例子:

 1 import multiprocessing as mp
 2 
 3 def job(q):
 4     res=0
 5     for i in range(1000):
 6         res+=i+i**2+i**3
 7     q.put(res) #queue
 8 
 9 if __name__=='__main__':
10     q = mp.Queue()
11     p1 = mp.Process(target=job,args=(q,))
12     p2 = mp.Process(target=job,args=(q,))
13     p1.start()
14     p2.start()
15     p1.join()
16     p2.join()
17     res1 = q.get()
18     res2 = q.get()
19     print(res1+res2)

 

4、Pipe模組

  • 實現功能:

用來管道操作。


5、Manager模組

  • 實現功能:

Manager模組常與Pool模組一起使用,作用是共享資源。

 

6、Lock模組(程序鎖)

  • 實現功能:

當多個程序需要訪問共享資源的時候,Lock可以用來避免訪問的衝突。

  • 實現步驟:

1)首先需要定義一個程序鎖

l = multiprocessing.Lock()    # 定義一個程序鎖

2)然後將程序鎖的資訊傳入各個程序中

p1 = multiprocessing.Process(target=job, args=(v,1,l))    # 需要將Lock傳入
p2 = multiprocessing.Process(target=job, args=(v,3,l))

3)在job()中設定程序鎖的使用,保證執行時一個程序的對鎖內內容的獨佔

  • 例子:

 1 import multiprocessing as mp
 2 
 3 def job(v, num, l):
 4     l.acquire() # 鎖住
 5     for _ in range(5):
 6         time.sleep(0.1) 
 7         v.value += num # 獲取共享記憶體
 8         print(v.value)
 9     l.release() # 釋放
10 
11 def multicore():
12     l = mp.Lock() # 定義一個程序鎖
13     v = mp.Value('i', 0) # 定義共享記憶體
14     p1 = mp.Process(target=job, args=(v,1,l)) # 需要將lock傳入
15     p2 = mp.Process(target=job, args=(v,3,l)) 
16     p1.start()
17     p2.start()
18     p1.join()
19     p2.join()
20 
21 if __name__ == '__main__':
22     multicore()

 

共享記憶體

Multiprocessing類中共享資源可以使用3種方式,分別是Queue,Array,Manager。

1、Queue類

使用Multiprocessing.Queue類,共享資源(share memory)(只適用Process類,不能再Pool程序池中使用)

 1 from multiprocessing import Process, Queue
 2 
 3 def test(queue):
 4     queue.put("Hello World")
 5 
 6 if __name__ == '__main__':
 7     q = Queue()
 8     p = Process(target=test, args=(q,)) #需要將q物件傳遞給子程序
 9     p.start()
10     Print q.get()

2、Array、Value類

使用Multiprocessing.Array類,共享資源(share memory)(只適用於Process類,無法與Pool一起使用)

 1 from multiprocessing import Process, Array
 2 
 3 def test(a):
 4     for i in range(len(a)):
 5         a[i] = -a[i]
 6 
 7 if__name__ == '__main__':
 8     arr = Array('i', range(10))
 9     p = Process(target=test, args=(arr)) #需要將arr物件傳遞給子程序
10     p.start()
11     p.join()
12     print arr[:]
  • 單值:Value

  我們可以通過使用Value資料儲存在一個共享的記憶體表中。

1 import multiprocessing as mp 
2 value1 = mp.Value('i', 0) 
3 value2 = mp.Value('d', 3.14)
4   
5 # 其中d和i引數用來設定資料型別的,d表示一個雙精浮點型別,i表示一個帶符號的整型。
  • 列表:Array

  在Python的mutiprocessing中,有還有一個Array類,可以和共享記憶體互動,來實現在程序之間共享資料。

1 array = mp.Array('i', [1, 2, 3, 4])
2   
3 #這裡的Array和numpy中的不同,它只能是一維的,不能是多維的。同樣和Value 一樣,需要定義資料形式,否則會報錯。

3、Manager類

使用Multiprocessing.Manager類,共享資源。(可以適用Pool類)
例項目的:父程序在執行子程序的過程中,同步判斷一個公共資源值,如果滿足條件則結束所有程序。

 1 from multiprocessing import Manager
 2 
 3 def test(i,lists):
 4     print i
 5     lists.append(i)
 6 
 7 if __name__=="__main__":
 8     pool=Pool()
 9     lists=Manager().list() #Manager類例項化程式碼只能寫在main()函式裡面
10     for i in xrange(10000000):
11         if len(lists)<=0:
12             '''
13             在建立子程序時,需要將lists物件傳入,不然無法共享。
14             '''
15             pool.apply_async(test,args=(i,lists))##需要將lists物件傳遞給子程序,這裡比較耗資源,原因可能是因為Manager類是基於通訊的。
16         else:
17             break

  

  • 父程序中的全域性變數能被子程序共享嗎?

解答:不行,因為每個程序享有獨立的記憶體資料,如果想要共享資源,可以使用Manage類,或者Queue等模組。