1. 程式人生 > >Python 的併發程式設計

Python 的併發程式設計

這篇文章將講解 Python 併發程式設計的基本操作。併發和並行是對孿生兄弟,概念經常混淆。併發是指能夠多工處理,並行則是是能夠同時多工處理。Erlang 之父 Joe Armstrong 有一張非常有趣的圖說明這兩個概念:

我個人更喜歡的一種說法是:併發是巨集觀並行而微觀序列。

GIL

雖然 Python 自帶了很好的類庫支援多執行緒/程序程式設計,但眾所周知,因為 GIL 的存在,Python 很難做好真正的並行。

GIL 指全域性直譯器鎖,對於 GIL 的介紹:

全域性直譯器鎖(英語:Global Interpreter Lock,縮寫GIL),是計算機程式設計語言直譯器用於同步執行緒的一種機制,它使得任何時刻僅有一個執行緒在執行。

  • 維基百科

其實與其說 GIL 是 Python 直譯器的限制,不如說是 CPython 的限制,因為 Python 為了保障效能,底層大多使用 C 實現的,而 CPython 的記憶體管理並不是執行緒安全的,為了保障整體的執行緒安全,直譯器便禁止多執行緒的並行執行。

因為 Python 社群認為作業系統的執行緒排程已經非常成熟了,沒有必要自己再實現一遍,因此 Python 的執行緒切換基本是依賴作業系統,在實際的使用中,對於單核 CPU,GIL 並沒有太大的影響,但對於多核 CPU 卻引入了執行緒顛簸(thrashing)問題。

執行緒顛簸是指作為單一資源的 GIL 鎖,在被多核心競爭強佔時資源額外消耗的現象。

比如下圖,執行緒1 在釋放 GIL 鎖後,作業系統喚醒了 執行緒2,並將 執行緒2 分配給 核心2 執行,但是如果此時 執行緒2 卻沒有成功獲得 GIL 鎖,只能再次被掛起。此時切換執行緒、切換上下文的資源都將白白浪費。

因此,Python 多執行緒程式在多核 CPU 機器下的效能不一定比單核高。那麼如果是計算密集型的程式,一般還是考慮用 C 重寫關鍵部分,或者使用多程序避開 GIL。

多執行緒

在 Python 中使用多執行緒,有 threadthreading 可供原則,thread 提供了低級別的、原始的執行緒以及一個簡單的鎖,因為 thread 過於簡陋,執行緒管理容易出現人為失誤,因此官方更建議使用 threading

,而 threading 也不過是對 thread 的封裝和補充。(Python3 中 thread 被改名為 _thread)。

在 Python 中建立執行緒非常簡單:

import time
import threading


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(1)
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(threading.Thread(
            target=do_task,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主執行緒
        t.join()
    print("Finish.")

直接建立執行緒簡單優雅,如果邏輯複雜,也可以通過繼承 Thread 基類完成多執行緒:

import time
import threading


class MyTask(threading.Thread):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(1)
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主執行緒
        t.join()
    print("Finish.")

多程序

在 Python 中,可以使用 multiprocessing 庫來實現多程序程式設計,和多執行緒一樣,有兩種方法可以使用多程序程式設計。

直接建立程序:

import time
import random
import multiprocessing


def do_something(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(multiprocessing.Process(
            target=do_something,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主執行緒
        t.join()
    print("Finish.")

繼承程序父類:

import time
import random
import multiprocessing


class MyTask(multiprocessing.Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主執行緒
        t.join()
    print("Finish.")

multiprocessing 除了常用的多程序程式設計外,我認為它最大的意義在於提供了一套規範,在該庫下有一個 dummy 模組,即 multiprocessing.dummy,裡面對 threading 進行封裝,提供了和 multiprocessing 相同 API 的執行緒實現,換句話說,class::multiprocessing.Process 提供的是程序任務類,而 class::multiprocessing.dummy.Process,也正是有 multiprocessing.dummy 的存在,可以快速的講一個多程序程式改為多執行緒:

import time
import random
from multiprocessing.dummy import Process


class MyTask(Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主執行緒
        t.join()
    print("Finish.")

無論是多執行緒還是多程序程式設計,這也是我一般會選擇 multiprocessing 的原因。

除了直接建立程序,還可以用程序池(或者 multiprocessing.dummy 裡的程序池):

import time
import random
from multiprocessing import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     建立 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

執行緒池:

import time
import random
from multiprocessing.dummy import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     建立 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

這裡示例有個問題,pool 在 join 前需要 close 掉,否則就會丟擲異常,不過 Python 之禪的作者 Tim Peters 給出解釋:

As to Pool.close(), you should call that when - and only when - you're never going to submit more work to the Pool instance. So Pool.close() is typically called when the parallelizable part of your main program is finished. Then the worker processes will terminate when all work already assigned has completed.

It's also excellent practice to call Pool.join() to wait for the worker processes to terminate. Among other reasons, there's often no good way to report exceptions in parallelized code (exceptions occur in a context only vaguely related to what your main program is doing), and Pool.join() provides a synchronization point that can report some exceptions that occurred in worker processes that you'd otherwise never see.

同步原語

在多程序程式設計中,因為程序間的資源隔離,不需要考慮記憶體的執行緒安全問題,而在多執行緒程式設計中便需要同步原語來儲存執行緒安全,因為 Python 是一門簡單的語言,很多操作都是封裝的作業系統 API,因此支援的同步原語蠻全,但這裡只寫兩種常見的同步原語:鎖和訊號量。

通過使用鎖可以用來保護一段記憶體空間,而訊號量可以被多個執行緒共享。

threading 中可以看到 Lock 鎖和 RLock 重用鎖兩種鎖,區別如名。這兩種鎖都只能被一個執行緒擁有,第一種鎖只能被獲得一次,而重用鎖可以被多次獲得,但也需要同樣次數的釋放才能真正的釋放。

當多個執行緒對同一塊記憶體空間同時進行修改的時候,經常遇到奇怪的問題:

import time
import random
from threading import Thread, Lock

count = 0


def do_task():
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

如上就是典型的非執行緒安全導致 count 沒有達到預期的效果。而通過鎖便可以控制某一段程式碼,或者說某段記憶體空間的訪問:

import time
import random
from threading import Thread, Lock

count = 0
lock = Lock()


def do_task():
    lock.acquire()
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)
    lock.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

當然,上述例子非常暴力,直接強行把併發改為序列。

對於訊號量常見於有限資源強佔的場景,可以定義固定大小的訊號量供多個執行緒獲取或者釋放,從而控制執行緒的任務執行,比如下面的例子,控制最多有 5 個任務在執行:

import time
import random
from threading import Thread, BoundedSemaphore

sep = BoundedSemaphore(5)


def do_task(task_name):
    sep.acquire()
    print("do Task: {}".format(task_name))
    time.sleep(random.randint(1, 10))
    sep.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task, args=("task_{}".format(i),)))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish.")

Queue 和 Pipe

因為多程序的記憶體隔離,不會存在記憶體競爭的問題。但同時,多個程序間的資料共享成為了新的問題,而程序間通訊常見:佇列,管道,訊號。

這裡只講解佇列和管道。

佇列常見於雙程序模型,一般用作生產者-消費者模式,由生產者程序向佇列中釋出任務,並由消費者從佇列首部拿出任務進行執行:

import time
from multiprocessing import Process, Queue


class Task1(Process):
    def __init__(self, queue):
        super(Task1, self).__init__()
        self.queue = queue

    def run(self):
        item = self.queue.get()
        print("get item: [{}]".format(item))


class Task2(Process):
    def __init__(self, queue):
        super(Task2, self).__init__()
        self.queue = queue

    def run(self):
        print("put item: [Hello]")
        time.sleep(1)
        self.queue.put("Hello")


if __name__ == "__main__":
    queue = Queue()
    t1 = Task1(queue)
    t2 = Task2(queue)
    t1.start()
    t2.start()
    t1.join()
    print("Finish.")

理論上每個程序都可以向佇列裡的讀或者寫,可以認為佇列是半雙工路線。但是往往只有特定的讀程序(比如消費者)和寫程序(比如生產者),儘管這些程序只是開發者自己定義的。

而 Pipe 更像一個全工路線:

import time
from multiprocessing import Process, Pipe


class Task1(Process):
    def __init__(self, pipe):
        super(Task1, self).__init__()
        self.pipe = pipe

    def run(self):
        item = self.pipe.recv()
        print("Task1: recv item: [{}]".format(item))
        print("Task1: send item: [Hi]")
        self.pipe.send("Hi")


class Task2(Process):
    def __init__(self, pipe):
        super(Task2, self).__init__()
        self.pipe = pipe

    def run(self):
        print("Task2: send item: [Hello]")
        time.sleep(1)
        self.pipe.send("Hello")
        time.sleep(1)
        item = self.pipe.recv()
        print("Task2: recv item: [{}]".format(item))


if __name__ == "__main__":
    pipe = Pipe()
    t1 = Task1(pipe[0])
    t2 = Task2(pipe[1])
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Finish.")

除了上面介紹的 threadingmultiprocessing 兩個庫外,還有一個好用的令人髮指的庫 concurrent.futures。和前面兩個庫不同,這個庫是更高等級的抽象,隱藏了很多底層的東西,但也因此非常好用。用官方的例子:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

該庫中自帶了程序池和執行緒池,可以通過上下文管理器來管理,而且對於非同步任務執行完後,結果的獲得也非常簡單。再拿一個官方的多程序計算的例子作為結束:

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

歡迎到微信裡去當吃瓜群眾