1. 程式人生 > >python 程序執行緒

python 程序執行緒

1.多程序
要讓Python程式實現多程序(multiprocessing),我們先了解作業系統的相關知識。
Unix/Linux作業系統提供了一個fork()系統呼叫,它非常特殊。普通的函式呼叫,呼叫一次,返回一次,但是fork()呼叫一次,返回兩次,因為作業系統自動把當前程序(稱為父程序)複製了一份(稱為子程序),然後,分別在父程序和子程序內返回。
子程序永遠返回0,而父程序返回子程序的ID。這樣做的理由是,一個父程序可以fork出很多子程序,所以,父程序要記下每個子程序的ID,而子程序只需要呼叫getppid()就可以拿到父程序的ID。
Python的os模組封裝了常見的系統呼叫,其中就包括fork,可以在Python程式中輕鬆建立子程序:
import os


print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
執行結果如下:


Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
由於Windows沒有fork呼叫,上面的程式碼在Windows上無法執行。由於Mac系統是基於BSD(Unix的一種)核心,所以,在Mac下執行是沒有問題的,推薦大家用Mac學Python!
有了fork呼叫,一個程序在接到新任務時就可以複製出一個子程序來處理新任務,常見的Apache伺服器就是由父程序監聽埠,每當有新的http請求時,就fork出子程序來處理新的http請求。
multiprocessing
如果你打算編寫多程序的服務程式,Unix/Linux無疑是正確的選擇。由於Windows沒有fork呼叫,難道在Windows上無法用Python編寫多程序的程式?
由於Python是跨平臺的,自然也應該提供一個跨平臺的多程序支援。multiprocessing模組就是跨平臺版本的多程序模組。
multiprocessing模組提供了一個Process類來代表一個程序物件,下面的例子演示了啟動一個子程序並等待其結束:
from multiprocessing import Process
import os


# 子程序要執行的程式碼
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))


if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')
執行結果如下:


Parent process 928.
Process will start.
Run child process test (929)...
Process end.
建立子程序時,只需要傳入一個執行函式和函式的引數,建立一個Process例項,用start()方法啟動,這樣建立程序比fork()還要簡單。


join()方法可以等待子程序結束後再繼續往下執行,通常用於程序間的同步。


Pool
如果要啟動大量的子程序,可以用程序池的方式批量建立子程序:
from multiprocessing import Pool
import os, time, random


def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))


if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
執行結果如下:


Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.
程式碼解讀:


對Pool物件呼叫join()方法會等待所有子程序執行完畢,呼叫join()之前必須先呼叫close(),呼叫close()之後就不能繼續新增新的Process了。


請注意輸出的結果,task 0,1,2,3是立刻執行的,而task 4要等待前面某個task完成後才執行,這是因為Pool的預設大小在我的電腦上是4,因此,最多同時執行4個程序。這是Pool有意設計的限制,並不是作業系統的限制。如果改成:
p = Pool(5)
就可以同時跑5個程序。
由於Pool的預設大小是CPU的核數,如果你不幸擁有8核CPU,你要提交至少9個子程序才能看到上面的等待效果。
子程序
很多時候,子程序並不是自身,而是一個外部程序。我們建立了子程序後,還需要控制子程序的輸入和輸出。


subprocess模組可以讓我們非常方便地啟動一個子程序,然後控制其輸入和輸出。


下面的例子演示瞭如何在Python程式碼中執行命令nslookup www.python.org,這和命令列直接執行的效果是一樣的:


import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
執行結果:


$ nslookup www.python.org
Server:        192.168.19.4
Address:    192.168.19.4#53


Non-authoritative answer:
www.python.org    canonical name = python.map.fastly.net.
Name:    python.map.fastly.net
Address: 199.27.79.223


Exit code: 0
如果子程序還需要輸入,則可以通過communicate()方法輸入:


import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
上面的程式碼相當於在命令列執行命令nslookup,然後手動輸入:


set q=mx
python.org
exit
執行結果如下:


$ nslookup
Server:        192.168.19.4
Address:    192.168.19.4#53


Non-authoritative answer:
python.org    mail exchanger = 50 mail.python.org.


Authoritative answers can be found from:
mail.python.org    internet address = 82.94.164.166
mail.python.org    has AAAA address 2001:888:2000:d::a6




Exit code: 0
程序間通訊
Process之間肯定是需要通訊的,作業系統提供了很多機制來實現程序間的通訊。Python的multiprocessing模組包裝了底層的機制,提供了Queue、Pipes等多種方式來交換資料。


我們以Queue為例,在父程序中建立兩個子程序,一個往Queue裡寫資料,一個從Queue裡讀資料:


from multiprocessing import Process, Queue
import os, time, random


# 寫資料程序執行的程式碼:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())


# 讀資料程序執行的程式碼:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)


if __name__=='__main__':
    # 父程序建立Queue,並傳給各個子程序:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子程序pw,寫入:
    pw.start()
    # 啟動子程序pr,讀取:
    pr.start()
    # 等待pw結束:
    pw.join()
    # pr程序裡是死迴圈,無法等待其結束,只能強行終止:
    pr.terminate()
執行結果如下:


Process to write: 50563
Put A to queue...
Process to read: 50564
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
在Unix/Linux下,multiprocessing模組封裝了fork()呼叫,使我們不需要關注fork()的細節。由於Windows沒有fork呼叫,因此,multiprocessing需要“模擬”出fork的效果,父程序所有Python物件都必須通過pickle序列化再傳到子程序去,所有,如果multiprocessing在Windows下呼叫失敗了,要先考慮是不是pickle失敗了。
在Unix/Linux下,可以使用fork()呼叫實現多程序。
要實現跨平臺的多程序,可以使用multiprocessing模組。
程序間通訊是通過Queue、Pipes等實現的。


2.多執行緒
多工可以由多程序完成,也可以由一個程序內的多執行緒完成。
我們前面提到了程序是由若干執行緒組成的,一個程序至少有一個執行緒。
由於執行緒是作業系統直接支援的執行單元,因此,高階語言通常都內建多執行緒的支援,Python也不例外,並且,Python的執行緒是真正的Posix Thread,而不是模擬出來的執行緒。
Python的標準庫提供了兩個模組:_thread和threading,_thread是低階模組,threading是高階模組,對_thread進行了封裝。絕大多數情況下,我們只需要使用threading這個高階模組。
啟動一個執行緒就是把一個函式傳入並建立Thread例項,然後呼叫start()開始執行:


import time, threading


# 新執行緒執行的程式碼:
def loop():
    print('thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)


print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
執行結果如下:


thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.
由於任何程序預設就會啟動一個執行緒,我們把該執行緒稱為主執行緒,主執行緒又可以啟動新的執行緒,Python的threading模組有個current_thread()函式,它永遠返回當前執行緒的例項。主執行緒例項的名字叫MainThread,子執行緒的名字在建立時指定,我們用LoopThread命名子執行緒。名字僅僅在列印時用來顯示,完全沒有其他意義,如果不起名字Python就自動給執行緒命名為Thread-1,Thread-2……


Lock
多執行緒和多程序最大的不同在於,多程序中,同一個變數,各自有一份拷貝存在於每個程序中,互不影響,而多執行緒中,所有變數都由所有執行緒共享,所以,任何一個變數都可以被任何一個執行緒修改,因此,執行緒之間共享資料最大的危險在於多個執行緒同時改一個變數,把內容給改亂了。


來看看多個執行緒同時操作一個變數怎麼把內容給改亂了:


import time, threading


# 假定這是你的銀行存款:
balance = 0


def change_it(n):
    # 先存後取,結果應該為0:
    global balance
    balance = balance + n
    balance = balance - n


def run_thread(n):
    for i in range(100000):
        change_it(n)


t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
我們定義了一個共享變數balance,初始值為0,並且啟動兩個執行緒,先存後取,理論上結果應該為0,但是,由於執行緒的排程是由作業系統決定的,當t1、t2交替執行時,只要迴圈次數足夠多,balance的結果就不一定是0了。


原因是因為高階語言的一條語句在CPU執行時是若干條語句,即使一個簡單的計算:


balance = balance + n
也分兩步:


計算balance + n,存入臨時變數中;
將臨時變數的值賦給balance。
也就是可以看成:


x = balance + n
balance = x
由於x是區域性變數,兩個執行緒各自都有自己的x,當代碼正常執行時:


初始值 balance = 0


t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1     # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1     # balance = 0


t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2     # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2     # balance = 0


結果 balance = 0
但是t1和t2是交替執行的,如果作業系統以下面的順序執行t1、t2:


初始值 balance = 0


t1: x1 = balance + 5  # x1 = 0 + 5 = 5


t2: x2 = balance + 8  # x2 = 0 + 8 = 8
t2: balance = x2      # balance = 8


t1: balance = x1      # balance = 5
t1: x1 = balance - 5  # x1 = 5 - 5 = 0
t1: balance = x1      # balance = 0


t2: x2 = balance - 8  # x2 = 0 - 8 = -8
t2: balance = x2   # balance = -8


結果 balance = -8
究其原因,是因為修改balance需要多條語句,而執行這幾條語句時,執行緒可能中斷,從而導致多個執行緒把同一個物件的內容改亂了。


兩個執行緒同時一存一取,就可能導致餘額不對,你肯定不希望你的銀行存款莫名其妙地變成了負數,所以,我們必須確保一個執行緒在修改balance的時候,別的執行緒一定不能改。


如果我們要確保balance計算正確,就要給change_it()上一把鎖,當某個執行緒開始執行change_it()時,我們說,該執行緒因為獲得了鎖,因此其他執行緒不能同時執行change_it(),只能等待,直到鎖被釋放後,獲得該鎖以後才能改。由於鎖只有一個,無論多少執行緒,同一時刻最多隻有一個執行緒持有該鎖,所以,不會造成修改的衝突。建立一個鎖就是通過threading.Lock()來實現:


balance = 0
lock = threading.Lock()


def run_thread(n):
    for i in range(100000):
        # 先要獲取鎖:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要釋放鎖:
            lock.release()
當多個執行緒同時執行lock.acquire()時,只有一個執行緒能成功地獲取鎖,然後繼續執行程式碼,其他執行緒就繼續等待直到獲得鎖為止。


獲得鎖的執行緒用完後一定要釋放鎖,否則那些苦苦等待鎖的執行緒將永遠等待下去,成為死執行緒。所以我們用try...finally來確保鎖一定會被釋放。


鎖的好處就是確保了某段關鍵程式碼只能由一個執行緒從頭到尾完整地執行,壞處當然也很多,首先是阻止了多執行緒併發執行,包含鎖的某段程式碼實際上只能以單執行緒模式執行,效率就大大地下降了。其次,由於可以存在多個鎖,不同的執行緒持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個執行緒全部掛起,既不能執行,也無法結束,只能靠作業系統強制終止。


多核CPU
如果你不幸擁有一個多核CPU,你肯定在想,多核應該可以同時執行多個執行緒。


如果寫一個死迴圈的話,會出現什麼情況呢?


開啟Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以監控某個程序的CPU使用率。


我們可以監控到一個死迴圈執行緒會100%佔用一個CPU。


如果有兩個死迴圈執行緒,在多核CPU中,可以監控到會佔用200%的CPU,也就是佔用兩個CPU核心。


要想把N核CPU的核心全部跑滿,就必須啟動N個死迴圈執行緒。


試試用Python寫個死迴圈:


import threading, multiprocessing


def loop():
    x = 0
    while True:
        x = x ^ 1


for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()
啟動與CPU核心數量相同的N個執行緒,在4核CPU上可以監控到CPU佔用率僅有102%,也就是僅使用了一核。


但是用C、C++或Java來改寫相同的死迴圈,直接可以把全部核心跑滿,4核就跑到400%,8核就跑到800%,為什麼Python不行呢?


因為Python的執行緒雖然是真正的執行緒,但直譯器執行程式碼時,有一個GIL鎖:Global Interpreter Lock,任何Python執行緒執行前,必須先獲得GIL鎖,然後,每執行100條位元組碼,直譯器就自動釋放GIL鎖,讓別的執行緒有機會執行。這個GIL全域性鎖實際上把所有執行緒的執行程式碼都給上了鎖,所以,多執行緒在Python中只能交替執行,即使100個執行緒跑在100核CPU上,也只能用到1個核。


GIL是Python直譯器設計的歷史遺留問題,通常我們用的直譯器是官方實現的CPython,要真正利用多核,除非重寫一個不帶GIL的直譯器。


所以,在Python中,可以使用多執行緒,但不要指望能有效利用多核。如果一定要通過多執行緒利用多核,那隻能通過C擴充套件來實現,不過這樣就失去了Python簡單易用的特點。


不過,也不用過於擔心,Python雖然不能利用多執行緒實現多核任務,但可以通過多程序實現多核任務。多個Python程序有各自獨立的GIL鎖,互不影響。
多執行緒程式設計,模型複雜,容易發生衝突,必須用鎖加以隔離,同時,又要小心死鎖的發生。
Python直譯器由於設計時有GIL全域性鎖,導致了多執行緒無法利用多核。多執行緒的併發在Python中就是一個美麗的夢。


3.ThreadLocal
在多執行緒環境下,每個執行緒都有自己的資料。一個執行緒使用自己的區域性變數比使用全域性變數好,因為區域性變數只有執行緒自己能看見,不會影響其他執行緒,而全域性變數的修改必須加鎖。
但是區域性變數也有問題,就是在函式呼叫的時候,傳遞起來很麻煩:
def process_student(name):
    std = Student(name)
    # std是區域性變數,但是每個函式都要用它,因此必須傳進去:
    do_task_1(std)
    do_task_2(std)


def do_task_1(std):
    do_subtask_1(std)
    do_subtask_2(std)


def do_task_2(std):
    do_subtask_2(std)
    do_subtask_2(std)
每個函式一層一層呼叫都這麼傳引數那還得了?用全域性變數?也不行,因為每個執行緒處理不同的Student物件,不能共享。


如果用一個全域性dict存放所有的Student物件,然後以thread自身作為key獲得執行緒對應的Student物件如何?


global_dict = {}


def std_thread(name):
    std = Student(name)
    # 把std放到全域性變數global_dict中:
    global_dict[threading.current_thread()] = std
    do_task_1()
    do_task_2()


def do_task_1():
    # 不傳入std,而是根據當前執行緒查詢:
    std = global_dict[threading.current_thread()]
    ...


def do_task_2():
    # 任何函式都可以查找出當前執行緒的std變數:
    std = global_dict[threading.current_thread()]
    ...
這種方式理論上是可行的,它最大的優點是消除了std物件在每層函式中的傳遞問題,但是,每個函式獲取std的程式碼有點醜。


有沒有更簡單的方式?


ThreadLocal應運而生,不用查詢dict,ThreadLocal幫你自動做這件事:


import threading


# 建立全域性ThreadLocal物件:
local_school = threading.local()


def process_student():
    # 獲取當前執行緒關聯的student:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))


def process_thread(name):
    # 繫結ThreadLocal的student:
    local_school.student = name
    process_student()


t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
執行結果:


Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
全域性變數local_school就是一個ThreadLocal物件,每個Thread對它都可以讀寫student屬性,但互不影響。你可以把local_school看成全域性變數,但每個屬性如local_school.student都是執行緒的區域性變數,可以任意讀寫而互不干擾,也不用管理鎖的問題,ThreadLocal內部會處理。


可以理解為全域性變數local_school是一個dict,不但可以用local_school.student,還可以繫結其他變數,如local_school.teacher等等。


ThreadLocal最常用的地方就是為每個執行緒繫結一個數據庫連線,HTTP請求,使用者身份資訊等,這樣一個執行緒的所有呼叫到的處理函式都可以非常方便地訪問這些資源。
一個ThreadLocal變數雖然是全域性變數,但每個執行緒都只能讀寫自己執行緒的獨立副本,互不干擾。ThreadLocal解決了引數在一個執行緒中各個函式之間互相傳遞的問題。




程序 vs. 執行緒
閱讀: 84740
我們介紹了多程序和多執行緒,這是實現多工最常用的兩種方式。現在,我們來討論一下這兩種方式的優缺點。


首先,要實現多工,通常我們會設計Master-Worker模式,Master負責分配任務,Worker負責執行任務,因此,多工環境下,通常是一個Master,多個Worker。


如果用多程序實現Master-Worker,主程序就是Master,其他程序就是Worker。


如果用多執行緒實現Master-Worker,主執行緒就是Master,其他執行緒就是Worker。


多程序模式最大的優點就是穩定性高,因為一個子程序崩潰了,不會影響主程序和其他子程序。(當然主程序掛了所有程序就全掛了,但是Master程序只負責分配任務,掛掉的概率低)著名的Apache最早就是採用多程序模式。


多程序模式的缺點是建立程序的代價大,在Unix/Linux系統下,用fork呼叫還行,在Windows下建立程序開銷巨大。另外,作業系統能同時執行的程序數也是有限的,在記憶體和CPU的限制下,如果有幾千個程序同時執行,作業系統連排程都會成問題。


多執行緒模式通常比多程序快一點,但是也快不到哪去,而且,多執行緒模式致命的缺點就是任何一個執行緒掛掉都可能直接造成整個程序崩潰,因為所有執行緒共享程序的記憶體。在Windows上,如果一個執行緒執行的程式碼出了問題,你經常可以看到這樣的提示:“該程式執行了非法操作,即將關閉”,其實往往是某個執行緒出了問題,但是作業系統會強制結束整個程序。


在Windows下,多執行緒的效率比多程序要高,所以微軟的IIS伺服器預設採用多執行緒模式。由於多執行緒存在穩定性的問題,IIS的穩定性就不如Apache。為了緩解這個問題,IIS和Apache現在又有多程序+多執行緒的混合模式,真是把問題越搞越複雜。


執行緒切換
無論是多程序還是多執行緒,只要數量一多,效率肯定上不去,為什麼呢?


我們打個比方,假設你不幸正在準備中考,每天晚上需要做語文、數學、英語、物理、化學這5科的作業,每項作業耗時1小時。


如果你先花1小時做語文作業,做完了,再花1小時做數學作業,這樣,依次全部做完,一共花5小時,這種方式稱為單任務模型,或者批處理任務模型。


假設你打算切換到多工模型,可以先做1分鐘語文,再切換到數學作業,做1分鐘,再切換到英語,以此類推,只要切換速度足夠快,這種方式就和單核CPU執行多工是一樣的了,以幼兒園小朋友的眼光來看,你就正在同時寫5科作業。


但是,切換作業是有代價的,比如從語文切到數學,要先收拾桌子上的語文書本、鋼筆(這叫儲存現場),然後,開啟數學課本、找出圓規直尺(這叫準備新環境),才能開始做數學作業。作業系統在切換程序或者執行緒時也是一樣的,它需要先儲存當前執行的現場環境(CPU暫存器狀態、記憶體頁等),然後,把新任務的執行環境準備好(恢復上次的暫存器狀態,切換記憶體頁等),才能開始執行。這個切換過程雖然很快,但是也需要耗費時間。如果有幾千個任務同時進行,作業系統可能就主要忙著切換任務,根本沒有多少時間去執行任務了,這種情況最常見的就是硬碟狂響,點視窗無反應,系統處於假死狀態。


所以,多工一旦多到一個限度,就會消耗掉系統所有的資源,結果效率急劇下降,所有任務都做不好。


計算密集型 vs. IO密集型
是否採用多工的第二個考慮是任務的型別。我們可以把任務分為計算密集型和IO密集型。


計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視訊進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多工完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。


計算密集型任務由於主要消耗CPU資源,因此,程式碼執行效率至關重要。Python這樣的指令碼語言執行效率很低,完全不適合計算密集型任務。對於計算密集型任務,最好用C語言編寫。


第二種任務的型別是IO密集型,涉及到網路、磁碟IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和記憶體的速度)。對於IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。


IO密集型任務執行期間,99%的時間都花在IO上,花在CPU上的時間很少,因此,用執行速度極快的C語言替換用Python這樣執行速度極低的指令碼語言,完全無法提升執行效率。對於IO密集型任務,最合適的語言就是開發效率最高(程式碼量最少)的語言,指令碼語言是首選,C語言最差。


非同步IO
考慮到CPU和IO之間巨大的速度差異,一個任務在執行的過程中大部分時間都在等待IO操作,單程序單執行緒模型會導致別的任務無法並行執行,因此,我們才需要多程序模型或者多執行緒模型來支援多工併發執行。


現代作業系統對IO操作已經做了巨大的改進,最大的特點就是支援非同步IO。如果充分利用作業系統提供的非同步IO支援,就可以用單程序單執行緒模型來執行多工,這種全新的模型稱為事件驅動模型,Nginx就是支援非同步IO的Web伺服器,它在單核CPU上採用單程序模型就可以高效地支援多工。在多核CPU上,可以執行多個程序(數量與CPU核心數相同),充分利用多核CPU。由於系統總的程序數量十分有限,因此作業系統排程非常高效。用非同步IO程式設計模型來實現多工是一個主要的趨勢。


對應到Python語言,單執行緒的非同步程式設計模型稱為協程,有了協程的支援,就可以基於事件驅動編寫高效的多工程式。我們會在後面討論如何編寫協程。




分散式程序
閱讀: 93527
在Thread和Process中,應當優選Process,因為Process更穩定,而且,Process可以分佈到多臺機器上,而Thread最多隻能分佈到同一臺機器的多個CPU上。


Python的multiprocessing模組不但支援多程序,其中managers子模組還支援把多程序分佈到多臺機器上。一個服務程序可以作為排程者,將任務分佈到其他多個程序中,依靠網路通訊。由於managers模組封裝很好,不必瞭解網路通訊的細節,就可以很容易地編寫分散式多程序程式。


舉個例子:如果我們已經有一個通過Queue通訊的多程序程式在同一臺機器上執行,現在,由於處理任務的程序任務繁重,希望把傳送任務的程序和處理任務的程序分佈到兩臺機器上。怎麼用分散式程序實現?


原有的Queue可以繼續使用,但是,通過managers模組把Queue通過網路暴露出去,就可以讓其他機器的程序訪問Queue了。


我們先看服務程序,服務程序負責啟動Queue,把Queue註冊到網路上,然後往Queue裡面寫入任務:


# task_master.py


import random, time, queue
from multiprocessing.managers import BaseManager


# 傳送任務的佇列:
task_queue = queue.Queue()
# 接收結果的佇列:
result_queue = queue.Queue()


# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
    pass


# 把兩個Queue都註冊到網路上, callable引數關聯了Queue物件:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 繫結埠5000, 設定驗證碼'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 啟動Queue:
manager.start()
# 獲得通過網路訪問的Queue物件:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 從result佇列讀取結果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 關閉:
manager.shutdown()
print('master exit.')
請注意,當我們在一臺機器上寫多程序程式時,建立的Queue可以直接拿來用,但是,在分散式多程序環境下,新增任務到Queue不可以直接對原始的task_queue進行操作,那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue介面新增。


然後,在另一臺機器上啟動任務程序(本機上啟動也可以):


# task_worker.py


import time, sys, queue
from multiprocessing.managers import BaseManager


# 建立類似的QueueManager:
class QueueManager(BaseManager):
    pass


# 由於這個QueueManager只從網路上獲取Queue,所以註冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')


# 連線到伺服器,也就是執行task_master.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 埠和驗證碼注意保持與task_master.py設定的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 從網路連線:
m.connect()
# 獲取Queue的物件:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task佇列取任務,並把結果寫入result佇列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 處理結束:
print('worker exit.')
任務程序要通過網路連線到服務程序,所以要指定服務程序的IP。


現在,可以試試分散式程序的工作效果了。先啟動task_master.py服務程序:


$ python3 task_master.py 
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...
task_master.py程序傳送完任務後,開始等待result佇列的結果。現在啟動task_worker.py程序:


$ python3 task_worker.py
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.
task_worker.py程序結束,在task_master.py程序中會繼續打印出結果:


Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956
這個簡單的Master/Worker模型有什麼用?其實這就是一個簡單但真正的分散式計算,把程式碼稍加改造,啟動多個worker,就可以把任務分佈到幾臺甚至幾十臺機器上,比如把計算n*n的程式碼換成傳送郵件,就實現了郵件佇列的非同步傳送。


Queue物件儲存在哪?注意到task_worker.py中根本沒有建立Queue的程式碼,所以,Queue物件儲存在task_master.py程序中:


                                             │
┌─────────────────────────────────────────┐     ┌──────────────────────────────────────┐
│task_master.py                           │  │  │task_worker.py                        │
│                                         │     │                                      │
│  task = manager.get_task_queue()        │  │  │  task = manager.get_task_queue()     │
│  result = manager.get_result_queue()    │     │  result = manager.get_result_queue() │
│              │                          │  │  │              │                       │
│              │                          │     │              │                       │
│              ▼                          │  │  │              │                       │
│  ┌─────────────────────────────────┐    │     │              │                       │
│  │QueueManager                     │    │  │  │              │                       │
│  │ ┌────────────┐ ┌──────────────┐ │    │     │              │                       │
│  │ │ task_queue │ │ result_queue │ │<───┼──┼──┼──────────────┘                       │
│  │ └────────────┘ └──────────────┘ │    │     │                                      │
│  └─────────────────────────────────┘    │  │  │                                      │
└─────────────────────────────────────────┘     └──────────────────────────────────────┘
                                             │


                                          Network
而Queue之所以能通過網路訪問,就是通過QueueManager實現的。由於QueueManager管理的不止一個Queue,所以,要給每個Queue的網路呼叫介面起個名字,比如get_task_queue。


authkey有什麼用?這是為了保證兩臺機器正常通訊,不被其他機器惡意干擾。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定連線不上。


小結
Python的分散式程序介面簡單,封裝良好,適合需要把繁重任務分佈到多臺機器的環境下。


注意Queue的作用是用來傳遞任務和接收結果,每個任務的描述資料量要儘量小。比如傳送一個處理日誌檔案的任務,就不要傳送幾百兆的日誌檔案本身,而是傳送日誌檔案存放的完整路徑,由Worker程序再去共享的磁碟上讀取檔案。