Python 學習筆記 多程序 multiprocessing
Python 直譯器有一個全域性直譯器鎖(PIL),導致每個 Python 程序中最多同時執行一個執行緒,因此 Python 多執行緒程式並不能改善程式效能,不能發揮多核系統的優勢,可以通過這篇文章瞭解。
但是多程序程式不受此影響, Python 2.6 引入了 multiprocessing 來解決這個問題。這裡介紹 multiprocessing 模組下的程序,程序同步,程序間通訊和程序管理四個方面的內容。 這裡主要講解多程序的典型使用,multiprocessing 的 API 幾乎是完複製了 threading 的API, 因此只需花少量的時間就可以熟悉 threading 程式設計了。
Process
先來看一段程式碼
1234567891011 | from multiprocessing import Process, current_processdef func(): time.sleep(1) proc = current_process() proc.name, proc.pidsub_proc = Process(target=func, args=())sub_proc.start()sub_proc.join()proc = current_process()proc.name, proc.pid |
這是在主程序中建立子程序,然後啟動(start) 子程序,等待(join) 子程序執行完,再繼續執行主程序的整個的執行流程。
那麼,一個程序應該是用來做什麼的,它應該儲存一些什麼狀態,它的生命週期是什麼樣的呢?
一個程序需要處理一些不同任務,或者處理不同的物件。建立程序需要一個 function 和相關引數,引數可以是dictProcess(target=func,
args=(), kwargs = {})
,name
可以用來標識程序。
控制子程序進入不同階段的是 start()
, join()
, is_alive()
, terminate()
, exitcode
方法。這些方法只能在建立子程序的程序中執行。
程序同步
Lock
鎖是為了確保資料一致性,比如讀寫鎖,每個程序給一個變數增加 1 ,但是如果在一個程序讀取但還沒有寫入的時候,另外的程序也同時讀取了,並寫入該值,則最後寫入的值是錯誤的,這時候就需要鎖。
123456789 | def func(lock): lock.acquire() # do mysql query select update ... lock.release()lock = Lock()for i in xrange(4): proc = Process(target=func, args=(lock)) proc.start() |
Lock 同時也實現了 ContextManager API, 可以結合 with 語句使用, 關於 ContextManager, 請移步 Python 學習實踐筆記 裝飾器 與 context 檢視。
Semaphore
Semaphore 和 Lock 稍有不同,Semaphore 相當於 N 把鎖,獲取其中一把就可以執行了。 訊號量的總數 N 在構造時傳入,s
= Semaphore(N)
。 和 Lock 一樣,如果訊號量為0,則程序堵塞,直到訊號大於0。
Pipes
Pipe 是在兩個程序之間通訊的工具,Pipe Constructor 會返回兩個端
1 | conn1, conn2 = Pipe(True) |
如果是全雙工的(建構函式引數為True),則雙埠都可接收發送,否則前面的埠用於接收,後面的埠用於傳送。
1234567891011 | def proc1(pipe): for i in xrange(10000): pipe.send(i)def proc2(pipe): while True: print "proc2 rev:", pipe.recv()pipe = Pipe()Process(target=proc1, args=(pipe[0],)).start()Process(target=proc2, args=(pipe[1],)).start() |
Pipe 的每個埠同時最多一個程序讀寫,否則資料會出各種問題
Queues
multiprocessing.Queue 與 Queue.Queue 非常相似。其 API 列表如下
- qsize()
- empty()
- full()
- put()
- put_nowait()
- get()
- get_nowait()
- close()
- join_thread()
- cancel_join_thread()
當 Queue 為 Queue.Full 狀態時,再 put() 會堵塞,當狀態為 Queue.Empty 時,再 get() 也是。當 put() 或 get() 設定了超時引數,而超時的時候,會丟擲異常。
Queue 主要用於多個程序產生和消費,一般使用情況如下
123456789101112 | def producer(q): for i in xrange(10): q.put(i)def consumer(q): while True: print "consumer", q.get()q = Queue(40)for i in xrange(10): Process(target=producer, args=(q,)).start()Process(target=consumer, args=(q,)).start() |
十個生產者程序,一個消費者程序,共用同一個佇列進行同步。
有一個簡化版本的 multiprocessing.queues.SimpleQueue, 只支援3個方法 empty(), get(), put()。
也有一個強化版本的 JoinableQueue, 新增兩個方法 task_done() 和 join()。 task_done() 是給消費者使用的,每完成佇列中的一個任務,呼叫一次該方法。當所有的 tasks 都完成之後,交給呼叫 join() 的程序執行。
123456789101112131415 | def consumer(q): while True: print "consumer", q.get() q.task_done()jobs = JoinableQueue()for i in xrange(10): jobs.put(i)for i in xrange(10): p = Process(target=consumer, args=(jobs,)) p.daemon = True p.start()jobs.join() |
這個 join 函式等待 JoinableQueue 為空的時候,等待就結束,外面的程序可以繼續執行了,但是那10個程序幹嘛去了呢,他們還在等待呀,上面是設定了 p.daemon
= True
, 子程序才隨著主程序結束的,如果沒有設定,它們還是會一直等待的呢。
Lock、Pipe、Queue 和 Pipe 需要注意的是:儘量避免使用 Process.terminate 來終止程式,否則將會導致很多問題, 詳情請移步python 官方文件檢視。
程序間資料共享
前一節中, Pipe、Queue 都有一定資料共享的功能,但是他們會堵塞程序, 這裡介紹的兩種資料共享方式都不會堵塞程序, 而且都是多程序安全的。
共享記憶體
共享記憶體有兩個結構,一個是 Value
, 一個是 Array
,這兩個結構內部都實現了鎖機制,因此是多程序安全的。
用法如下:
1234567891011 | def func(n, a): n.value = 50 for i in range(len(a)): a[i] += 10num = Value('d', 0.0)ints= Array('i', range(10))p = Process(target=func, args=(num, ints))p.start()p.join() |
Value 和 Array 都需要設定其中存放值的型別,d 是 double 型別,i 是 int 型別,具體的對應關係在Python 標準庫的 sharedctypes 模組中檢視。
服務程序 Manager
上面的共享記憶體支援兩種結構 Value 和 Array, 這些值在主程序中管理,很分散。 Python 中還有一統天下,無所不能的 Server process,專門用來做資料共享。 其支援的型別非常多,比如list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value 和 Array 用法如下:
123456789101112131415 | from multiprocessing import Process, Managerdef func(dct, lst): dct[88] = 88 lst.reverse()manager = Manager()dct = manager.dict()lst = manager.list(range(5,10))p = Process(target=func, args=(dct, lst))p.start()p.join()print dct, '|', lstOut: {88: 88} | [9, 8, 7, 6, 5] |
一個 Manager 物件是一個服務程序,推薦多程序程式中,資料共享就用一個 manager 管理。
程序管理
如果有50個任務要執行, 但是 CPU 只有4核, 你可以建立50個程序來做這個事情。但是大可不必,徒增管理開銷。如果你只想建立4個程序,讓他們輪流替你完成任務,不用自己去管理具體的程序的建立銷燬,那 Pool 是非常有用的。
Pool 是程序池,程序池能夠管理一定的程序,當有空閒程序時,則利用空閒程序完成任務,直到所有任務完成為止,用法如下
12345 | def func(x): return x*xpool = Pool(processes=4)print pool.map(func, range(8)) |
Pool 程序池建立4個程序,不管有沒有任務,都一直在程序池中等候,等到有資料的時候就開始執行。
Pool 的 API 列表如下:
- apply(func[, args[, kwds]])
- apply_async(func[, args[, kwds[, callback]]])
- map(func, iterable[, chunksize])
- map_async(func, iterable[, chunksize[, callback]])
- imap(func, iterable[, chunksize])
- imap_unordered(func, iterable[, chunksize])
- close()
- terminate()
- join()
非同步執行
apply_async 和 map_async 執行之後立即返回,然後非同步返回結果。 使用方法如下
123456789101112 | def func(x): return x*xdef callback(x): print x, 'in callback'pool = Pool(processes=4)result = pool.map_async(func, range(8), 8, callback)print result.get(), 'in main'Out:[0, 1, 4, 9, 16, 25, 36, 49] in callback[0, 1, 4, 9, 16, 25, 36, 49] in main |
有兩個值得提到的,一個是 callback,另外一個是 multiprocessing.pool.AsyncResult。 callback 是在結果返回之前,呼叫的一個函式,這個函式必須只有一個引數,它會首先接收到結果。callback 不能有耗時操作,因為它會阻塞主執行緒。
AsyncResult 是獲取結果的物件,其 API 如下
- get([timeout])
- wait([timeout])
- ready()
- successful()
如果設定了 timeout 時間,超時會丟擲 multiprocessing.TimeoutError 異常。wait 是等待執行完成。 ready 測試是否已經完成,successful 是在確定已經 ready 的情況下,如果執行中沒有丟擲異常,則成功,如果沒有ready 就呼叫該函式,會得到一個 AssertionError 異常。
Pool 管理
這裡不再繼續講 map 的各種變體了,因為從上面的 API 一看便知。
然後我們來看看 Pool 的執行流程,有三個階段。第一、一個程序池接收很多工,然後分開執行任務;第二、不再接收任務了;第三、等所有任務完成了,回家,不幹了。
這就是上面的方法,close 停止接收新的任務,如果還有任務來,就會丟擲異常。 join 是等待所有任務完成。 join 必須要在 close 之後呼叫,否則會丟擲異常。terminate 非正常終止,記憶體不夠用時,垃圾回收器呼叫的就是這個方法。