python mutilprocessing多程序程式設計
`為了更好的理解本文內容,請務必先了解Synchronization、Asynchronization、Concurrent、Mutex等基本概念
multiprocessing是一個類似於Threading模組的由API產生程序的包,關於Threading模組可以參考我的部落格文章。multiprocessing能夠 提供本地和遠端兩種併發模式,通過使用子程序而不是執行緒有效地避開了GIL。因此,multiprocessing允許程式設計師充分利用機器上的多個處理器,且該包支援在Unix系統和Windows系統上執行。mutilprocessing還引入了在Threading模組中沒有相類似的API。比如Pool物件,Pool物件提供了一種方便的方法,可以跨多個輸入值並行化函式的執行,跨程序分配輸入資料(資料並行)。使用方法可以看看下面的例子:
mutilprocessing還引入了在Threading模組中沒有相類似的API。比如Pool物件,Pool物件提供了一種方便的方法,可以跨多個輸入值並行化函式的執行,跨程序分配輸入資料(資料並行)。使用方法可以看看下面的例子:
from multiprocessing import Pool
def f(x):
return x * x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3, 4, 5, 6, 7]))
# [1, 4, 9, 16, 25, 36, 49]
Process類
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={},daemon=None)
group
必須為None
,設定該引數僅僅是為了與Threading模組保持一致
target
是run()
方法呼叫的可呼叫物件
name
是指程序名
daemon
指示是否設定為守護程序
-
run()
表示程序活動的方法,可在子類中重寫此方法。標準run()方法呼叫傳遞給物件建構函式的可呼叫物件作為目標引數(如果有),分別使用args和kwargs引數中的順序和關鍵字引數。
-
start()
啟動程序的活動,每個程序物件最多隻能呼叫一次,在一個單獨的程序中呼叫物件的run()方法
-
join([timeout])
如果可選引數timeout為None(預設值),則該方法將阻塞,直到呼叫其join()方法的程序終止。如果timeout是一個正數,它最多會阻塞timeout秒。請注意,如果方法的程序終止或方法超時,則該方法返回None。檢查程序的exitcode以確定它是否終止。
-
name
程序名
-
is_alive()
指示程序是否還活著
-
daemon
daemon flag, a Boolean value, 必須在程序start之前設定
-
pid
process ID
-
exitcode
負值-N表示孩子被訊號N終止,預設為None,表示程序未被終止
-
authkey
The process’s authentication key (a byte string)
-
sentinel
系統物件的數字控制代碼,當程序結束時將變為“ready”
-
terminate()
終止程序,但注意子程序不會被終止,只是會成孤兒
請注意,start(),join(),is_alive(),terminate()和exitcode方法只應由建立過程物件的程序呼叫。
>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process(Process-1, initial)> False
>>> p.start()
>>> print(p, p.is_alive())
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
在multiprocessing中,通過建立Process
物件然後呼叫其start()
方法來生成程序,其使用方法和threading.Thread
一樣。我們看下面的例子:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__': # 這句話是必要的,不可去掉
p = Process(target=f, args=('bob',))
p.start()
p.join()
我們可以通過程序號來區分不同的程序:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid(), '\n')
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',)) # 建立新程序
p.start() # 啟動程序
p.join()
程序啟動
根據平臺的不同,multiprocessing支援三種啟動程序的方法。這些啟動方法是:
-
spawn
spawn
呼叫改方法,父程序會啟動一個新的python程序,子程序只會繼承執行程序物件
run()
方法所需的那些資源。特別地,子程序不會繼承父程序中不必要的檔案描述符和控制代碼。與使用fork
或forkserver
相比,使用此方法啟動程序相當慢。Available on Unix and Windows. The default on Windows.
-
fork
父程序使用
os.fork()
來fork Python直譯器。子程序在開始時實際上與父程序相同,父程序的所有資源都由子程序繼承。請注意,安全建立多執行緒程序尚存在一定的問題。Available on Unix only. The default on Unix.
-
forkserver
當程式啟動並選擇
forkserver
start方法時,將啟動伺服器程序。從那時起,每當需要一個新程序時,父程序就會連線到伺服器並請求它fork一個新程序。 fork伺服器程序是單執行緒的,因此使用os.fork()是安全的。沒有不必要的資源被繼承。Available on Unix platforms which support passing file descriptors over Unix pipes.
要選擇以上某一種start方法,請在主模組的
if __name__ == '__ main__'
子句中使用mp.set_start_method()
。並且
mp.set_start_method()
在一個程式中僅僅能使用一次。import multiprocessing as mp def foo(q): q.put('hello') if __name__ == '__main__': mp.set_start_method('spawn') q = mp.Queue() p = mp.Process(target=foo, args=(q,)) p.start() print(q.get()) p.join()
或者,您可以使用
get_context()
來獲取上下文物件。上下文物件與多處理模組具有相同的API,並允許在同一程式中使用多個啟動方法。import multiprocessing as mp def foo(q): q.put('hello') if __name__ == '__main__': ctx = mp.get_context('spawn') q = ctx.Queue() p = ctx.Process(target=foo, args=(q,)) p.start() print(q.get()) p.join()
注意,與一個context相關的物件可能與不同context的程序不相容。特別是,使用fork context建立的鎖不能傳遞給使用spawn或forkserver start方法啟動的程序。
程序通訊
當使用多個程序時,通常使用訊息傳遞來進行程序之間的通訊,並避免必須使用任何synchronization primitives(如鎖)。對於傳遞訊息,可以使用Pipe(用於兩個程序之間的連線)或Queue(允許多個生產者和消費者)。
Queues
class multiprocessing.Queue
([maxsize])
Queue實現queue.Queue
的所有方法,但task_done()
和join()
除外。Queue是程序、執行緒安全的模型
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
Pipes
Class multiprocessing.Pipe([duplex])
返回一對(conn1, conn2) of Connection 物件代表pipe的兩端。如果duplex為True(預設值),則管道是雙向的;如果duplex為False,則管道是單向的:conn1只能用於接收訊息,conn2只能用於傳送訊息。Pipe()`函式返回一個由Pipe連線的連線物件,預設情況下是全雙工雙向通訊(duplex)。例如:
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
Pipe()返回的兩個連線物件代表管道的兩端,每個連線物件都有send()和recv()方法。需要注意的是,管道中的資料可能會不一致或被破壞,如當兩個程序(或執行緒)嘗試同時讀取或寫入管道的同一端。當然,同時使用管道的不同端部的過程不存在損壞的風險。
程序共享狀態
在進行併發程式設計時,通常最好避免使用共享狀態,但是,如果你確實需要使用某些共享資料,那麼multiprocessing提供了以下兩種方法:
Shared Memory
可以使用Value或Array將資料儲存在共享記憶體的map(對映)中。例如,以下程式碼:
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
# 3.1415927
# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
建立num和arr時使用的’d’和’i’引數是array
module使用的型別的型別程式碼:'d’表示雙精度浮點數,'i’表示有符號整數。這些共享物件將是程序和執行緒安全的。為了更靈活地使用共享記憶體,可以使用multiprocessing.sharedctypes
模組,該模組支援建立從共享記憶體分配的任意ctypes物件。但還是那句話,在進行併發程式設計時,通常最好避免使用共享狀態。
Server Process
Manager()
返回的Manager物件控制一個伺服器程序(server process),該程序儲存Python物件並允許其他程序使用代理操作它們。Manager物件支援的物件包括list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
以及 Array
。Managers提供了一種建立可在不同程序之間共享的資料的方法,包括在不同計算機上執行的程序之間通過網路共享。管理器物件控制管理共享物件的伺服器程序。其他程序可以使用代理訪問共享物件。
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
#{0.25: None, 1: '1', '2': 2}
#[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Proxy
代理是一個物件,它指的是(可能)在不同的程序中存在的共享物件。共享物件被認為是代理的指示物件。多個代理物件可能具有相同的指示物件。代理物件具有呼叫其引用物件的相應方法的方法。代理物件的一個重要特性是它們是pickable的,因此它們可以在程序之間傳遞。
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l) # l即是一個代理物件
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Connection
&esmp;connection物件允許傳送和接收可序列化物件或字串。它們可以被認為是面向訊息的連線套接字,我們再上面介紹Pipe的時候所例項化的物件就是connection物件。
-
send
(obj)將物件傳送到連線的另一端,應使用
recv()
讀取,且該物件必須是pickable的,>32 MB的物件可能會引發ValueError異常。 -
recv
()返回從連線另一端傳送的物件。阻塞直到接收到東西。如果沒有剩餘要接收和另一端被關閉,則引發EOFError。
-
fileno
()返回conn所使用的檔案描述符或控制代碼
-
close
()關閉連線
-
poll
([timeout])返回是否有可供讀取的資料,如果未指定超時,則會立即返回;如果timeout是一個數字,則指定阻止的最長時間(以秒為單位);如果timeout為None,則使用無限超時。
-
send_bytes
(buffer[, offset[, size]])傳送位元組資料
-
recv_bytes
([maxlength])接受位元組資料
-
recv_bytes_into
(buffer[, offset])讀取從連線另一端傳送的位元組資料的完整訊息到buffer,並返回訊息中的位元組數。
>>> from multiprocessing import Pipe >>> a, b = Pipe() >>> a.send([1, 'hello', None]) >>> b.recv() [1, 'hello', None] >>> b.send_bytes(b'thank you') >>> a.recv_bytes() b'thank you' >>> import array >>> arr1 = array.array('i', range(5)) >>> arr2 = array.array('i', [0] * 10) >>> a.send_bytes(arr1) >>> count = b.recv_bytes_into(arr2) >>> assert count == len(arr1) * arr1.itemsize >>> arr2 array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
summary
Server process Manager比使用共享記憶體物件更靈活,因為它們可以支援任意物件型別。此外,單個管理器可以通過網路在不同計算機上的程序共享。但它比使用共享記憶體慢。
Synchronization
同步原語和Threading模組幾乎一致,具體請參考Python Threading 多執行緒程式設計
Lock
from multiprocessing import Process, Lock
def f(l, i):
"""
保證同一時間只有一個標準輸出流
"""
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
# output
hello world 1
hello world 0
hello world 2
hello world 4
hello world 3
hello world 6
hello world 9
hello world 5
hello world 8
hello world 7
Pool類
Pool類用於建立程序池
主要方法有,具體例子見程式碼,並請注意,pool物件的方法只能由建立它的程序使用:
pool.map()
pool.imap()
Equivalent ofmap()
– can be MUCH slower thanPool.map()
.pool.starmap()
Likemap()
method but the elements of theiterable
are expected to be iterables as well and will be unpacked as arguments.pool.starmap_async
Asynchronous version ofstarmap()
methodpool.map_async
Asynchronous version ofmap()
method.pool.imap_unordered()
pool.apply()
pool.apply_async()
Asynchronous version ofapply()
method.
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i, end='\t')
print()
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 0 1 4 9 16 25 36 49 64 81
# 400
# 2696
# [2696, 2696, 2696, 2696]
# We lacked patience and got a multiprocessing.TimeoutError
# For the moment, the pool remains available for more work
# Now the pool is closed and no longer available
Miscellaneous
-
multiprocessing.active_children()
返回當前程序的所有活子進 程的列表
-
multiprocessing.cpu_count()
返回系統中的CPU數量,此數字不等於當前程序可以使用的CPU數量。可以使用
len(os.sched_getaffinity(0))
獲得可用CPU的數量 -
multiprocessing.current_process()
返回與當前程序對應的Process物件
-
multiprocessing.freeze_support()
為程式打包成exe可執行檔案提供支援,在Windows以外的任何作業系統上呼叫時,呼叫
freeze_support()
無效。此外,如果模組由Windows上的Python直譯器正常執行(程式尚未凍結),則freeze_support()
無效from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
-
multiprocessing.get_all_start_methods()
返回支援的start方法列表,第一個是預設方法。可能的啟動方法是’fork’,‘spawn’和’forkserver’。在Windows上只有“spawn”可用。在Unix上’fork’和’spawn’總是受支援,'fork’是預設值。
-
multiprocessing.get_context(method=None)
返回與multiprocessing模組具有相同屬性的上下文物件,具體用法前面已經有過例子
-
multiprocessing.get_start_method(allow_none=False)
返回用於啟動程序的start方法的名稱,返回值可以是’fork’,‘spawn’,'forkserver’或None。 'fork’是Unix上的預設值,而’spawn’是Windows上的預設值。
-
multiprocessing.set_executable()
設定啟動子程序時要使用的Python直譯器的路徑
-
multiprocessing.set_start_method(method)
設定用於啟動子程序的方法。方法可以是’fork’,‘spawn’或’forkserver’。請注意,改法最多呼叫一次,並且應該寫在主模組的if name ==’__ main__'子句中。