1. 程式人生 > 實用技巧 >Python並行系統工具_multiprocessing模組

Python並行系統工具_multiprocessing模組

python的multiporcessing模組允許指令碼通過與threading模組非常類似的API來 派生程序

  • 與底層的程序分支不同,在Unix和Windows平臺上可以正常工作

  • 支援一個基本上與平臺無關的程序派生模型,並為相關目標如IPC提供工具,如鎖,管道,佇列等

  • 因為使用程序而非執行緒來並行地執行程式碼,有效的避開了執行緒GIL帶來的限制

  • multiprocess模組允許程式設計師既能發揮多處理器的威力來完成並行任務,又能保留執行緒模型帶來的大部分的簡易性和可移植性

    • 與原始的程序分支相比,跨平臺可移植性及強大的IPC工具

    • 與執行緒相比,從本質上將,付出了一些潛在的,依賴於系統平臺的任務啟動額外耗時,但是獲得了在多核或多CPU機器上真正地並行執行任務的能力

  • 執行緒沒有限制及由某些功能加強造成的不便

    • 因為物件的複製跨越程序界限執行緒中的共享可變狀態不再正常工作

      • 一個程序中的改變一般不會為其他程序所注意

      • 自由地共享狀態可能是執行緒的最大賣點

      • 在模組中,這一點缺失對其在某些使用執行緒的情景中的應用可能有所限制

    • 模組要求在Windows平臺下程序以及某些IPC工具能夠進行Pickle操作,某些編碼正規化可能實現起來較為複雜,或者不能跨平臺移植,尤其當它們使用了繫結物件方法或者向派生的程序中傳入套接字等不能Pickle的物件

      • lambda編碼模式可在threading模組中使用,但是Windows平臺下的這個模組中,不能作為程序的目標可呼叫物件,因為不能程序Pickle操作

      • pickle操作在接受程序中產生一個物件副本,而並非對原始物件的引用

        • 對於由pickle後傳給新程序的方法複製了一個可變訊息快取,更新其狀態對其原始物件沒有效果

      • Unix下分支本質上也是複製整個程序

      • 在Windows下,程序的引數的可Pickle特性要求可以在其他情境下限制multiprocessing的使用

  • mulitprocessing模組基於單獨的程序,可能最適用於相互獨立,不能自由共享可變物件狀態,並且能夠利用這個模組提供的訊息傳遞和共享記憶體工具的任務

  • 這個模組的大部分介面被設計成了類似threadingqueue模組,如Process類似Thread

    ,允許啟動一個與呼叫者指令碼並行的函式呼叫,在Windows平臺上啟動新的直譯器,Unix啟動分支新程序

    • Process物件傳入一個帶有引數的target

    • 也可以建立其子類來重新定義run行為方法

      • start方法在一個新程序中呼叫其run方法

      • 預設的run方法僅僅呼叫傳入的目標函式

    • join等待子程序的退出,提供了為多種程序同步化工具lock物件

"""
multiprocessing模組基本操作
Process類似threading.Thread
不過在並行程序而非執行緒中允許函式呼叫
可以用鎖程序同步化,如列印操作
在Windows平臺上啟動新的直譯器,Unix啟動分支新程序
"""
import os
import time
from multiprocessing import Process, Lock


def whoaim(label, lock):
    msg = '%s: name: %s, pid:%s, time: %s'
    with lock:
        print(msg % (label, __name__, os.getpid(), time.time()))


if __name__ == '__main__':
    lock = Lock()
    print("PID:", os.getpid())
    whoaim('function call', lock)

    p = Process(target=whoaim, args=('sqaured child', lock))
    p.start()
    p.join()

    for i in range(5):
        Process(target=whoaim, args=(('run process %s' % i), lock)).start()

    with lock:
        print("Main Process exit ....")
multiprocessing模組基本操作

執行步驟

  1. 指令碼執行時首先在程序中直接呼叫函式,在同一個程序中,PID一致

  2. 在一個新的程序中啟動該函式並等待其退出

  3. 最後在一個迴圈中派生五個並行的函式呼叫程序

實現方式

  • Unix下,分支一個新的子程序並在其中呼叫Process物件的run方法

    • 在Unix下,雖然子程序可以使用父程序中建立的共享全域性物件

    • 更好的方法:將物件作為引數傳入子程序的構造器

      • 既可以具有向Windows的移植性

      • 如果這種物件是父程序收集的垃圾的化,還能避免一些潛在的問題

  • Windows下,通過Windows特有的程序建立工具來派生一個新的直譯器通過管道向新程序傳入pickle後的Process物件,並在新的程序中執行python -c命令列,後者執行這個包裡的一個特殊的Python編碼的函式來讀取和unpickle這個Process物件並呼叫其run方法

    • Windows下,主程序的業務邏輯通常巢狀在if __name__ == '__main__'的測試中,這樣就可以由一個新的直譯器自由地載入而沒有副作用

    • Windows下當子程序訪問全域性物件時,後者的值可能與其在父程序中的起始時間不同,因為它們的模組將被載入一個新的程序

    • Windows下,Process接受的所有引數必須能夠進行pickle操作

      • 包含target,目標應該是可以pickle的簡單函式而不能是繫結或者非繫結物件的方法,也不能是lambda語句建立的函式

      • 基本所有的物件都可以進行pickle只是函式和類這些可呼叫物件必須是可以載入的

      • 它們僅僅通過名稱進行pickle,之後還需要載入以重新建立位元組碼

      • Windows下,帶有系統狀態的物件,如套接字,一般來講是不能做為目標引數,因為不能Pickle

    • 定製的Process子類在Windows下也必須是可pickle的,還包括它們的屬性

IPC工具

multiprocessing模組建立的程序可以通過一般的系統級別工具進行通訊,如套接字和FIFO,還為派生的程序提供了可跨平臺移植的訊息傳遞工具

  • 模組的pipe物件

    • Process物件類似

    • 提供了一個連線倆個程序的匿名管道

    • 呼叫後,返回兩個Connection物件, 表示管道的兩端管道預設是雙向的,可以傳送和接受任何可pickle的Python物件

    • 在Unix下,在內部由一對連線上的套接字os.pipe呼叫得以實現

    • 在Windows下,由平臺特異的具名管道是實現

  • Value/Array物件

    • 實現了共享的程序/執行緒安全的記憶體以用於進行程序間通訊

    • 返回基於ctype模組並在共享記憶體中建立的標量陣列物件

    • 預設帶有訪問同步化設定

  • Queue模組

    • 可以作為Python物件的一個先進先出的列表

    • 允許多個生產者和消費者

    • 佇列是一個管道加上用來協調隨機訪問的鎖機制,並繼承了Pipe加上pickle的限制

這些工具可以在數個程序間安全適用,經常以他們為通訊的同步點並因此取代了鎖之類的更貼近底層的工具

  • 限制:管道(間接包含佇列)pickle它們傳遞的物件,使得在接收端程序裡重新,不支援不能pickle的物件,傳輸的物件能被pickle,實際上它在接收端程序類被複制了對可變物件狀態的原位更改不會為傳送者中的副本感知, 狀態都不能執行緒模型中那樣自由的共享

"""
使用多程序匿名管道進行通訊
返回兩個Connection物件分別表示管道的來兩端
物件從一端傳送,在另一端接受,管道預設是雙向的
"""
import os
import time
from multiprocessing import Pipe, Process


def sender(pipe):
    print(os.getpid(), "傳送資料")
    time.sleep(10)
    """在匿名管道上向父程序傳送物件"""
    pipe.send(['sapm'] + [42, 'eggs'])
    pipe.close()


def talker(pipe):
    """通過管道傳送和接受物件"""
    pipe.send(dict(name="Bob", spam=42))
    reply = pipe.recv()
    print("talker got:", reply, os.getpid())


if __name__ == '__main__':
    print(os.getpid())
    (parentEnd, childEnd) = Pipe()
    # 派生帶管道的子程序
    Process(target=sender, args=(childEnd,)).start()
    # 從子程序中接受
    print("patent got: ", parentEnd.recv())
    # 關閉埠
    parentEnd.close()

    time.sleep(60)
    (parentEnd, childEnd) = Pipe()
    # 從子程序接受
    child = Process(target=talker, args=(childEnd,))
    child.start()
    print("parent got:", parentEnd.recv())
    # 向子程序傳送
    parentEnd.send({x * 2 for x in "spam"})
    # 等待子程序退出
    child.join()
    print('Parent exit ...')
使用多程序匿名管道進行通訊

"""
使用多程序共享記憶體物件程序通訊
傳輸的物件是共享的,但在windows下不共享全域性物件
"""
import os
from multiprocessing import Process, Value, Array
import time

# 每個程序各自的全域性物件,並非共享
procs = 3
count = 0


def showdata(label, val, arr):
    """在這個程序中列印資料值"""
    msg = "%-12s: pid:%4s, global:%s, value:%s, array:%s time: %s"
    print(msg % (label, os.getpid(), count, val.value, list(arr), time.time()))


def updater(val, arr):
    """通過共享記憶體程序通訊"""
    global count
    # 全域性計數器,非共享
    count += 1
    print("count:==>", count)
    # 傳入的物件是共享的
    val.value += 1
    for i in range(3):
        arr[i] += 1


if __name__ == '__main__':
    # 共享記憶體是執行緒/程序安全的
    # ctype中的而型別程式碼
    scalar = Value('i', 0)
    vector = Array('d', procs)

    # 在父程序中顯示初始值
    showdata('parent start: ', scalar, vector)
    print("---*---" * 20)

    # 派生子程序,傳入共享記憶體
    p = Process(target=showdata, args=('child:  ', scalar, vector))
    p.start()
    p.join()

    # 傳入父程序中更新過的共享記憶體,等待每次傳入結束
    # 每個子程序看到的父程序中到現在為止對args的更新(全域性變數的看不到)

    print("\n loop1 (update in parent, serial children) ...")
    for i in range(procs):
        count += 1
        scalar.value += 1
        vector[i] += 1
        p = Process(target=showdata, args=(('process %s' % i), scalar, vector))
        p.start()
        p.join()

    # 同上,不過允許子程序並行執行
    # 所有程序都看到了最近一次迭代的結果,因為共享這個物件
    print("\n loop2 (update in parent, parallel children) ...")
    ps = []
    for i in range(procs):
        count += 1
        scalar.value += 1
        vector[i] += 1
        p = Process(target=showdata, args=(('process %s' % i), scalar, vector))
        p.start()
        ps.append(p)
    for p in ps: p.join()
    showdata('parent temp', scalar, vector)
    # 共享記憶體在派生子程序中更新,等待每個更新結束
    print("\n loop3 (update in serial children) ...")
    ps = []
    for i in range(procs):
        p = Process(target=updater, args=(scalar, vector))
        p.start()
        p.join()
    showdata('parent temp', scalar, vector)

    # 同上,但是允許子程序並行更新
    print("\n loop4 (update in parallel children) ...")
    ps = []
    for i in range(procs):
        p = Process(target=updater, args=(scalar, vector))
        p.start()
        ps.append(p)
    for p in ps: p.join()
    showdata('parent temp', scalar, vector)


    

parent start: : pid:20796, global:0, value:0, array:[0.0, 0.0, 0.0] time: 1598267935.7003455
---*------*------*------*------*------*------*------*------*------*------*------*------*------*------*------*------*------*------*------*---
child:      : pid:14172, global:0, value:0, array:[0.0, 0.0, 0.0] time: 1598267935.8503418

 loop1 (update in parent, serial children) ...
process 0   : pid:23836, global:0, value:1, array:[1.0, 0.0, 0.0] time: 1598267936.032341
process 1   : pid:3356, global:0, value:2, array:[1.0, 1.0, 0.0] time: 1598267936.1933422
process 2   : pid:13252, global:0, value:3, array:[1.0, 1.0, 1.0] time: 1598267936.3533394

 loop2 (update in parent, parallel children) ...
process 0   : pid:14260, global:0, value:6, array:[2.0, 2.0, 2.0] time: 1598267936.5973456
process 1   : pid:6936, global:0, value:6, array:[2.0, 2.0, 2.0] time: 1598267936.6063423
process 2   : pid:8776, global:0, value:6, array:[2.0, 2.0, 2.0] time: 1598267936.6143434
parent temp : pid:20796, global:6, value:6, array:[2.0, 2.0, 2.0] time: 1598267936.6333387

 loop3 (update in serial children) ...
count:==> 1
count:==> 1
count:==> 1
parent temp : pid:20796, global:6, value:9, array:[5.0, 5.0, 5.0] time: 1598267937.2193384

 loop4 (update in parallel children) ...
count:==> 1
count:==> 1
count:==> 1
parent temp : pid:20796, global:6, value:12, array:[8.0, 8.0, 8.0] time: 1598267937.4793396

Process finished with exit code 0
使用多程序共享記憶體物件程序通訊
  • 注意:全域性變數值的變化在Windows下的派生程序中不被共享,而ValueArray則被共享

  • 和執行緒不同,全域性變數在windows下的每個進行都有一個副本的資料

  • 在Unix下,父程序中的共享物件可以被子程序共享,但是隻是單向的,子程序對資料的更改不能返回到父程序

佇列和子類

multiprocessing有以下特性

  • 允許模組的Process建立子類,並提供架構和狀態保留(類似Thread)

  • 提供程序安全的Queue物件,可以在任意數量的程序間共享,滿足更廣泛的通訊需求(類似queue.Queue)

佇列支援更靈活的多重伺服器/客戶端模型

"""
建立Process類的子類,類似threading.Thread
Queue類似queue.Queue,不過不是執行緒間的工具,而是程序間的工具
"""
import os, time, queue
from multiprocessing import Process,Queue

# 程序安全的共享佇列,是 管道+鎖/訊號機制

class Counter(Process):
    label = "@" # 為執行中的用處保留狀態
    def __init__(self, start, queue):
        self.state = start
        self.post = queue
        Process.__init__(self)

    def run(self):
        """新程序中呼叫start()開始執行"""
        for i in range(3):
            time.sleep(1)
            self.state += 1
            print(self.label, self.pid, self.state)
            # stdout檔案為所有程序共享
            self.post.put([self.pid, self.state])
        print(self.label, self.pid, '-')

if __name__ == '__main__':
    print('start ...', os.getpid())
    expected = 9

    post = Queue()
    # 開始共享佇列的3個程序,是生產者
    p = Counter(0, post)
    q = Counter(100, post)
    r = Counter(1000, post)
    p.start(); q.start();r.start()
    while expected:
        # 父程序消耗佇列中資料
        time.sleep(0.5)
        try:
            data = post.get(block=False)
        except queue.Empty:
            print("no data ...")
        else:
            print('Posted:', data)
            expected -= 1
    p.join()
    q.join()
    r.join()

    print('finish ...', os.getpid(), r.exitcode)
佇列和子類

獨立程式

獨立程式一般系統全域性工具,如套接字和FIFO檔案,來進行通訊

multiprocessing派生的進行也可以使用這些工具,但是它們之間較緊密的關係,使得它們可以使用這個模組提供的額外的IPC通訊手段

multiprocessing的設計目的是為並行執行函式呼叫而服務的,而不是直接啟動完全不同的程式

如果某個程式的啟動可能阻塞其呼叫者,派生程式可以使用os.systemos.popensubprocess等工具

但是在其他情況下,開始一個程序來啟動某個程式並沒有什麼意義

啟動獨立程式的方法

  • Unix下,os.forkexec組合

  • os.system,os.popensubprocess可跨平臺移植的shell命令列啟動器

  • multiprocessing模組選項

  • os.spawn家族函式

    os.spawn家族函式

    os.spawnvos.spawnve呼叫的出現是為了在windows下啟動程式,和Unix下的os.forkexec組合呼叫類似,但是也可以在Unix平臺下使用,且添加了一些功能,接近os.exec的作用

    OS.spawn函式家族在新程序中執行命令列指定的程式,基本操作方面,類似os.forkexec組合呼叫並且可以代替我們之前學到的system和open呼叫

    不去真的複製呼叫它們的程序,共享描述符不起作用,可以用來啟動一個完全獨立於呼叫者而執行的程式

    • 目前的subprocessmultiprocessing模組都提供了命令列派生程式的具有可移植的替代方案

    • 除非os.spawn呼叫提供不可獲取的獨特行為,一般使用更具有可移植性的multiprocessing模組代替

"""
啟動10個並行執行的程式
在windows下用spawn啟動程式
使用P_OVERLAY程序替換,使用P_DETACH子程序stdout不指向任何地方
"""

import os
import sys

for i in range(10):
    if sys.platform[:3] == "win":
        pypath = sys.executable
        os.spawnv(os.P_NOWAIT, pypath, ('python', 'child.py', str(i)))
    else:
        pid = os.fork()
        if pid != 0:
            print("Process %d spawned " % pid)
        else:
            os.execlp('python', 'python', 'child.py', str(i))
print("Main process exit ...")
在windows下用spawn啟動程式

os.startfile呼叫

  • os.system呼叫可用來啟動一個DOS的start命令(基於一個檔案的Windows檔名關聯),獨立地開啟這個檔案,就像單擊開啟一樣,在Python中os.startfile將這個操作變得更加簡單,而且可以避免阻塞呼叫者

  • DOS的start命令:就類似在執行對話方塊中輸入命令一樣

    • 如果是一個檔名,就開啟檔案,類似在資源管理器中單機一樣

  • os.startfile不提供等待應用關閉的選項也不提供應用程式退出狀態