1. 程式人生 > 實用技巧 >21-Python併發程式設計之Futures

21-Python併發程式設計之Futures

區分併發和並行

在我們學習併發程式設計時,常常同時聽到併發(Concurrency)和並行(Parallelism)這兩個術語,這兩者經常一起使用,導致很多人以為它們是一個意思,其實不然。

首先你要辨別一個誤區,在Python中,併發並不是指同一時刻有多個操作(thread、task)同時進行。相反,某個特定的時刻,它只允許有一個操作發生,只不過執行緒/任務之間會互相切換,直到完成。我們來看下面這張圖:

圖中出現了thread和task兩種切換順序的不同方式,分別對應Python中併發的兩種形式——threading和asyncio。

對於threading,作業系統知道每個執行緒的所有資訊,因此它會做主在適當的時候做執行緒切換。很顯然,這樣的好處是程式碼容易書寫,因為程式設計師不需要做任何切換操作的處理;但是切換執行緒的操作,也有可能出現在一個語句執行的過程中(比如 x += 1),這樣就容易出現race condition的情況。

而對於asyncio,主程式想要切換任務時,必須得到此任務可以被切換的通知,這樣一來也就可以避免剛剛提到的 race condition的情況。

至於所謂的並行,指的才是同一時刻、同時發生。Python中的multi-processing便是這個意思,對於multi-processing,你可以簡單地這麼理解:比如你的電腦是6核處理器,那麼在執行程式時,就可以強制Python開6個程序,同時執行,以加快執行速度,它的原理示意圖如下:

對比來看,

  • 併發通常應用於I/O操作頻繁的場景,比如你要從網站上下載多個檔案,I/O操作的時間可能會比CPU執行處理的時間長得多。
  • 而並行則更多應用於CPU heavy的場景,比如MapReduce中的平行計算,為了加快執行速度,一般會用多臺機器、多個處理器來完成。

併發程式設計之Futures

單執行緒與多執行緒效能比較

接下來,我們一起通過具體的例項,從程式碼的角度來理解併發程式設計中的Futures,並進一步來比較其與單執行緒的效能區別。

假設我們有一個任務,是下載一些網站的內容並列印。如果用單執行緒的方式,它的程式碼實現如下所示(為了簡化程式碼,突出主題,此處我忽略了異常處理):

import requests
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))
    
def download_all(sites):
    for site in sites:
        download_one(site)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
    
if __name__ == '__main__':
    main()

# 輸出
Read 129886 from https://en.wikipedia.org/wiki/Portal:Arts
Read 184343 from https://en.wikipedia.org/wiki/Portal:History
Read 224118 from https://en.wikipedia.org/wiki/Portal:Society
Read 107637 from https://en.wikipedia.org/wiki/Portal:Biography
Read 151021 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 157811 from https://en.wikipedia.org/wiki/Portal:Technology
Read 167923 from https://en.wikipedia.org/wiki/Portal:Geography
Read 93347 from https://en.wikipedia.org/wiki/Portal:Science
Read 321352 from https://en.wikipedia.org/wiki/Computer_science
Read 391905 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 321417 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 468461 from https://en.wikipedia.org/wiki/PHP
Read 180298 from https://en.wikipedia.org/wiki/Node.js
Read 56765 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 324039 from https://en.wikipedia.org/wiki/Go_(programming_language)
Download 15 sites in 2.464231112999869 seconds

這種方式應該是最直接也最簡單的:

  • 先是遍歷儲存網站的列表;
  • 然後對當前網站執行下載操作;
  • 等到當前操作完成後,再對下一個網站進行同樣的操作,一直到結束。

我們可以看到總共耗時約2.4s。單執行緒的優點是簡單明瞭,但是明顯效率低下,因為上述程式的絕大多數時間,都浪費在了I/O等待上。程式每次對一個網站執行下載操作,都必須等到前一個網站下載完成後才能開始。如果放在實際生產環境中,我們需要下載的網站數量至少是以萬為單位的,不難想象,這種方案根本行不通。

接著我們再來看,多執行緒版本的程式碼實現:

import concurrent.futures
import requests
import threading
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))


def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one, sites)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()

## 輸出
Read 151021 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 129886 from https://en.wikipedia.org/wiki/Portal:Arts
Read 107637 from https://en.wikipedia.org/wiki/Portal:Biography
Read 224118 from https://en.wikipedia.org/wiki/Portal:Society
Read 184343 from https://en.wikipedia.org/wiki/Portal:History
Read 167923 from https://en.wikipedia.org/wiki/Portal:Geography
Read 157811 from https://en.wikipedia.org/wiki/Portal:Technology
Read 91533 from https://en.wikipedia.org/wiki/Portal:Science
Read 321352 from https://en.wikipedia.org/wiki/Computer_science
Read 391905 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 180298 from https://en.wikipedia.org/wiki/Node.js
Read 56765 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 468461 from https://en.wikipedia.org/wiki/PHP
Read 321417 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 324039 from https://en.wikipedia.org/wiki/Go_(programming_language)
Download 15 sites in 0.19936635800002023 seconds

非常明顯,總耗時是0.2s左右,效率一下子提升了10倍多。

我們具體來看這段程式碼,它是多執行緒版本和單執行緒版的主要區別所在:

   with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one, sites)

這裡我們建立了一個執行緒池,總共有5個執行緒可以分配使用。executer.map()與前面所講的Python內建的map()函式類似,表示對sites中的每一個元素,併發地呼叫函式download_one()。

順便提一下,在download_one()函式中,我們使用的requests.get()方法是執行緒安全的(thread-safe),因此在多執行緒的環境下,它也可以安全使用,並不會出現race condition的情況。

另外,雖然執行緒的數量可以自己定義,但是執行緒數並不是越多越好,因為執行緒的建立、維護和刪除也會有一定的開銷。所以如果你設定的很大,反而可能會導致速度變慢。我們往往需要根據實際的需求做一些測試,來尋找最優的執行緒數量。

當然,我們也可以用並行的方式去提高程式執行效率。你只需要在download_all()函式中,做出下面的變化即可:

with futures.ThreadPoolExecutor(workers) as executor
=>
with futures.ProcessPoolExecutor() as executor: 

在需要修改的這部分程式碼中,函式ProcessPoolExecutor()表示建立程序池,使用多個程序並行的執行程式。不過,這裡我們通常省略引數workers,因為系統會自動返回CPU的數量作為可以呼叫的程序數。

我剛剛提到過,並行的方式一般用在CPU heavy的場景中,因為對於I/O heavy的操作,多數時間都會用於等待,相比於多執行緒,使用多程序並不會提升效率。反而很多時候,因為CPU數量的限制,會導致其執行效率不如多執行緒版本。

到底什麼是 Futures ?

Python中的Futures模組,位於concurrent.futures和asyncio中,它們都表示帶有延遲的操作。Futures會將處於等待狀態的操作包裹起來放到佇列中,這些操作的狀態隨時可以查詢,當然,它們的結果或是異常,也能夠在操作完成後被獲取。

通常來說,作為使用者,我們不用考慮如何去建立Futures,這些Futures底層都會幫我們處理好。我們要做的,實際上是去schedule這些Futures的執行。

比如,Futures中的Executor類,當我們執行executor.submit(func)時,它便會安排裡面的func()函式執行,並返回建立好的future例項,以便你之後查詢呼叫。

這裡再介紹一些常用的函式。Futures中的方法done(),表示相對應的操作是否完成——True表示完成,False表示沒有完成。不過,要注意,done()是non-blocking的,會立即返回結果。相對應的add_done_callback(fn),則表示Futures完成後,相對應的引數函式fn,會被通知並執行呼叫。

Futures中還有一個重要的函式result(),它表示當future完成後,返回其對應的結果或異常。而as_completed(fs),則是針對給定的future迭代器fs,在其完成後,返回完成後的迭代器。

所以,上述例子也可以寫成下面的形式:

import concurrent.futures
import requests
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        to_do = []
        for site in sites:
            future = executor.submit(download_one, site)
            to_do.append(future)
            
        for future in concurrent.futures.as_completed(to_do):
            future.result()
def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()

# 輸出
Read 129886 from https://en.wikipedia.org/wiki/Portal:Arts
Read 107634 from https://en.wikipedia.org/wiki/Portal:Biography
Read 224118 from https://en.wikipedia.org/wiki/Portal:Society
Read 158984 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 184343 from https://en.wikipedia.org/wiki/Portal:History
Read 157949 from https://en.wikipedia.org/wiki/Portal:Technology
Read 167923 from https://en.wikipedia.org/wiki/Portal:Geography
Read 94228 from https://en.wikipedia.org/wiki/Portal:Science
Read 391905 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 321352 from https://en.wikipedia.org/wiki/Computer_science
Read 180298 from https://en.wikipedia.org/wiki/Node.js
Read 321417 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 468421 from https://en.wikipedia.org/wiki/PHP
Read 56765 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 324039 from https://en.wikipedia.org/wiki/Go_(programming_language)
Download 15 sites in 0.21698231499976828 seconds

這裡,我們首先呼叫executor.submit(),將下載每一個網站的內容都放進future佇列to_do,等待執行。然後是as_completed()函式,在future完成後,便輸出結果。

不過,這裡要注意,future列表中每個future完成的順序,和它在列表中的順序並不一定完全一致。到底哪個先完成、哪個後完成,取決於系統的排程和每個future的執行時間。

為什麼多執行緒每次只能有一個執行緒執行?

前面我說過,同一時刻,Python主程式只允許有一個執行緒執行,所以Python的併發,是通過多執行緒的切換完成的。你可能會疑惑這到底是為什麼呢?

這裡我簡單提一下全域性直譯器鎖的概念,具體內容後面會講到。

事實上,Python的直譯器並不是執行緒安全的,為了解決由此帶來的race condition等問題,Python便引入了全域性直譯器鎖,也就是同一時刻,只允許一個執行緒執行。當然,在執行I/O操作時,如果一個執行緒被block了,全域性直譯器鎖便會被釋放,從而讓另一個執行緒能夠繼續執行。

總結

這節課,我們首先學習了Python中併發和並行的概念與區別。

  • 併發,通過執行緒和任務之間互相切換的方式實現,但同一時刻,只允許有一個執行緒或任務執行。
  • 而並行,則是指多個程序完全同步同時的執行。

併發通常用於I/O操作頻繁的場景,而並行則適用於CPU heavy的場景。

隨後,我們通過下載網站內容的例子,比較了單執行緒和運用Futures的多執行緒版本的效能差異。顯而易見,合理地運用多執行緒,能夠極大地提高程式執行效率。

我們還一起學習了Futures的具體原理,介紹了一些常用函式比如done()、result()、as_completed()等的用法,並輔以例項加以理解。

要注意,Python中之所以同一時刻只允許一個執行緒執行,其實是由於全域性直譯器鎖的存在。但是對I/O操作而言,當其被block的時候,全域性直譯器鎖便會被釋放,使其他執行緒繼續執行。

思考題

最後給你留一道思考題。你能否通過查閱相關文件,為今天所講的這個下載網站內容的例子,加上合理的異常處理,讓程式更加穩定健壯呢