1. 程式人生 > >python多執行緒 多程序

python多執行緒 多程序

多程序與多執行緒

我們都知道,作業系統中所有的程式都是以程序的方式來執行的,或者說我們把執行著的程式稱為程序(Process)。例如執行記事本程式就是啟動一個記事本程序,執行兩個記事本就是啟動兩個記事本程序。

很多時候,程序還不止同時幹一件事,比如Word,它可以同時進行打字、拼寫檢查、列印等事情。在一個程序內部,要同時幹多件事,就需要同時執行多個“子任務”,我們把程序內的這些“子任務”稱為執行緒(Thread)。由於每個程序至少要幹一件事,所以,一個程序至少有一個執行緒。

程序和執行緒的區別主要有:

  • 程序之間是相互獨立的,多程序中,同一個變數,各自有一份拷貝存在於每個程序中,且互不影響;而同一個程序的多個執行緒是記憶體共享的,所有變數都由所有執行緒共享。
  • 由於程序間是獨立的,因此一個程序的崩潰不會影響到其他程序;而執行緒是包含在程序之內的,執行緒的崩潰就會引發程序的崩潰,繼而導致同一程序內的其他執行緒也奔潰。

真正的多程序或者多執行緒都需要多核 CPU 才可能實現。現在雖然多核 CPU 已經非常普及,但是任務數量遠遠多於 CPU 的核心數量,所以,作業系統會自動地把程序或執行緒輪流排程到每個核心上執行。例如對於核心 a,程序 1 執行 0.01 秒,切換到程序 2,程序 2 執行 0.01 秒,再切換到程序 3……每個程序都是交替執行的,但是由於 CPU 的執行速度實在是太快了,我們感覺就像這些程序都在同時執行一樣。多執行緒的執行方式類似,每一個核心都在作業系統的排程下,在多個執行緒之間快速切換,讓每個執行緒都短暫地交替執行。

目前,要同時完成多個任務通常有以下幾種解決方案:

  • 一種是啟動多個程序,每個程序雖然只有一個執行緒,但多個程序可以一塊執行多個任務。
  • 還有一種方法是啟動一個程序,在一個程序內啟動多個執行緒,這樣,多個執行緒也可以一塊執行多個任務。
  • 當然還有第三種方法,就是啟動多個程序,每個程序再啟動多個執行緒,這樣同時執行的任務就更多了。但是這種模型很複雜,實際很少採用。

總結一下就是,多工的實現有3種方式:

  • 多程序模式;
  • 多執行緒模式;
  • 多程序+多執行緒模式。

同時執行多個任務通常各個任務之間並不是沒有關聯的,而是需要相互通訊和協調,有時,任務 1 必須暫停等待任務 2 完成後才能繼續執行,有時,任務 3 和任務 4 又不能同時執行。所以,多程序和多執行緒的程式的複雜度要遠遠高於單程序單執行緒的程式。

Python 多程序

對於 Unix/Linux 作業系統來說,系統本身就提供了一個 fork() 系統呼叫,可以非常方便地建立多程序。Python的os 模組封裝了常見的系統呼叫,其中就包括 fork

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

執行結果如下:

Process (1806) start...
I (1806) just created a child process (1809).
I am child process (1809) and my parent is 1806.

普通的函式呼叫,呼叫一次,返回一次,但是 fork() 呼叫一次,返回兩次,因為作業系統自動把當前程序(父程序)複製了一份(子程序),然後,分別在父程序和子程序內返回。子程序永遠返回 0,而父程序返回子程序的 ID。

multiprocessing 模組

由於 Windows 系統沒有 fork 呼叫,因而 Python 提供了一個跨平臺的多程序模組multiprocessing,模組中使用 Process 類來代表一個程序物件。下面的例子演示了啟動一個子程序並等待其結束:

from multiprocessing import Process
import os

# 子程序要執行的程式碼
defrun_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

執行結果如下:

Parent process 928.
Process will start.
Run child process test (929)...
Process end.

建立子程序時,只需要傳入一個執行函式和函式的引數(target 指定了程序要執行的函式,args 指定了引數)。建立好程序 Process 的例項後,使用 start() 方法啟動。join() 方法可以等待子程序結束後再繼續往下執行,通常用於程序間的同步。

程序池 Pool

如果要啟動大量的子程序,可以用程序池的方式批量建立子程序:

from multiprocessing import Pool
import os, time, random

deflong_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4) # 設定程序池大小
    for i in range(5):
        p.apply_async(long_time_task, args=(i,)) # 設定每個程序要執行的函式和引數
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

執行結果如下:

Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

在上面的程式碼中,Pool 用於生成程序池,對 Pool 物件呼叫 apply_async() 方法可以使每個程序非同步執行任務,也就說不用等上一個任務執行完才執行下一個任務。對 Pool 物件呼叫join() 方法會等待所有子程序執行完畢,呼叫 join() 之前必須先呼叫 close() 以關閉程序池,確保沒有新的程序加入。

輸出的結果中,task 4 要等待前面某個 task 完成後才執行,這是因為我們把 Pool 的大小設定成了 4,因此,最多同時執行 4 個程序。如果改成 p = Pool(5),就可以同時跑5個程序。Pool 的預設大小是 CPU 的核數。

程序間通訊

Process 之間肯定是需要通訊的,作業系統提供了很多機制來實現程序間的通訊。Python 的 multiprocessing 模組包裝了底層的機制,提供了佇列(Queue)、管道(Pipes)等多種方式來交換資料。

我們以佇列(Queue)為例,在父程序中建立兩個子程序,一個往佇列裡寫資料,一個從佇列裡讀資料:

from multiprocessing import Process, Queue
import os, time, random

# 寫資料程序執行的程式碼:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 讀資料程序執行的程式碼:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    q = Queue() # 父程序建立 Queue,並傳給各個子程序:
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start() # 啟動子程序 pw,寫入
    pr.start() # 啟動子程序 pr,讀取
    pw.join() # 等待pw結束
    pr.terminate() # pr程序裡是死迴圈,無法等待其結束,只能強行終止:

執行結果如下:

Process to write: 50563
Put A to queue...
Process to read: 50564
Get A from queue.Put B to queue...Get B from queue.Put C to queue...Get C from queue.

Python 多執行緒

多工可以由多程序完成,也可以由一個程序內的多執行緒完成。我們前面提到了程序是由若干執行緒組成的,一個程序至少有一個執行緒。由於執行緒是作業系統直接支援的執行單元,因此,高階語言通常都內建多執行緒的支援,Python 也不例外,並且,Python 的執行緒是真正的 Posix Thread,而不是模擬出來的執行緒。

Python 的標準庫提供了兩個模組:_thread 和 threading。其中,_thread 是低階模組,threading 是高階模組,對_thread 進行了封裝。絕大多數情況下,我們只需要使用 threading 這個高階模組。

啟動一個執行緒就是把一個函式傳入並建立 Thread 例項,然後呼叫 start() 開始執行:

import time, threading

# 新執行緒執行的程式碼:
defloop():
    print('thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

執行結果如下:

thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

由於任何程序預設就會啟動一個執行緒,我們把該執行緒稱為主執行緒,主執行緒又可以啟動新的執行緒。Python 的 threading 模組有個current_thread() 函式,它永遠返回當前執行緒的例項。主執行緒例項的名字叫 MainThread,子執行緒的名字在建立時指定。上例中,我們用 LoopThread 命名子執行緒,如果不起名字 Python 就自動給執行緒命名為 Thread-1,Thread-2……

Lock

多執行緒和多程序最大的不同在於:多程序中,同一個變數,各自有一份拷貝存在於每個程序中,互不影響;而多執行緒中,所有變數由所有執行緒共享。因為任何一個執行緒都可以修改任何一個變數,所以執行緒之間共享資料最大的危險在於多個執行緒同時改一個變數,把內容給改亂了。

來看看多個執行緒同時操作一個變數怎麼把內容給改亂了:

import time, threading

balance = 0 # 假定這是你的銀行存款:

# 先存後取,結果應該為0
defchange_it(n):
    global balance
    balance = balance + n
    balance = balance - n

defrun_thread(n):
    for i in range(100000):
        change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

我們定義了一個共享變數 balance,初始值為 0,並且啟動兩個執行緒,先存後取,理論上結果應該為 0。但是,由於執行緒的排程是由作業系統決定的,當 t1、t2 交替執行時,只要迴圈次數足夠多,balance 的結果就不一定是 0 了。

原因是因為高階語言的一條語句在 CPU 執行時是若干條語句。例如一個簡單的計算 balance = balance + n,也分兩步:

  • 計算 balance + n,存入臨時變數中;
  • 將臨時變數的值賦給 balance。

如果作業系統以下面的順序執行 t1、t2:

初始值 balance = 0

t1: x1 = balance + 5  # x1 = 0 + 5 = 5

t2: x2 = balance + 8  # x2 = 0 + 8 = 8
t2: balance = x2      # balance = 8

t1: balance = x1      # balance = 5
t1: x1 = balance - 5  # x1 = 5 - 5 = 0
t1: balance = x1      # balance = 0

t2: x2 = balance - 8  # x2 = 0 - 8 = -8
t2: balance = x2   # balance = -8

結果 balance = -8

究其原因,是因為修改 balance 需要多條語句,而執行這幾條語句時,執行緒可能中斷,從而導致多個執行緒把同一個物件的內容改亂了。

如果我們要確保 balance 計算正確,就要給 change_it() 上一把鎖。當某個執行緒開始執行change_it() 時,由於該執行緒獲得了鎖,因此其他執行緒不能同時執行 change_it(),只能等待,直到鎖被釋放,這樣就可以避免修改的衝突。建立一個鎖可以通過 threading.Lock() 來實現:

balance = 0
lock = threading.Lock()

defrun_thread(n):
    for i in range(100000):
        # 先要獲取鎖:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要釋放鎖:
            lock.release()

當多個執行緒同時執行 lock.acquire() 時,只有一個執行緒能成功地獲取鎖,然後繼續執行程式碼,其他執行緒就繼續等待直到獲得鎖為止。獲得鎖的執行緒用完後一定要釋放鎖,否則那些苦苦等待鎖的執行緒將永遠等待下去,成為死執行緒,所以我們用try...finally 來確保鎖一定會被釋放。

鎖的好處就是確保了某段關鍵程式碼只能由一個執行緒從頭到尾完整地執行,壞處當然也很多。首先是阻止了多執行緒併發執行,包含鎖的某段程式碼實際上只能以單執行緒模式執行。其次,由於可以存在多個鎖,不同的執行緒持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個執行緒全部掛起,既不能執行,也無法結束,只能靠作業系統強制終止。

GIL 鎖

Python 的執行緒雖然是真正的執行緒,但直譯器執行程式碼時,有一個 GIL 鎖(Global Interpreter Lock),任何 Python 執行緒執行前,必須先獲得 GIL 鎖。每執行 100 條位元組碼,直譯器就自動釋放 GIL 鎖,讓別的執行緒有機會執行。這個 GIL 全域性鎖實際上把所有執行緒的執行程式碼都給上了鎖,所以,多執行緒在 Python 中只能交替執行,即使 100 個執行緒跑在 100 核 CPU 上,也只能用到 1 個核。

GIL 是 Python 直譯器設計的歷史遺留問題,通常我們用的直譯器是官方實現的 CPython,要真正利用多核,除非重寫一個不帶 GIL 的直譯器。所以,在 Python 如果一定要通過多執行緒利用多核,那隻能通過 C 擴充套件來實現。

因而,多執行緒的併發在 Python 中就是一個美麗的夢,如果想真正實現多核任務,還是通過多程序來實現吧。