1. 程式人生 > 其它 >Python:多程序並行程式設計與程序池

Python:多程序並行程式設計與程序池

Python的並行程式設計可以採用multiprocessingmpi4py模組來完成。

multiprocessing是Python標準庫中的模組,實現了共享記憶體機制,也就是說,可以讓執行在不同處理器核心的程序能讀取共享記憶體。在基於共享記憶體通訊的多程序程式設計中,常常通過加鎖或類似機制來實現互斥。

mpi4py庫實現了訊息傳遞的程式設計正規化(設計模式)。簡單來說,就是程序之間不靠任何共享資訊來進行通訊(也叫做shared nothing),所有的交流都通過傳遞資訊代替。在資訊傳遞的程式碼中,程序通過send()receive()進行交流(與共享記憶體通訊形成對比)。

本文先介紹multiprocessing

模組,mpi4py留在下一篇部落格介紹。

1 Unix程序模型回顧

Python的multiprocessing程序模組在Unix上是基於fork()程式設計介面的(MacOS上預設是spawn()),故在介紹Python的並行程式設計模組之前,首先讓我們回顧一下Unix的程序模型。

程序的經典定義就是一個執行中程式的例項。系統中的每個程式都執行在某個程序的上下文(context)中。上下文是由程式正確執行所需的狀態組成的。這個狀態包括存放在記憶體中的程式的程式碼和資料,它的棧、通用目的暫存器的內容、程式計數器、環境變數以及開啟檔案描述符的集合。程序提供給應用程式以下的抽象:

  • 一個獨立的邏輯控制流,它提供一個假象,好像我們的程式獨佔地使用處理器。
  • 一個私有的地址空間,它提供一個假象,好像我們的程式獨佔地使用記憶體系統。

程序間進行通訊需要一些諸如共享記憶體的特殊手段,我們後文第3節部分會提到。

Unix系統上常通過以下方式建立子程序:

# include <stdlib.h>
# include <unistd.h>
# include <stdio.h>

int main(){
    pid_t pid;
    int x = 1;
    pid = fork();
    if (pid == 0){ //Child
        printf("child: x=%d\n", ++x);
        exit(0);
    }
    //Parent
    printf("parent: x=%d\n", --x);
    exit(0);
}

在Unix系統上執行這個程式時,我們得到下面的結果:

parent: x=0
child: x=2

fork()函式是一點需要注意的事它只被呼叫一次,卻會返回兩次:一次是在呼叫程序(父程序)中,一次是在新建立的子程序中。在父程序中,fork()返回子程序的PID;在子程序中,fork() 返回0。因為子程序的PID總是為非零,返回值就提供一個明確的方法來分辨程式是在父程序還是在子程序中執行。

2 用multiprocessing模組建立程序

前面我們說過,在Unix下Python的multiprocessing模組實際上是呼叫fork()系統呼叫來建立程序的,這麼坐會克隆出一個Python直譯器,在fork()時會包含所有的狀態,也即父程序的所有資源都由子程序繼承。此外,父程序既可以在產生子程序之後繼續非同步執行,也可以暫停等待子程序建立完成之後再繼續執行。Python的multiprocessing庫通過以下幾步建立程序:

  1. 建立程序物件
  2. 呼叫start()方法,開啟程序的活動
  3. 呼叫join()方法,在程序結束之前一直等待。

如下所示我們建立了3個程序:

import multiprocessing
from multiprocessing import shared_memory
import os
import time

def foo(i, data):
    data[i] = i
    print ('called function in process: %d, data[%d] = %d' % (os.getpid(), i, data[i]))
    time.sleep(10)
    return

if __name__ == '__main__':

    n_processes = 8
    data = [None for i in range(n_processes)]
    
    begin = time.time()
    Process_jobs = []
    for i in range(n_processes):
        # 每個子程序都拿了一個data副本
        p = multiprocessing.Process(target=foo, args=(i, data))
        Process_jobs.append(p)
        p.start()
 
    for i in range(n_processes):
        Process_jobs[i].join()
    end = time.time()
    
    print(data)
    print("time: %d" % (end - begin))

程式碼列印輸出:

called function in process: 39195, data[1] = 1
called function in process: 39198, data[4] = 4
called function in process: 39200, data[6] = 6
called function in process: 39197, data[3] = 3
called function in process: 39201, data[7] = 7
called function in process: 39196, data[2] = 2called function in process: 39194, data[0] = 0

called function in process: 39199, data[5] = 5
[None, None, None, None, None, None, None, None]
time: 11

可以看到子程序的列印是亂序的,說明我們不能對子程序的執行順序做出任何假設。此外,可以看到每個子程序都擁有主程序中data的副本(也即地址空間獨立),在子程序中對data進行修改並沒有影響到我們主程序data中的值。最後,我們可以看到雖然我們每個子程序都會睡眠10s,但是整個程式的執行時間是11s,說明我們子程序的執行是並行的(本機為8核CPU)。

這裡我們使用VSCode的除錯外掛,可以直觀地看到子程序的建立情況:

誒,我們不是一共生成了8個子程序嗎,為什麼顯示有9個子程序嗎?這是因為我是在MacOS上測試的,而在Python3.8版本之後: 對於 MacOS,spawn()啟動方式是預設方式。 因為 fork()可能導致subprocess崩潰。若以spawn()方式啟動多程序會同時啟動一個資源追蹤程序,負責追蹤當前程式的程序產生的、並且不再被使用的命名系統資源(如命名訊號量以及SharedMemory物件)。當所有程序退出後,資源追蹤會負責釋放這些仍被追蹤的的物件。通常情況下是不會有這種物件的,但是假如一個子程序被某個訊號殺死,就可能存在這一類資源的“洩露”情況。

3 共享記憶體

程序有獨立的地址空間既是優點也是缺點。這樣一來,一個程序不可能不小心覆蓋另一個程序的虛擬記憶體,這就消除了許多令人迷惑的錯誤——這是一個明顯的優點。另一方面,獨立的地址空間使得程序共享狀態資訊變得更加困難。為了共享資訊,它們必須使用顯式的IPC(程序間通訊)機制。基於程序的設計的另一個缺點是,它們往往比較慢,因為程序控制和IPC的開銷很高。

程序間通訊的手段有許多,包括管道、訊息佇列,系統V共享記憶體,系統V訊號量(semaphore)、socket(不同主機上)等。Python的multiprocessing採用共享記憶體進行程序程序間通訊,如下所示:

import multiprocessing
from multiprocessing import shared_memory
import os
import time

def foo(i, data):
    data[i] = i
    print ('called function in process: %d, data[%d] = %d' % (os.getpid(), i, data[i]))
    time.sleep(10)
    return

if __name__ == '__main__':

    n_processes = 8
    data = [None for i in range(n_processes)]
    # 需要置為共享記憶體
    data = shared_memory.ShareableList([999 for i in range(n_processes)])
    
    begin = time.time()
    Process_jobs = []
    for i in range(n_processes):
        # 每個子程序都拿了一個data副本
        p = multiprocessing.Process(target=foo, args=(i, data))
        Process_jobs.append(p)
        p.start()
 
    for i in range(n_processes):
        Process_jobs[i].join()
    end = time.time()

    print(data)
    print("time: %d" % (end - begin))
  
    data.shm.close()
    data.shm.unlink()

該程式碼執行結果如下:

called function in process: 43848, data[6] = 6
called function in process: 43842, data[0] = 0
called function in process: 43849, data[7] = 7
called function in process: 43843, data[1] = 1
called function in process: 43846, data[4] = 4
called function in process: 43844, data[2] = 2
called function in process: 43847, data[5] = 5
called function in process: 43845, data[3] = 3
ShareableList([0, 1, 2, 3, 4, 5, 6, 7], name='psm_c252aac3')
time: 11

可以看到8個子程序成功修改了共享記憶體中的內容。

4 終止/殺掉程序

我們可以使用terminate() 方法終止一個程序,這在Unix系統上是使用SIGTERM訊號完成的。另外,我們可以使用 is_alive()方法來判斷一個程序是否還存活。

# 殺死一個程序
import multiprocessing
import time

def foo():
    print('Starting function')
    time.sleep(0.1)
    print('Finished function')

if __name__ == '__main__':
    p = multiprocessing.Process(target=foo)
    print('Process before execution:', p, p.is_alive())
    p.start()
    print('Process running:', p, p.is_alive())
    p.terminate()
    time.sleep(0.1) # 注意,傳訊號需要時間,故sleep一下
    print('Process terminated:', p, p.is_alive())
    p.join()
    print('Process joined:', p, p.is_alive())
    print('Process exit code:', p.exitcode)

該程式碼列印:

Process before execution: <Process name='Process-1' parent=50932 initial> False
Process running: <Process name='Process-1' pid=50944 parent=50932 started> True
Process terminated: <Process name='Process-1' pid=50944 parent=50932 stopped exitcode=-SIGTERM> False
Process joined: <Process name='Process-1' pid=50944 parent=50932 stopped exitcode=-SIGTERM> False
Process exit code: -15

最後,我們通過讀程序的ExitCode狀態碼(status code)驗證程序已經結束, ExitCode可能的值如下:

  • == 0: 沒有錯誤正常退出
  • > 0: 程序有錯誤,並以此狀態碼退出
  • < 0: 程序被-1 *訊號殺死並以此作為 ExitCode 退出
    在我們的例子中,輸出的ExitCode-15 。負數表示子程序被數字為15的訊號殺死。

如果我們修改為p.kill()則為立即殺掉一個程序。則上述程式會列印輸出:

Process before execution: <Process name='Process-1' parent=52481 initial> False
Process running: <Process name='Process-1' pid=52492 parent=52481 started> True
Process terminated: <Process name='Process-1' pid=52492 parent=52481 stopped exitcode=-SIGKILL> False
Process joined: <Process name='Process-1' pid=52492 parent=52481 stopped exitcode=-SIGKILL> False
Process exit code: -9

也即子程序接收到數字為9的訊號,即被殺死。終止程序和殺死程序的區別在於終止程序會先將資料由記憶體寫入磁碟(類似於關機),但殺掉程序則不會儲存資料(類似於直接給計算機斷掉電源)。

Unix系統上常見訊號查詢手冊如下:

最後,還需要強調一點,Python雖然提供了用訊號殺掉程序的函式,但是並不提供用訊號直接殺掉執行緒的函式,這是因為殺掉程序遠比殺掉執行緒安全,具體原因可以參見知乎問題《為什麼大多數語言的標準庫都不提供或不鼓勵使用“殺執行緒”的功能?》

5 程序間同步

多個程序可以協同工作來完成一項任務,而這通常需要共享資料。所以在多程序之間保持資料的一致性就很重要了。需要共享資料協同的程序必須以適當的策略來讀寫資料。相關的同步原語和執行緒的庫很類似。

程序的同步原語如下:

Lock: 這個物件可以有兩種狀態:鎖住的(locked)和沒鎖住的(unlocked)。一個Lock物件有兩個方法, acquire()release() ,來控制共享資料的讀寫許可權。
Event: 實現了程序間的簡單通訊,一個程序發事件的訊號,另一個程序等待事件的訊號。 Event物件有兩個方法, set()clear(),來管理自己內部的變數。
Condition: 此物件用來同步部分工作流程,在並行的程序中,有兩個基本的方法: wait()用來等待程序, notify_all()用來通知所有等待此條件的程序。
Semaphore: 用來共享資源,例如,支援固定數量的共享連線。
Rlock: 遞迴鎖物件。其用途和方法同Threading模組一樣。
Barrier: 將程式分成幾個階段,適用於有些程序必須在某些特定程序之後執行。處於障礙(Barrier)之後的程式碼不能同處於障礙之前的程式碼並行。

下列程式碼展示瞭如何使用Barrier來同步兩個程序。我們有4個程序,程序1和2由Barrier管理,而程序3和4則沒有同步策略。

import multiprocessing
from multiprocessing import Barrier, Lock, Process
from time import time
from datetime import datetime

def test_with_barrier(synchronizer, serializer):
    name = multiprocessing.current_process().name
    synchronizer.wait()
    now = time()
    with serializer:
        print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))

def test_without_barrier():
    name = multiprocessing.current_process().name
    now = time()
    print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))

if __name__ == '__main__':
    synchronizer = Barrier(2) # Barrier宣告的引數代表要管理的程序總數:
    serializer = Lock()
    Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start()
    Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start()
    Process(name='p3 - test_without_barrier', target=test_without_barrier).start()
    Process(name='p4 - test_without_barrier', target=test_without_barrier

以上程式碼執行結果如下:

process p3 - test_without_barrier ----> 2022-12-09 20:09:18.382580
process p1 - test_with_barrier ----> 2022-12-09 20:09:18.432348
process p2 - test_with_barrier ----> 2022-12-09 20:09:18.432586
process p4 - test_without_barrier ----> 2022-12-09 20:09:18.464562

可以看到with_barrier的程序1和程序2比without_barrier的程序3和4時間差的小很多。
關於用Barrier同步兩個程序的過程,可描述如下圖:

6 使用程序池

下面的例子展示瞭如果通過程序池來執行一個並行應用。我們建立了有8個程序的程序池,然後使用pool.map()方法來進行一個簡單的平行計算(這裡取樣了函數語言程式設計中的map-reduce思想),並與序列的map()方法進行對比。

import time
import multiprocessing

def function_square(x):
    return x * x

if __name__ ==  '__main__':
    data = [i for i in range(1000000)]

    # Parallel implementation
    begin = time.time()
    pool = multiprocessing.Pool(processes=8)
    results1 = pool.map(function_square, data)
    pool.close()
    pool.join()
    end = time.time()
    print("Parallel time: %f" % (end - begin))
    
    
    # Nonparallel code
    begin = time.time()
    results2 = map(function_square, data)
    end = time.time()
    print("Nonparallel time: %f" % (end-begin))

    if list(results1) == list(results2):
        print("Correct!")
        print(results1[:10])

上述程式碼執行結果如下:

Parallel time: 1.638756
Nonparallel time: 0.000003
Correct!
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

可見序列和並行程式碼執行結果相同,但是居然並行的時間更慢!這是由於我們計算的主要時間耗費在程序間通訊上了(主程序需要將列表的各個部分拷貝到子程序),而每個子程序的計算量又很小,是典型的計算開銷小於通訊的情況,故不能取得理性的(8倍)的加速比。這告訴我們在使用Python多程序並行程式設計時要完成的工作規模必須足夠大,這樣可以彌補額外產生的通訊開銷。

這裡需要說明一下使用Python程序池進行並行化處理時需要考慮的重要因素:

  • 這種並行化處理技術只適用於可以將問題分解成各個獨立部分的情況。
  • 任務必須定義成普通的函式來提交。例項方法、閉包或者其他型別的可呼叫物件都是不支援並行處理的。
  • 函式的引數和返回值必須課兼容於pickle編碼。任務的執行是在單獨的直譯器中完成的,這中間需要用到程序間通訊。因此,在不同的直譯器間交換資料必須要進行序列化處理(這一點尤其重要,在實際專案中我發現Pytorch的sparse_coo_tensor(型別就無法進行序列化),在呼叫p.start()時會報錯NotImplementedError: Cannot access storage of SparseTensorImpl

7 用多程序來規避GIL

多程序程式設計的一個好處就是可以用來規避GIL的限制。比如假設我們有以下執行緒程式碼:

# Performs a large calculation (CPU bound)
def some_work(args):
    ...
    return result

# A thread that calls the above function
def some_thread():
    while True:
        ...
        r = some_work(args)
        ...

我們可以採用如下方法將程式碼修改為使用程序池的方式:

import os

# Processing pool (see below for initiazation)
pool = None

#  Performs a large calculation (CPU bound)
def some_work(args):
    ...
    return result

# A thread that calls the  above function
def some_thread():
    while True:
        ...
        r = pool.apply(some_work, (args))
        ...
        
# Initiaze the pool
if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool()

在上面這段程式碼中,每當有執行緒要執行CPU密集型的任務時,它就把任務提交到池中,然後程序池將任務轉交給執行在另一個程序中的Python直譯器。當執行緒等待結果的時候就會釋放GIL。此外,由於計算是在另一個單獨的直譯器中進行的,這樣就不再受到GIL的限制了。在多核系統上,將會發現採用這種技術能輕易地利用到所有的CPU核心。

參考