Python並行系統工具_multiprocessing模組
-
-
支援一個基本上與平臺無關的程序派生模型,並為相關目標如IPC提供工具,如鎖,管道,佇列等
-
因為使用程序而非執行緒來並行地執行程式碼,有效的避開了執行緒GIL帶來的限制
-
multiprocess
模組允許程式設計師既能發揮多處理器的威力來完成並行任務,又能保留執行緒模型帶來的大部分的簡易性和可移植性-
與原始的程序分支相比,跨平臺可移植性及強大的IPC工具
-
與執行緒相比,從本質上將,付出了一些潛在的,依賴於系統平臺的任務啟動額外耗時,但是獲得了在多核或多CPU機器上真正地並行執行任務的能力
-
-
執行緒沒有限制及由某些功能加強造成的不便
-
因為物件的複製跨越程序界限,執行緒中的共享可變狀態不再正常工作
-
一個程序中的改變一般不會為其他程序所注意
-
自由地共享狀態可能是執行緒的最大賣點
-
在模組中,這一點缺失對其在某些使用執行緒的情景中的應用可能有所限制
-
-
模組要求在Windows平臺下程序以及某些IPC工具能夠進行
Pickle
操作,某些編碼正規化可能實現起來較為複雜,或者不能跨平臺移植,尤其當它們使用了繫結物件方法或者向派生的程序中傳入套接字等不能Pickle的物件-
lambda
編碼模式可在threading
模組中使用,但是Windows平臺下的這個模組中,不能作為程序的目標可呼叫物件,因為不能程序Pickle操作 -
pickle
操作在接受程序中產生一個物件副本,而並非對原始物件的引用-
對於由pickle後傳給新程序的方法複製了一個可變訊息快取,更新其狀態對其原始物件沒有效果
-
-
Unix
下分支本質上也是複製整個程序 -
在Windows下,程序的引數的可Pickle特性要求可以在其他情境下限制
multiprocessing
的使用
-
-
-
mulitprocessing
模組基於單獨的程序,可能最適用於相互獨立,不能自由共享可變物件狀態,並且能夠利用這個模組提供的訊息傳遞和共享記憶體工具的任務 -
這個模組的大部分介面被設計成了類似
threading
和queue
模組,如Process
類似Thread
-
向
Process
物件傳入一個帶有引數的target
-
也可以建立其子類來重新定義
run
行為方法-
start
方法在一個新程序中呼叫其run
方法 -
預設的
run
方法僅僅呼叫傳入的目標函式
-
-
join
等待子程序的退出,提供了為多種程序同步化工具lock
""" multiprocessing模組基本操作 Process類似threading.Thread 不過在並行程序而非執行緒中允許函式呼叫 可以用鎖程序同步化,如列印操作 在Windows平臺上啟動新的直譯器,Unix啟動分支新程序 """ import os import time from multiprocessing import Process, Lock def whoaim(label, lock): msg = '%s: name: %s, pid:%s, time: %s' with lock: print(msg % (label, __name__, os.getpid(), time.time())) if __name__ == '__main__': lock = Lock() print("PID:", os.getpid()) whoaim('function call', lock) p = Process(target=whoaim, args=('sqaured child', lock)) p.start() p.join() for i in range(5): Process(target=whoaim, args=(('run process %s' % i), lock)).start() with lock: print("Main Process exit ....")multiprocessing模組基本操作
-
指令碼執行時首先在程序中直接呼叫函式,在同一個程序中,PID一致
-
在一個新的程序中啟動該函式並等待其退出
-
最後在一個迴圈中派生五個並行的函式呼叫
-
-
在Unix下,雖然子程序可以使用父程序中建立的共享全域性物件
-
更好的方法:將物件作為引數傳入子程序的構造器
-
既可以具有向
Windows
的移植性 -
如果這種物件是父程序收集的垃圾的化,還能避免一些潛在的問題
-
-
-
Windows下,通過
Windows
下特有的程序建立工具來派生一個新的直譯器,通過管道向新程序傳入pickle後的Process物件,並在新的程序中執行python -c
命令列,後者執行這個包裡的一個特殊的Python編碼的函式來讀取和unpickle這個Process物件並呼叫其run方法 -
在
Windows
下,主程序的業務邏輯通常巢狀在if __name__ == '__main__'
的測試中,這樣就可以由一個新的直譯器自由地載入而沒有副作用 -
在
Windows
下當子程序訪問全域性物件時,後者的值可能與其在父程序中的起始時間不同,因為它們的模組將被載入一個新的程序 -
在
Windows
下,Process
接受的所有引數必須能夠進行pickle
操作-
包含
target
,目標應該是可以pickle的簡單函式而不能是繫結或者非繫結物件的方法,也不能是lambda
語句建立的函式 -
基本所有的物件都可以進行
pickle
,只是函式和類這些可呼叫物件必須是可以載入的 -
它們僅僅通過名稱進行pickle,之後還需要載入以重新建立位元組碼
-
在
Windows
下,帶有系統狀態的物件,如套接字,一般來講是不能做為目標引數,因為不能Pickle
-
-
定製的
Process
子類在Windows
-
模組的
pipe
物件-
和
Process
物件類似 -
提供了一個可連線倆個程序的匿名管道
-
呼叫後,返回兩個
Connection
物件, 表示管道的兩端, 管道預設是雙向的,可以傳送和接受任何可pickle的Python物件 -
在Unix下,在內部由一對連線上的
套接字
或os.pipe
呼叫得以實現 -
在Windows下,由平臺特異的
具名管道
是實現
-
-
Value/Array物件
-
實現了共享的程序/執行緒安全的記憶體以用於進行程序間通訊
-
返回基於
ctype
模組並在共享記憶體中建立的標量和陣列物件 -
預設帶有訪問同步化設定
-
-
Queue模組
-
可以作為Python物件的一個先進先出的列表
-
允許多個生產者和消費者
-
佇列是一個管道加上用來協調隨機訪問的鎖機制,並繼承了Pipe加上pickle的限制
-
這些工具可以在數個程序間安全適用,經常以他們為通訊的同步點並因此取代了鎖之類的更貼近底層的工具
-
限制:管道(間接包含佇列)pickle它們傳遞的物件,使得在接收端程序裡重新,不支援不能pickle的物件,傳輸的物件能被pickle,實際上它在接收端程序類被複制了,對可變物件狀態的原位更改不會為傳送者中的副本感知, 狀態都不能執行緒模型中那樣自由的共享
""" 使用多程序匿名管道進行通訊 返回兩個Connection物件分別表示管道的來兩端 物件從一端傳送,在另一端接受,管道預設是雙向的 """ import os import time from multiprocessing import Pipe, Process def sender(pipe): print(os.getpid(), "傳送資料") time.sleep(10) """在匿名管道上向父程序傳送物件""" pipe.send(['sapm'] + [42, 'eggs']) pipe.close() def talker(pipe): """通過管道傳送和接受物件""" pipe.send(dict(name="Bob", spam=42)) reply = pipe.recv() print("talker got:", reply, os.getpid()) if __name__ == '__main__': print(os.getpid()) (parentEnd, childEnd) = Pipe() # 派生帶管道的子程序 Process(target=sender, args=(childEnd,)).start() # 從子程序中接受 print("patent got: ", parentEnd.recv()) # 關閉埠 parentEnd.close() time.sleep(60) (parentEnd, childEnd) = Pipe() # 從子程序接受 child = Process(target=talker, args=(childEnd,)) child.start() print("parent got:", parentEnd.recv()) # 向子程序傳送 parentEnd.send({x * 2 for x in "spam"}) # 等待子程序退出 child.join() print('Parent exit ...')使用多程序匿名管道進行通訊
""" 使用多程序共享記憶體物件程序通訊 傳輸的物件是共享的,但在windows下不共享全域性物件 """ import os from multiprocessing import Process, Value, Array import time # 每個程序各自的全域性物件,並非共享 procs = 3 count = 0 def showdata(label, val, arr): """在這個程序中列印資料值""" msg = "%-12s: pid:%4s, global:%s, value:%s, array:%s time: %s" print(msg % (label, os.getpid(), count, val.value, list(arr), time.time())) def updater(val, arr): """通過共享記憶體程序通訊""" global count # 全域性計數器,非共享 count += 1 print("count:==>", count) # 傳入的物件是共享的 val.value += 1 for i in range(3): arr[i] += 1 if __name__ == '__main__': # 共享記憶體是執行緒/程序安全的 # ctype中的而型別程式碼 scalar = Value('i', 0) vector = Array('d', procs) # 在父程序中顯示初始值 showdata('parent start: ', scalar, vector) print("---*---" * 20) # 派生子程序,傳入共享記憶體 p = Process(target=showdata, args=('child: ', scalar, vector)) p.start() p.join() # 傳入父程序中更新過的共享記憶體,等待每次傳入結束 # 每個子程序看到的父程序中到現在為止對args的更新(全域性變數的看不到) print("\n loop1 (update in parent, serial children) ...") for i in range(procs): count += 1 scalar.value += 1 vector[i] += 1 p = Process(target=showdata, args=(('process %s' % i), scalar, vector)) p.start() p.join() # 同上,不過允許子程序並行執行 # 所有程序都看到了最近一次迭代的結果,因為共享這個物件 print("\n loop2 (update in parent, parallel children) ...") ps = [] for i in range(procs): count += 1 scalar.value += 1 vector[i] += 1 p = Process(target=showdata, args=(('process %s' % i), scalar, vector)) p.start() ps.append(p) for p in ps: p.join() showdata('parent temp', scalar, vector) # 共享記憶體在派生子程序中更新,等待每個更新結束 print("\n loop3 (update in serial children) ...") ps = [] for i in range(procs): p = Process(target=updater, args=(scalar, vector)) p.start() p.join() showdata('parent temp', scalar, vector) # 同上,但是允許子程序並行更新 print("\n loop4 (update in parallel children) ...") ps = [] for i in range(procs): p = Process(target=updater, args=(scalar, vector)) p.start() ps.append(p) for p in ps: p.join() showdata('parent temp', scalar, vector) parent start: : pid:20796, global:0, value:0, array:[0.0, 0.0, 0.0] time: 1598267935.7003455 ---*------*------*------*------*------*------*------*------*------*------*------*------*------*------*------*------*------*------*------*--- child: : pid:14172, global:0, value:0, array:[0.0, 0.0, 0.0] time: 1598267935.8503418 loop1 (update in parent, serial children) ... process 0 : pid:23836, global:0, value:1, array:[1.0, 0.0, 0.0] time: 1598267936.032341 process 1 : pid:3356, global:0, value:2, array:[1.0, 1.0, 0.0] time: 1598267936.1933422 process 2 : pid:13252, global:0, value:3, array:[1.0, 1.0, 1.0] time: 1598267936.3533394 loop2 (update in parent, parallel children) ... process 0 : pid:14260, global:0, value:6, array:[2.0, 2.0, 2.0] time: 1598267936.5973456 process 1 : pid:6936, global:0, value:6, array:[2.0, 2.0, 2.0] time: 1598267936.6063423 process 2 : pid:8776, global:0, value:6, array:[2.0, 2.0, 2.0] time: 1598267936.6143434 parent temp : pid:20796, global:6, value:6, array:[2.0, 2.0, 2.0] time: 1598267936.6333387 loop3 (update in serial children) ... count:==> 1 count:==> 1 count:==> 1 parent temp : pid:20796, global:6, value:9, array:[5.0, 5.0, 5.0] time: 1598267937.2193384 loop4 (update in parallel children) ... count:==> 1 count:==> 1 count:==> 1 parent temp : pid:20796, global:6, value:12, array:[8.0, 8.0, 8.0] time: 1598267937.4793396 Process finished with exit code 0使用多程序共享記憶體物件程序通訊
-
注意:全域性變數值的變化在
Windows
下的派生程序中不被共享,而Value
和Array
則被共享 -
和執行緒不同,全域性變數在
windows
下的每個進行都有一個副本的資料 -
在Unix下,父程序中的共享物件可以被子程序共享,但是隻是單向的,子程序對資料的更改不能返回到父程序
佇列和子類
multiprocessing
有以下特性
-
允許模組的
Process
建立子類,並提供架構和狀態保留(類似Thread) -
提供程序安全的
Queue
物件,可以在任意數量的程序間共享,滿足更廣泛的通訊需求(類似queue.Queue
)
佇列支援更靈活的多重伺服器/客戶端模型
""" 建立Process類的子類,類似threading.Thread Queue類似queue.Queue,不過不是執行緒間的工具,而是程序間的工具 """ import os, time, queue from multiprocessing import Process,Queue # 程序安全的共享佇列,是 管道+鎖/訊號機制 class Counter(Process): label = "@" # 為執行中的用處保留狀態 def __init__(self, start, queue): self.state = start self.post = queue Process.__init__(self) def run(self): """新程序中呼叫start()開始執行""" for i in range(3): time.sleep(1) self.state += 1 print(self.label, self.pid, self.state) # stdout檔案為所有程序共享 self.post.put([self.pid, self.state]) print(self.label, self.pid, '-') if __name__ == '__main__': print('start ...', os.getpid()) expected = 9 post = Queue() # 開始共享佇列的3個程序,是生產者 p = Counter(0, post) q = Counter(100, post) r = Counter(1000, post) p.start(); q.start();r.start() while expected: # 父程序消耗佇列中資料 time.sleep(0.5) try: data = post.get(block=False) except queue.Empty: print("no data ...") else: print('Posted:', data) expected -= 1 p.join() q.join() r.join() print('finish ...', os.getpid(), r.exitcode)佇列和子類
獨立程式一般用系統全域性工具,如套接字和FIFO檔案,來進行通訊
multiprocessing
派生的進行也可以使用這些工具,但是它們之間較緊密的關係,使得它們可以使用這個模組提供的額外的IPC通訊手段
multiprocessing
的設計目的是為並行執行函式呼叫而服務的,而不是直接啟動完全不同的程式
如果某個程式的啟動可能阻塞其呼叫者,派生程式可以使用os.system
,os.popen
和subprocess
等工具
但是在其他情況下,開始一個程序來啟動某個程式並沒有什麼意義
啟動獨立程式的方法
-
Unix下,
os.fork
和exec
組合 -
os.system
,os.popen
,subprocess
可跨平臺移植的shell
命令列啟動器 -
multiprocessing
模組選項 -
os.spawn
家族函式os.spawn
家族函式os.spawnv
和os.spawnve
呼叫的出現是為了在windows下啟動程式,和Unix下的os.fork
和exec
組合呼叫類似,但是也可以在Unix平臺下使用,且添加了一些功能,接近os.exec
的作用OS.spawn
函式家族在新程序中執行命令列指定的程式,基本操作方面,類似os.fork
和exec
組合呼叫並且可以代替我們之前學到的system和open呼叫不去真的複製呼叫它們的程序,共享描述符不起作用,可以用來啟動一個完全獨立於呼叫者而執行的程式
-
目前的
subprocess
和multiprocessing
模組都提供了命令列派生程式的具有可移植的替代方案 -
除非
os.spawn
呼叫提供不可獲取的獨特行為,一般使用更具有可移植性的multiprocessing模組代替
""" 啟動10個並行執行的程式 在windows下用spawn啟動程式 使用P_OVERLAY程序替換,使用P_DETACH子程序stdout不指向任何地方 """ import os import sys for i in range(10): if sys.platform[:3] == "win": pypath = sys.executable os.spawnv(os.P_NOWAIT, pypath, ('python', 'child.py', str(i))) else: pid = os.fork() if pid != 0: print("Process %d spawned " % pid) else: os.execlp('python', 'python', 'child.py', str(i)) print("Main process exit ...")在windows下用spawn啟動程式
-
os.system
呼叫可用來啟動一個DOS的start命令(基於一個檔案的Windows檔名關聯),獨立地開啟這個檔案,就像單擊開啟一樣,在Python中os.startfile
將這個操作變得更加簡單,而且可以避免阻塞呼叫者 -
DOS的
start
命令:就類似在執行對話方塊中輸入命令一樣-
如果是一個檔名,就開啟檔案,類似在資源管理器中單機一樣
-
-
os.startfile
不提供等待應用關閉的選項,