程序與執行緒(2)- python實現多程序
python 實現多程序
參考連結: https://morvanzhou.github.io/tutorials/python-basic/multiprocessing/
python中實現多程序的模組:multiprocessing
注意:在windows系統下,要想啟動一個子程序,必須把程序相關的內容寫在”if __name__ == “__main__” ”,這句話下面。
具體實現模組
1、Process模組
-
實現功能:
建立子程序
-
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 執行緒組,目前還沒有實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 程序名;
args/kwargs: 要傳入方法的引數。
-
例項方法:
is_alive():返回程序是否在執行。
join([timeout]):阻塞當前上下文環境的程序程,直到呼叫此方法的程序終止或到達指定的timeout(可選引數)。
start():程序準備就緒,等待CPU排程。
run():strat()呼叫run方法,如果例項程序時未制定傳入target,這star執行t預設run()方法。
terminate():不管任務是否完成,立即停止工作程序。
-
屬性:
authkey
daemon:和執行緒的setDeamon功能一樣(將父程序設定為守護程序,當父程序結束時,子程序也結束)。
exitcode(程序在執行時為None、如果為–N,表示被訊號N結束)。
name:程序名字。
pid:程序號。
-
例子:
1 import multiprocessing 2 3 def job(a,d): 4 print('aaaaa') 5 6 if __name__ == “__main__”: 7 p1 = multiprocessing.Process(target=job,args=(1,2)) 8 p1.start()9 p1.join()
2、Pool模組
-
實現功能:
建立管理程序池。提供指定數量的程序供使用者呼叫,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序來執行它。在共享資源時,只能使用Multiprocessing.Manager類,而不能使用Queue或者Array。
-
構造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes :使用的工作程序的數量,如果processes是None那麼使用 os.cpu_count()返回的數量。(Pool預設大小是CPU的核數,我們也可以通過在Pool中傳入processes引數即可自定義需要的核數量)
initializer: 如果initializer是None,那麼每一個工作程序在開始的時候會呼叫initializer(*initargs)。
maxtasksperchild:工作程序退出之前可以完成的任務數,完成後用一個新的工作程序來替代原程序,來讓閒置的資源被釋放。maxtasksperchild預設是None,意味著只要Pool存在工作程序就會一直存活。
context: 用在制定工作程序啟動時的上下文,一般使用 multiprocessing.Pool() 或者一個context物件的Pool()方法來建立一個池,兩種方法都適當的設定了context。
-
例項方法:
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞。
apply(func[, args[, kwds]])是阻塞的。
close() 關閉pool,使其不在接受新的任務。
terminate() 關閉pool,結束工作程序,不在處理未完成的任務。
join() 主程序阻塞,等待子程序的退出, join方法要在close或terminate之後使用。
-
Pool使用方法:
1、Pool+map函式
- 用map()獲取結果,在map()中需要放入函式和需要迭代運算的值,然後它會自動分配給CPU核,返回結果。
- 說明:此寫法缺點在於只能通過map向函式傳遞一個引數。
1 from multiprocessing import Pool 2 3 def test(i): 4 print i 5 6 if __name__=="__main__": 7 lists=[1,2,3] 8 pool=Pool(processes=2) #定義最大的程序數 9 pool.map(test,lists) #lists必須是一個可迭代變數。 10 pool.close() 11 pool.join()
2、非同步程序池(非阻塞)
- apply_async()只能放入一組引數,並返回一個結果,如果想得到map()的效果需要通過迭代。
1 from multiprocessing import Pool 2 3 def test(i): 4 print i 5 6 if __name__=="__main__": 7 pool = Pool(processes=10) 8 for i in xrange(500): 9 ''' 10 For迴圈中執行步驟: 11 (1)迴圈遍歷,將500個子程序新增到程序池(相對父程序會阻塞) 12 (2)每次執行10個子程序,等一個子程序執行完後,立馬啟動新的子程序。(相對父程序不阻塞) 13 apply_async為非同步程序池寫法。 14 非同步指的是啟動子程序的過程,與父程序本身的執行(print)是非同步的,而For迴圈中往程序池新增子程序的過程,與父程序本身的執行卻是同步的。 15 ''' 16 pool.apply_async(test, args=(i,)) #維持執行的程序總數為10,當一個程序執行完後啟動一個新程序. 17 print“test” 18 pool.close() 19 pool.join()
程式碼說明:
執行順序:For迴圈內執行了2個步驟,第一步:將500個物件放入程序池(阻塞)。第二步:同時執行10個子程序(非阻塞),有結束的就立即新增,維持10個子程序執行。(apply_async方法的會在執行完for迴圈的新增步驟後,直接執行後面的print語句,而apply方法會等所有程序池中的子程序執行完以後再執行後面的print語句)
注意:呼叫join之前,先呼叫close或者terminate方法,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束。
3、同步程序池(阻塞)
1 from multiprocessing import Pool 2 3 def test(p): 4 print p 5 time.sleep(3) 6 7 if __name__=="__main__": 8 pool = Pool(processes=10) 9 for i in xrange(500): 10 ''' 11 實際測試發現,for迴圈內部執行步驟: 12 (1)遍歷500個可迭代物件,往程序池放一個子程序 13 (2)執行這個子程序,等子程序執行完畢,再往程序池放一個子程序,再執行。(同時只執行一個子程序) 14 for迴圈執行完畢,再執行print函式。 15 ''' 16 pool.apply(test, args=(i,)) #維持執行的程序總數為10,當一個程序執行完後啟動一個新程序. 17 print“test” 18 pool.close() 19 pool.join()
程式碼說明:
for迴圈內執行的步驟順序,往程序池中新增一個子程序,執行子程序,等待執行完畢再新增一個子程序…..等500個子程序都執行完了,再執行print “test”。(從結果來看,並沒有多程序併發
3、Queue模組
-
實現功能:
將每個核或執行緒的運算結果放在佇列中,等到每個執行緒或核執行完畢後再從佇列中取出結果, 繼續載入運算。原因很簡單, 多執行緒呼叫的函式不能有返回值, 所以使用Queue儲存多個執行緒運算的結果。
-
例子:
1 import multiprocessing as mp 2 3 def job(q): 4 res=0 5 for i in range(1000): 6 res+=i+i**2+i**3 7 q.put(res) #queue 8 9 if __name__=='__main__': 10 q = mp.Queue() 11 p1 = mp.Process(target=job,args=(q,)) 12 p2 = mp.Process(target=job,args=(q,)) 13 p1.start() 14 p2.start() 15 p1.join() 16 p2.join() 17 res1 = q.get() 18 res2 = q.get() 19 print(res1+res2)
4、Pipe模組
-
實現功能:
用來管道操作。
5、Manager模組
-
實現功能:
Manager模組常與Pool模組一起使用,作用是共享資源。
6、Lock模組(程序鎖)
-
實現功能:
當多個程序需要訪問共享資源的時候,Lock可以用來避免訪問的衝突。
-
實現步驟:
1)首先需要定義一個程序鎖
l = multiprocessing.Lock() # 定義一個程序鎖
2)然後將程序鎖的資訊傳入各個程序中
p1 = multiprocessing.Process(target=job, args=(v,1,l)) # 需要將Lock傳入
p2 = multiprocessing.Process(target=job, args=(v,3,l))
3)在job()中設定程序鎖的使用,保證執行時一個程序的對鎖內內容的獨佔
-
例子:
1 import multiprocessing as mp 2 3 def job(v, num, l): 4 l.acquire() # 鎖住 5 for _ in range(5): 6 time.sleep(0.1) 7 v.value += num # 獲取共享記憶體 8 print(v.value) 9 l.release() # 釋放 10 11 def multicore(): 12 l = mp.Lock() # 定義一個程序鎖 13 v = mp.Value('i', 0) # 定義共享記憶體 14 p1 = mp.Process(target=job, args=(v,1,l)) # 需要將lock傳入 15 p2 = mp.Process(target=job, args=(v,3,l)) 16 p1.start() 17 p2.start() 18 p1.join() 19 p2.join() 20 21 if __name__ == '__main__': 22 multicore()
共享記憶體
Multiprocessing類中共享資源可以使用3種方式,分別是Queue,Array,Manager。
1、Queue類
使用Multiprocessing.Queue類,共享資源(share memory)(只適用Process類,不能再Pool程序池中使用)
1 from multiprocessing import Process, Queue 2 3 def test(queue): 4 queue.put("Hello World") 5 6 if __name__ == '__main__': 7 q = Queue() 8 p = Process(target=test, args=(q,)) #需要將q物件傳遞給子程序 9 p.start() 10 Print q.get()
2、Array、Value類
使用Multiprocessing.Array類,共享資源(share memory)(只適用於Process類,無法與Pool一起使用)
1 from multiprocessing import Process, Array 2 3 def test(a): 4 for i in range(len(a)): 5 a[i] = -a[i] 6 7 if__name__ == '__main__': 8 arr = Array('i', range(10)) 9 p = Process(target=test, args=(arr)) #需要將arr物件傳遞給子程序 10 p.start() 11 p.join() 12 print arr[:]
-
單值:Value
我們可以通過使用Value資料儲存在一個共享的記憶體表中。
1 import multiprocessing as mp 2 value1 = mp.Value('i', 0) 3 value2 = mp.Value('d', 3.14) 4 5 # 其中d和i引數用來設定資料型別的,d表示一個雙精浮點型別,i表示一個帶符號的整型。
-
列表:Array
在Python的mutiprocessing中,有還有一個Array類,可以和共享記憶體互動,來實現在程序之間共享資料。
1 array = mp.Array('i', [1, 2, 3, 4]) 2 3 #這裡的Array和numpy中的不同,它只能是一維的,不能是多維的。同樣和Value 一樣,需要定義資料形式,否則會報錯。
3、Manager類
使用Multiprocessing.Manager類,共享資源。(可以適用Pool類)
例項目的:父程序在執行子程序的過程中,同步判斷一個公共資源值,如果滿足條件則結束所有程序。
1 from multiprocessing import Manager 2 3 def test(i,lists): 4 print i 5 lists.append(i) 6 7 if __name__=="__main__": 8 pool=Pool() 9 lists=Manager().list() #Manager類例項化程式碼只能寫在main()函式裡面 10 for i in xrange(10000000): 11 if len(lists)<=0: 12 ''' 13 在建立子程序時,需要將lists物件傳入,不然無法共享。 14 ''' 15 pool.apply_async(test,args=(i,lists))##需要將lists物件傳遞給子程序,這裡比較耗資源,原因可能是因為Manager類是基於通訊的。 16 else: 17 break
- 父程序中的全域性變數能被子程序共享嗎?
解答:不行,因為每個程序享有獨立的記憶體資料,如果想要共享資源,可以使用Manage類,或者Queue等模組。