1. 程式人生 > >用 Python 3 的 async / await 做非同步程式設計

用 Python 3 的 async / await 做非同步程式設計

在接著寫下去之前,我先列舉一些 PEPs 以供參考:

從這些 PEPs 中可以看出 Python 生成器 / 協程的發展歷程:先是 PEP 255 引入了簡單的生成器;接著 PEP 342 賦予了生成器 send() 方法,使其可以傳遞資料,協程也就有了實際意義;接下來,PEP 380 增加了 yield from 語法,簡化了呼叫子生成器的語法;然後,PEP 492 將協程和生成器區分開,使得其更不易被用錯;最後,PEP 525 提供了非同步生成器,使得編寫非同步的資料產生器得到簡化。

本文將簡單介紹一下這些 PEPs,著重深入的則是 PEP 492。

首先提一下生成器(generator)。

Generator function 是函式體裡包含 yield 表示式的函式,它在呼叫時生成一個 generator 物件(以下將其命名為 gen)。第一次呼叫 next(gen)gen.send(None) 時,將進入它的函式體:在執行到 yield 表示式時,向呼叫者返回資料;當函式返回時,丟擲 StopIteration 異常。在該函式未執行完之前,可再次呼叫 next(gen) 進入函式體,也可呼叫 gen.send(value) 向其傳遞引數,以供其使用(例如提供必要的資料,或者控制其行為等)。

由於它主要的作用是產生一系列資料,所以一般使用 for … in gen 的語法來遍歷它,以簡化 next()

的呼叫和手動捕捉 StopIteration 異常。
即:

Python
123456 whileTrue:try:value=next(gen)process(value)exceptStopIteration:break

可以簡化為:

Python
12 forvalue ingen:process(value)

由於生成器提供了再次進入一個函式體的機制,其實它已經可以當成協程來使用了。
寫個很簡單的例子:

Python
1234567891011121314151617181920212223242526 importselectimportsocketdefcoroutine():sock=socket.socket()sock.setblocking(0)address=yieldsocktry:sock.connect(address)exceptBlockingIOError:passdata=yieldsize=yieldsock.send(data)yieldsock.recv(size)defmain():coro=coroutine()sock=coro.send(None)wait_list=(sock.fileno(),)coro.send(('www.baidu.com',80))select.select((),wait_list,())coro.send(b'GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n')select.select(wait_list,(),())print(coro.send(1024))

這裡的 coroutine 函式用於處理連線和收發資料,而 main 函式則等待讀寫和傳遞引數。雖然看上去和同步的呼叫沒啥區別,但其實在 main 函式中可以同時執行多個 coroutine,以實現併發執行。

再說一下 yield from

如果一個生成器內部需要遍歷另一個生成器,並將資料返回給呼叫者,你需要遍歷它並處理所遇到的異常;而用了 yield from 後,則可以一行程式碼解決這些問題。具體例子就不列出了,PEP 380 裡有詳細的程式碼。

這對於協程而言也是一個利好,這使得它的呼叫也得到了簡化:

Python
12345 defcoroutine():...defcaller():yieldfromcoroutine()

接下來就該輪到協程(coroutine)登場了。

從上文也可看出,呼叫 yield from gen 時,我無法判定我是遍歷了一個生成器,還是呼叫了一個協程,這種混淆使得介面的設計者和使用者需要花費額外的工夫來約定和檢查。

於是 Python 又先後添加了 asyncio.coroutinetypes.coroutine 這兩個裝飾器來標註協程,這樣就使得需要使用協程時,不至於誤用了生成器。順帶一提,前者是 asyncio 庫的實現,需要保持向下相容,本文暫不討論;後者則是 Python 3.5 的語言實現,實際上是給函式的 __code__.co_flags 設定 CO_ITERABLE_COROUTINE 標誌。隨後,async def 也被引入以用於定義協程,它則是設定 CO_COROUTINE 標誌。

至此,協程和生成器就得以區分,其中以 types.coroutine 定義的協程稱為基於生成器的協程(generator-based coroutine),而以 async def 定義的協程則稱為原生協程(native coroutine)。

這兩種協程之間的區別其實並不大,非要追究的話,主要有這些:

  • 原生協程裡不能有 yieldyield from 表示式。
  • 原生協程被垃圾回收時,如果它從來沒被使用過(即呼叫 await corocoro.send(None)),會丟擲 RuntimeWarning
  • 原生協程沒有實現 __iter____next__ 方法。
  • 簡單的生成器(非協程)不能 yield from 原生協程
  • 對原生協程及其函式分別呼叫 inspect.isgenerator()inspect.isgeneratorfunction() 將返回 False

實際使用時,如果不考慮向下相容,可以都用原生協程,除非這個協程裡用到了 yieldyield from 表示式。

定義了協程函式以後,就可以呼叫它們了。
PEP 492 也引入了一個 await 表示式來呼叫協程,它的用法和 yield from 差不多,但是它只能在協程函式內部使用,且只能接 awaitable 的物件。
所謂 awaitable 的物件,就是其 __await__ 方法返回一個迭代器的物件。原生協程和基於生成器的協程都是 awaitable 的物件。
另一種呼叫協程的方法則和生成器一樣,呼叫其 send 方法,並自行迭代。這種方式主要用於在非協程函式裡呼叫協程。

舉例來說,呼叫的程式碼會類似這樣:

Python
123456789 @types.coroutinedefgenerator_coroutine():yield1async defnative_coroutine():await generator_coroutine()defmain():native_coroutine().send(None)

其中 generator_coroutine 函式裡因為用到了 yield 表示式,所以只能定義成基於生成器的協程;native_coroutine 函式由於自身是協程,可以直接用 await 表示式呼叫其他協程;main 函式由於不是協程,因而需要用 native_coroutine().send(None) 這種方式來呼叫協程。

這個例子其實也解釋了 V2EX 裡提到的那個問題,即為什麼原生協程不能「真正的暫停執行並強制性返回給事件迴圈」。

假設事件迴圈在 main 函式裡,原生協程是 native_coroutine 函式,那要怎麼才能讓它暫停並返回 main 函式呢?

很顯然 await generator_coroutine() 是不行的,這會進入 generator_coroutine 的函式體,而不是回到 main 函式;如果 yield 一個值,又會遇到之前提到的一個限制,即原生協程裡不能有 yield 表示式;最後僅剩 returnraise 這兩種選擇了,但它們雖然能回到 main 函式,卻也不是「暫停」,因為再也沒法「繼續」了。

所以一般而言,如果要用 Python 3.5 來做非同步程式設計的話,最外層的事件迴圈需要呼叫協程的 send 方法,裡面大部分的非同步方法都可以用原生協程來實現,但最底層的非同步方法則需要用基於生成器的協程。

為了有個更直觀的認識,再來舉個例子,抓取 10 個百度搜索的頁面:

Python
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 fromselectors importDefaultSelector,EVENT_READ,EVENT_WRITEimportsocketfromtypesimportcoroutinefromurllib.parse importurlparse@coroutinedefuntil_readable(fileobj):yieldfileobj,EVENT_READ@coroutinedefuntil_writable(fileobj):yieldfileobj,EVENT_WRITEasync defconnect(sock,address):try:sock.connect(address)exceptBlockingIOError:await until_writable(sock)async defrecv(fileobj):result=b''whileTrue:try:data=fileobj.recv(4096)ifnotdata:returnresultresult+=dataexceptBlockingIOError:await until_readable(fileobj)async defsend(fileobj,data):whiledata:try:sent_bytes=fileobj.send(data)data=data[sent_bytes:]exceptBlockingIOError:await until_writable(fileobj)async deffetch_url(url):parsed_url=urlparse(url)ifparsed_url.port isNone:port=443ifparsed_url.scheme=='https'else80else:port=parsed_url.portwithsocket.socket()assock:sock.setblocking(0)await connect(sock,(parsed_url.hostname,port))path=parsed_url.path ifparsed_url.path else'/'path_with_query='{}?{}'.format(path,parsed_url.query)ifparsed_url.query elsepathawait send(sock,'GET {} HTTP/1.1\r\nHost: {}\r\nConnection: Close\r\n\r\n'.format(path_with_query,parsed_url.netloc).encode())content=await recv(sock)print('{}: {}'.format(url,content))defmain():urls=['http://www.baidu.com/s?wd={}'.format(i)foriinrange(10)]tasks=[fetch_url(url)forurl inurls]# 將任務定義成協程物件withDefaultSelector()asselector:whiletasks orselector.get_map():# 有要做的任務,或者有等待的 IO 事件events=selector.select(0iftasks else1)# 如果有要做的任務,立刻獲得當前已就緒的 IO 事件,否則最多等待 1 秒forkey,event inevents:task=key.datatasks.append(task)# IO 事件已就緒,可以執行新 task 了selector.unregister(key.fileobj)# 取消註冊,避免重複執行fortask intasks:try:fileobj,event=task.send(None)# 開始或繼續執行 taskexceptStopIteration:passelse:selector.register(fileobj,event,task)# task 還未執行完,需要等待 IO,將 task 註冊為 key.datatasks.clear()main()

其他的函式都沒什麼好說的,主要解釋下 until_readableuntil_writablemain 函式。
其實 until_readableuntil_writable 函式都是 yield 一個 (fileobj, event) 元組,用於告知事件迴圈,這個 fileobjevent 事件需要被監聽。

而在 main 函式中,事件迴圈遍歷並執行 tasks 裡包含的協程。這些協程在等待 IO 時返回事件迴圈,由事件迴圈註冊事件及其對應的協程。到下一個事件迴圈時,取出所有就緒的事件,繼續執行其對應的協程,就完成了整個的非同步執行過程。

如果關注到 fetch_url 函式,就會發現業務邏輯用到的程式碼其實挺簡單,只是 await 非同步函式而已。這雖然簡化了大部分的開發工作,但其實也限制了它的表達能力,因為在一個協程內,不能同時 await 多個非同步函式——它實際上是順序執行的,只是不同協程之間可以非同步執行而已。

考慮一個 HTTP/2 的客戶端,它和伺服器之間的連線是多路複用的,也就是可以在一個連線裡同時發出和接收多份資料,而這些資料的傳輸是亂序的。如果一份 JavaScript 資源已經下載完畢,沒必要再等其他的圖片資源下載完畢才能執行。要做到這點,就需要協程有併發執行多個子協程,共同完成任務的能力。這在使用多執行緒或回撥函式時是很容易做到的,但使用 await 就顯得捉襟見肘了。倒也不是不能做,只是需要拿之前的程式碼改下,yield 一些子協程,並在事件迴圈中判斷一下型別就行了。

雖然僅用上述提到的東西,已經能做非同步程式設計了,但我還是得補充 2 個漏掉的語法知識:

1.async with
先考慮普通的 with 語句,它的主要作用是在進入和退出一個區域時,做一些初始化和清理工作。
例如:

Python
12345 f=open(