Python多程序程式設計
閱讀目錄
序. multiprocessing python中的多執行緒其實並不是真正的多執行緒,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多程序。Python提供了非常好用的多程序包multiprocessing,只需要定義一個函式,Python會完成其他所有事情。藉助這個包,可以輕鬆完成從單程序到併發執行的轉換。multiprocessing支援子程序、通訊和共享資料、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等元件。
1. Process
建立程序的類:Process([group [, target [, name [, args [, kwargs]]]]]),target表示呼叫物件,args表示呼叫物件的位置引數元組。kwargs表示呼叫物件的字典。name為別名。group實質上不使用。方法
屬性:authkey、daemon(要通過start()設定)、exitcode(程序在執行時為None、如果為–N,表示被訊號N結束)、name、pid。其中daemon是父程序終止後自動終止,且自己不能產生新程序,必須在start()之前設定。
例1.1:建立函式並將其作為單個程序
import multiprocessing import time def worker(interval): n = 5 while n > 0: print("The time is {0}".format(time.ctime())) time.sleep(interval) n -= 1 if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.start() print "p.pid:", p.pid print "p.name:", p.name print "p.is_alive:", p.is_alive()
結果
1 2 3 4 5 6 7 8 |
21 20: 55: 24 2015
|
例1.2:建立函式並將其作為多個程序
import multiprocessing import time def worker_1(interval): print "worker_1" time.sleep(interval) print "end worker_1" def worker_2(interval): print "worker_2" time.sleep(interval) print "end worker_2" def worker_3(interval): print "worker_3" time.sleep(interval) print "end worker_3" if __name__ == "__main__": p1 = multiprocessing.Process(target = worker_1, args = (2,)) p2 = multiprocessing.Process(target = worker_2, args = (3,)) p3 = multiprocessing.Process(target = worker_3, args = (4,)) p1.start() p2.start() p3.start() print("The number of CPU is:" + str(multiprocessing.cpu_count())) for p in multiprocessing.active_children(): print("child p.name:" + p.name + "\tp.id" + str(p.pid)) print "END!!!!!!!!!!!!!!!!!"
結果
1 2 3 4 5 6 7 8 9 10 11 |
|
例1.3:將程序定義為類
import multiprocessing import time class ClockProcess(multiprocessing.Process): def __init__(self, interval): multiprocessing.Process.__init__(self) self.interval = interval def run(self): n = 5 while n > 0: print("the time is {0}".format(time.ctime())) time.sleep(self.interval) n -= 1 if __name__ == '__main__': p = ClockProcess(3) p.start()
注:程序p呼叫start()時,自動呼叫run()
結果
1 2 3 4 5 |
|
例1.4:daemon程式對比結果
#1.4-1 不加daemon屬性
import multiprocessing import time def worker(interval): print("work start:{0}".format(time.ctime())); time.sleep(interval) print("work end:{0}".format(time.ctime())); if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.start() print "end!"
結果
1 2 3 |
|
#1.4-2 加上daemon屬性
import multiprocessing import time def worker(interval): print("work start:{0}".format(time.ctime())); time.sleep(interval) print("work end:{0}".format(time.ctime())); if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.daemon = True p.start() print "end!"
結果
1 |
|
注:因子程序設定了daemon屬性,主程序結束,它們就隨著結束了。
#1.4-3 設定daemon執行完結束的方法
import multiprocessing import time def worker(interval): print("work start:{0}".format(time.ctime())); time.sleep(interval) print("work end:{0}".format(time.ctime())); if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.daemon = True p.start() p.join() print "end!"
結果
1 2 3 |
|
2. Lock
當多個程序需要訪問共享資源的時候,Lock可以用來避免訪問的衝突。
import multiprocessing import sys def worker_with(lock, f): with lock: fs = open(f, 'a+') n = 10 while n > 1: fs.write("Lockd acquired via with\n") n -= 1 fs.close() def worker_no_with(lock, f): lock.acquire() try: fs = open(f, 'a+') n = 10 while n > 1: fs.write("Lock acquired directly\n") n -= 1 fs.close() finally: lock.release() if __name__ == "__main__": lock = multiprocessing.Lock() f = "file.txt" w = multiprocessing.Process(target = worker_with, args=(lock, f)) nw = multiprocessing.Process(target = worker_no_with, args=(lock, f)) w.start() nw.start() print "end"
結果(輸出檔案)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
|
3. Semaphore
Semaphore用來控制對共享資源的訪問數量,例如池的最大連線數。
import multiprocessing import time def worker(s, i): s.acquire() print(multiprocessing.current_process().name + "acquire"); time.sleep(i) print(multiprocessing.current_process().name + "release\n"); s.release() if __name__ == "__main__": s = multiprocessing.Semaphore(2) for i in range(5): p = multiprocessing.Process(target = worker, args=(s, i*2)) p.start()
結果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
|
4. Event
Event用來實現程序間同步通訊。
import multiprocessing import time def wait_for_event(e): print("wait_for_event: starting") e.wait() print("wairt_for_event: e.is_set()->" + str(e.is_set())) def wait_for_event_timeout(e, t): print("wait_for_event_timeout:starting") e.wait(t) print("wait_for_event_timeout:e.is_set->" + str(e.is_set())) if __name__ == "__main__": e = multiprocessing.Event() w1 = multiprocessing.Process(name = "block", target = wait_for_event, args = (e,)) w2 = multiprocessing.Process(name = "non-block", target = wait_for_event_timeout, args = (e, 2)) w1.start() w2.start() time.sleep(3) e.set() print("main: event is set")
結果
1 2 3 4 5 |
|
5. Queue
Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。put方法用以插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩餘的空間。如果超時,會丟擲Queue.Full異常。如果blocked為False,但該Queue已滿,會立即丟擲Queue.Full異常。
get方法可以從佇列讀取並且刪除一個元素。同樣,get方法有兩個可選引數:blocked和timeout。如果blocked為True(預設值),並且timeout為正值,那麼在等待時間內沒有取到任何元素,會丟擲Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果佇列為空,則立即丟擲Queue.Empty異常。Queue的一段示例程式碼:
import multiprocessing def writer_proc(q): try: q.put(1, block = False) except: pass def reader_proc(q): try: print q.get(block = False) except: pass if __name__ == "__main__": q = multiprocessing.Queue() writer = multiprocessing.Process(target=writer_proc, args=(q,)) writer.start() reader = multiprocessing.Process(target=reader_proc, args=(q,)) reader.start() reader.join() writer.join()
結果
1 |
|
6. Pipe
Pipe方法返回(conn1, conn2)代表一個管道的兩個端。Pipe方法有duplex引數,如果duplex引數為True(預設值),那麼這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1只負責接受訊息,conn2只負責傳送訊息。
send和recv方法分別是傳送和接受訊息的方法。例如,在全雙工模式下,可以呼叫conn1.send傳送訊息,conn1.recv接收訊息。如果沒有訊息可接收,recv方法會一直阻塞。如果管道已經被關閉,那麼recv方法會丟擲EOFError。
import multiprocessing import time def proc1(pipe): while True: for i in xrange(10000): print "send: %s" %(i) pipe.send(i) time.sleep(1) def proc2(pipe): while True: print "proc2 rev:", pipe.recv() time.sleep(1) def proc3(pipe): while True: print "PROC3 rev:", pipe.recv() time.sleep(1) if __name__ == "__main__": pipe = multiprocessing.Pipe() p1 = multiprocessing.Process(target=proc1, args=(pipe[0],)) p2 = multiprocessing.Process(target=proc2, args=(pipe[1],)) #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],)) p1.start() p2.start() #p3.start() p1.join() p2.join() #p3.join()
結果
7. Pool
在利用Python進行系統管理的時候,特別是同時操作多個檔案目錄,或者遠端控制多臺主機,並行操作可以節約大量的時間。當被操作物件數目不大時,可以直接利用multiprocessing中的Process動態成生多個程序,十幾個還好,但如果是上百個,上千個目標,手動的去限制程序數量卻又太過繁瑣,此時可以發揮程序池的功效。 Pool可以提供指定數量的程序,供使用者呼叫,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序來它。
例7.1:使用程序池(非阻塞)
#coding: utf-8 import multiprocessing import time def func(msg): print "msg:", msg time.sleep(3) print "end" if __name__ == "__main__": pool = multiprocessing.Pool(processes = 3) for i in xrange(4): msg = "hello %d" %(i) pool.apply_async(func, (msg, )) #維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去 print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" pool.close() pool.join() #呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束 print "Sub-process(es) done."
一次執行結果
1 2 3 4 5 6 7 8 9 10 |
|
函式解釋:
- apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別)
- close() 關閉pool,使其不在接受新的任務。
- terminate() 結束工作程序,不在處理未完成的任務。
- join() 主程序阻塞,等待子程序的退出, join方法要在close或terminate之後使用。
執行說明:建立一個程序池pool,並設定程序的數量為3,xrange(4)會相繼產生四個物件[0, 1, 2, 4],四個物件被提交到pool中,因pool指定程序數為3,所以0、1、2會直接送到程序中執行,當其中一個執行完事後才空出一個程序處理物件3,所以會出現輸出“msg: hello 3”出現在"end"後。因為為非阻塞,主函式會自己執行自個的,不搭理程序的執行,所以執行完for迴圈後直接輸出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程式在pool.join()處等待各個程序的結束。
例7.2:使用程序池(阻塞)
#coding: utf-8 import multiprocessing import time def func(msg): print "msg:", msg time.sleep(3) print "end" if __name__ == "__main__": pool = multiprocessing.Pool(processes = 3) for i in xrange(4): msg = "hello %d" %(i) pool.apply(func, (msg, )) #維持執行的程序總數為processes,當一個程序執行完畢後會新增新的程序進去 print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" pool.close() pool.join() #呼叫join之前,先呼叫close函式,否則會出錯。執行完close後不會有新的程序加入到pool,join函式等待所有子程序結束 print "Sub-process(es) done."
一次執行的結果
1 2 3 4 5 6 7 8 9 10 |
|
例7.3:使用程序池,並關注結果
import multiprocessing import time def func(msg): print "msg:", msg time.sleep(3) print "end" return "done" + msg if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) result = [] for i in xrange(3): msg = "hello %d" %(i) result.append(pool.apply_async(func, (msg, ))) pool.close() pool.join() for res in result: print ":::", res.get() print "Sub-process(es) done."
一次執行結果
1 2 3 4 5 6 7 8 9 10 |
|
例7.4:使用多個程序池
#coding: utf-8 import multiprocessing import os, time, random def Lee(): print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()獲取當前的程序的ID start = time.time() time.sleep(random.random() * 10) #random.random()隨機生成0-1之間的小數 end = time.time() print 'Task Lee, runs %0.2f seconds.' %(end - start) def Marlon(): print "\nRun task Marlon-%s" %(os.getpid()) start = time.time() time.sleep(random.random() * 40) end=time.time() print 'Task Marlon runs %0.2f seconds.' %(end - start) def Allen(): print "\nRun task Allen-%s" %(os.getpid()) start = time.time() time.sleep(random.random() * 30) end = time.time() print 'Task Allen runs %0.2f seconds.' %(end - start) def Frank(): print "\nRun task Frank-%s" %(os.getpid()) start = time.time() time.sleep(random.random() * 20) end = time.time() print 'Task Frank runs %0.2f seconds.' %(end - start) if __name__=='__main__': function_list= [Lee, Marlon, Allen, Frank] print "parent process %s" %(os.getpid()) pool=multiprocessing.Pool(4) for func in function_list: pool.apply_async(func) #Pool執行函式,apply執行函式,當有一個程序執行完畢後,會新增一個新的程序到pool中 print 'Waiting for all subprocesses done...' pool.close() pool.join() #呼叫join之前,一定要先呼叫close() 函式,否則會出錯, close()執行後不會有新的程序加入到pool,join函式等待素有子程序結束 print 'All subprocesses done.'
一次執行結果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
|
26
0
posted @ 2015-04-30 14:46 jihite 閱讀(124042) 評論(11) 編輯 收藏
評論列表
#1樓 2015-06-08 17:59
這文章不錯啊!學習了!
您好,請問為什麼我在執行您的例子1.1的時候只輸出了一條記錄: p.pid: 12812 p.name: Process-1 p.is_alive: True 是我用IDE問題嗎?我用的是wing ide,希望指教一下。
#3樓[樓主] 2015-07-29 23:10
@ BetterThinker 在windows下的話用命令開啟: D:\Python27>D:\Python27\python del.py 輸出 Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ msg: hello 0 msg: hello 1 msg: hello 2 end msg: hello 3 end end end Sub-process(es) done.
#4樓 2015-10-17 11:19
mark 慢慢看
#5樓 2016-11-14 11:21
簡明扼要,轉了,多謝博主.
必須轉啊!?
#7樓 2016-11-23 16:07 用程序池pool發現一個奇怪的現象 當註釋掉time.sleep所列印的程序pid是一樣的,而使用time.sleep程序pid不一樣,請問下這是為什麼?
1 2 3 4 5 6 7 8 9 10 11 12 13 |
|
#8樓[樓主] 2016-11-28 23:56 @ _xiaocainiao 多次執行(遮蔽sleep的),會發現有時一樣,有時不一樣。 而如果把sleep換成更復雜的執行過程,如,那麼執行結果pid都不是一樣的,說明遮蔽了sleep以後,執行完了,另外一個程序才剛開始執行,正好用剛才那個程序釋放掉的程序號
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
|