1. 程式人生 > >python mutilprocessing多程序程式設計

python mutilprocessing多程序程式設計

  `為了更好的理解本文內容,請務必先了解Synchronization、Asynchronization、Concurrent、Mutex等基本概念 

  multiprocessing是一個類似於Threading模組的由API產生程序的包,關於Threading模組可以參考我的部落格文章。multiprocessing能夠 提供本地和遠端兩種併發模式,通過使用子程序而不是執行緒有效地避開了GIL。因此,multiprocessing允許程式設計師充分利用機器上的多個處理器,且該包支援在Unix系統和Windows系統上執行。mutilprocessing還引入了在Threading模組中沒有相類似的API。比如Pool物件,Pool物件提供了一種方便的方法,可以跨多個輸入值並行化函式的執行,跨程序分配輸入資料(資料並行)。使用方法可以看看下面的例子:

  mutilprocessing還引入了在Threading模組中沒有相類似的API。比如Pool物件,Pool物件提供了一種方便的方法,可以跨多個輸入值並行化函式的執行,跨程序分配輸入資料(資料並行)。使用方法可以看看下面的例子:

from multiprocessing import Pool

def f(x):
    return x * x


if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3, 4, 5, 6, 7]))
    # [1, 4, 9, 16, 25, 36, 49]

Process類

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={},daemon=None)

group必須為None,設定該引數僅僅是為了與Threading模組保持一致

targetrun()方法呼叫的可呼叫物件

name是指程序名

daemon指示是否設定為守護程序

  • run()

      表示程序活動的方法,可在子類中重寫此方法。標準run()方法呼叫傳遞給物件建構函式的可呼叫物件作為目標引數(如果有),分別使用args和kwargs引數中的順序和關鍵字引數。

  • start()

      啟動程序的活動,每個程序物件最多隻能呼叫一次,在一個單獨的程序中呼叫物件的run()方法

  • join([timeout])

      如果可選引數timeout為None(預設值),則該方法將阻塞,直到呼叫其join()方法的程序終止。如果timeout是一個正數,它最多會阻塞timeout秒。請注意,如果方法的程序終止或方法超時,則該方法返回None。檢查程序的exitcode以確定它是否終止。

  • name

      程序名

  • is_alive()

      指示程序是否還活著

  • daemon

      daemon flag, a Boolean value, 必須在程序start之前設定

  • pid

      process ID

  • exitcode

      負值-N表示孩子被訊號N終止,預設為None,表示程序未被終止

  • authkey

      The process’s authentication key (a byte string)

  • sentinel

     系統物件的數字控制代碼,當程序結束時將變為“ready” 

  • terminate()

      終止程序,但注意子程序不會被終止,只是會成孤兒

請注意,start(),join(),is_alive(),terminate()和exitcode方法只應由建立過程物件的程序呼叫。

>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process(Process-1, initial)> False
>>> p.start()
>>> print(p, p.is_alive())
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True

  在multiprocessing中,通過建立Process物件然後呼叫其start()方法來生成程序,其使用方法和threading.Thread一樣。我們看下面的例子:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':  # 這句話是必要的,不可去掉
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

我們可以通過程序號來區分不同的程序:

from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid(), '\n')


def f(name):
    info('function f')
    print('hello', name)


if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))  # 建立新程序
    p.start()  # 啟動程序
    p.join()

程序啟動

  根據平臺的不同,multiprocessing支援三種啟動程序的方法。這些啟動方法是:

  • spawn

    spawn

     呼叫改方法,父程序會啟動一個新的python程序,子程序只會繼承執行程序物件run()方法所需的那些資源。特別地,子程序不會繼承父程序中不必要的檔案描述符和控制代碼。與使用forkforkserver相比,使用此方法啟動程序相當慢。

    Available on Unix and Windows. The default on Windows.

  • fork

     父程序使用os.fork()來fork Python直譯器。子程序在開始時實際上與父程序相同,父程序的所有資源都由子程序繼承。請注意,安全建立多執行緒程序尚存在一定的問題。

    Available on Unix only. The default on Unix.

  • forkserver

     當程式啟動並選擇forkserverstart方法時,將啟動伺服器程序。從那時起,每當需要一個新程序時,父程序就會連線到伺服器並請求它fork一個新程序。 fork伺服器程序是單執行緒的,因此使用os.fork()是安全的。沒有不必要的資源被繼承。

    Available on Unix platforms which support passing file descriptors over Unix pipes.

      要選擇以上某一種start方法,請在主模組的if __name__ == '__ main__'子句中使用mp.set_start_method()

    並且mp.set_start_method()在一個程式中僅僅能使用一次。

    import multiprocessing as mp
    
    
    def foo(q):
        q.put('hello')
    
    
    if __name__ == '__main__':
        mp.set_start_method('spawn')
        q = mp.Queue()
        p = mp.Process(target=foo, args=(q,))
        p.start()
        print(q.get())
        p.join()
    

      或者,您可以使用get_context()來獲取上下文物件。上下文物件與多處理模組具有相同的API,並允許在同一程式中使用多個啟動方法。

    import multiprocessing as mp
    
    def foo(q):
        q.put('hello')
    
    if __name__ == '__main__':
        ctx = mp.get_context('spawn')
        q = ctx.Queue()
        p = ctx.Process(target=foo, args=(q,))
        p.start()
        print(q.get())
        p.join()
    

     注意,與一個context相關的物件可能與不同context的程序不相容。特別是,使用fork context建立的鎖不能傳遞給使用spawn或forkserver start方法啟動的程序。

程序通訊

  當使用多個程序時,通常使用訊息傳遞來進行程序之間的通訊,並避免必須使用任何synchronization primitives(如鎖)。對於傳遞訊息,可以使用Pipe(用於兩個程序之間的連線)或Queue(允許多個生產者和消費者)。

Queues

  class multiprocessing.Queue([maxsize])

   Queue實現queue.Queue的所有方法,但task_done()join()除外。Queue是程序、執行緒安全的模型

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
Pipes

  Class multiprocessing.Pipe([duplex])

  返回一對(conn1, conn2) of Connection 物件代表pipe的兩端。如果duplex為True(預設值),則管道是雙向的;如果duplex為False,則管道是單向的:conn1只能用於接收訊息,conn2只能用於傳送訊息。Pipe()`函式返回一個由Pipe連線的連線物件,預設情況下是全雙工雙向通訊(duplex)。例如:

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

  Pipe()返回的兩個連線物件代表管道的兩端,每個連線物件都有send()和recv()方法。需要注意的是,管道中的資料可能會不一致或被破壞,如當兩個程序(或執行緒)嘗試同時讀取或寫入管道的同一端。當然,同時使用管道的不同端部的過程不存在損壞的風險。

程序共享狀態

  在進行併發程式設計時,通常最好避免使用共享狀態,但是,如果你確實需要使用某些共享資料,那麼multiprocessing提供了以下兩種方法:

Shared Memory

 可以使用Value或Array將資料儲存在共享記憶體的map(對映)中。例如,以下程式碼:

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
	
    print(num.value)
    print(arr[:])
 # 3.1415927
 # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

  建立num和arr時使用的’d’和’i’引數是array module使用的型別的型別程式碼:'d’表示雙精度浮點數,'i’表示有符號整數。這些共享物件將是程序和執行緒安全的。為了更靈活地使用共享記憶體,可以使用multiprocessing.sharedctypes模組,該模組支援建立從共享記憶體分配的任意ctypes物件。但還是那句話,在進行併發程式設計時,通常最好避免使用共享狀態。

Server Process

  Manager()返回的Manager物件控制一個伺服器程序(server process),該程序儲存Python物件並允許其他程序使用代理操作它們。Manager物件支援的物件包括list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value 以及 Array。Managers提供了一種建立可在不同程序之間共享的資料的方法,包括在不同計算機上執行的程序之間通過網路共享。管理器物件控制管理共享物件的伺服器程序。其他程序可以使用代理訪問共享物件。

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)
        
#{0.25: None, 1: '1', '2': 2}
#[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Proxy

  代理是一個物件,它指的是(可能)在不同的程序中存在的共享物件。共享物件被認為是代理的指示物件。多個代理物件可能具有相同的指示物件。代理物件具有呼叫其引用物件的相應方法的方法。代理物件的一個重要特性是它們是pickable的,因此它們可以在程序之間傳遞。

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)  # l即是一個代理物件
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Connection

 &esmp;connection物件允許傳送和接收可序列化物件或字串。它們可以被認為是面向訊息的連線套接字,我們再上面介紹Pipe的時候所例項化的物件就是connection物件。

  • send(obj)

    將物件傳送到連線的另一端,應使用recv()讀取,且該物件必須是pickable的,>32 MB的物件可能會引發ValueError異常。

  • recv()

    返回從連線另一端傳送的物件。阻塞直到接收到東西。如果沒有剩餘要接收和另一端被關閉,則引發EOFError。

  • fileno()

    返回conn所使用的檔案描述符或控制代碼

  • close()

    關閉連線

  • poll([timeout])

    返回是否有可供讀取的資料,如果未指定超時,則會立即返回;如果timeout是一個數字,則指定阻止的最長時間(以秒為單位);如果timeout為None,則使用無限超時。

  • send_bytes(buffer[, offset[, size]])

    傳送位元組資料

  • recv_bytes([maxlength])

    接受位元組資料

  • recv_bytes_into(buffer[, offset])

    讀取從連線另一端傳送的位元組資料的完整訊息到buffer,並返回訊息中的位元組數。

    >>> from multiprocessing import Pipe
    >>> a, b = Pipe()
    >>> a.send([1, 'hello', None])
    >>> b.recv()
    [1, 'hello', None]
    >>> b.send_bytes(b'thank you')
    >>> a.recv_bytes()
    b'thank you'
    >>> import array
    >>> arr1 = array.array('i', range(5))
    >>> arr2 = array.array('i', [0] * 10)
    >>> a.send_bytes(arr1)
    >>> count = b.recv_bytes_into(arr2)
    >>> assert count == len(arr1) * arr1.itemsize
    >>> arr2
    array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
    
summary

  Server process Manager比使用共享記憶體物件更靈活,因為它們可以支援任意物件型別。此外,單個管理器可以通過網路在不同計算機上的程序共享。但它比使用共享記憶體慢。

Synchronization

  同步原語和Threading模組幾乎一致,具體請參考Python Threading 多執行緒程式設計

Lock
from multiprocessing import Process, Lock

def f(l, i):
    """
    保證同一時間只有一個標準輸出流
    """
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

# output
hello world 1
hello world 0
hello world 2
hello world 4
hello world 3
hello world 6
hello world 9
hello world 5
hello world 8
hello world 7

Pool類

  Pool類用於建立程序池

主要方法有,具體例子見程式碼,並請注意,pool物件的方法只能由建立它的程序使用:

  • pool.map()
  • pool.imap() Equivalent of map() – can be MUCH slower than Pool.map().
  • pool.starmap() Like map() method but the elements of the iterable are expected to be iterables as well and will be unpacked as arguments.
  • pool.starmap_async Asynchronous version of starmap() method
  • pool.map_async Asynchronous version of map() method.
  • pool.imap_unordered()
  • pool.apply()
  • pool.apply_async() Asynchronous version of apply() method.

from multiprocessing import Pool, TimeoutError
import time
import os


def f(x):
    return x*x


if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i, end='\t')
        print()
        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")
    
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 0	1	4	9	16	25	36	49	64	81	
# 400
# 2696
# [2696, 2696, 2696, 2696]
# We lacked patience and got a multiprocessing.TimeoutError
# For the moment, the pool remains available for more work
# Now the pool is closed and no longer available

Miscellaneous

  • multiprocessing.active_children()

    返回當前程序的所有活子進 程的列表

  • multiprocessing.cpu_count()

    返回系統中的CPU數量,此數字不等於當前程序可以使用的CPU數量。可以使用len(os.sched_getaffinity(0))獲得可用CPU的數量

  • multiprocessing.current_process()

    返回與當前程序對應的Process物件

  • multiprocessing.freeze_support()

    為程式打包成exe可執行檔案提供支援,在Windows以外的任何作業系統上呼叫時,呼叫freeze_support()無效。此外,如果模組由Windows上的Python直譯器正常執行(程式尚未凍結),則freeze_support()無效

    from multiprocessing import Process, freeze_support
    
    def f():
        print('hello world!')
    
    if __name__ == '__main__':
        freeze_support()
        Process(target=f).start()
    
  • multiprocessing.get_all_start_methods()

    返回支援的start方法列表,第一個是預設方法。可能的啟動方法是’fork’,‘spawn’和’forkserver’。在Windows上只有“spawn”可用。在Unix上’fork’和’spawn’總是受支援,'fork’是預設值。

  • multiprocessing.get_context(method=None)

    返回與multiprocessing模組具有相同屬性的上下文物件,具體用法前面已經有過例子

  • multiprocessing.get_start_method(allow_none=False)

    返回用於啟動程序的start方法的名稱,返回值可以是’fork’,‘spawn’,'forkserver’或None。 'fork’是Unix上的預設值,而’spawn’是Windows上的預設值。

  • multiprocessing.set_executable()

    設定啟動子程序時要使用的Python直譯器的路徑

  • multiprocessing.set_start_method(method)

    設定用於啟動子程序的方法。方法可以是’fork’,‘spawn’或’forkserver’。請注意,改法最多呼叫一次,並且應該寫在主模組的if name ==’__ main__'子句中。