1. 程式人生 > >Python 的非同步 IO:Asyncio 簡介(一)

Python 的非同步 IO:Asyncio 簡介(一)

Python 的 asyncio 類似於 C++ 的 Boost.Asio。

所謂「非同步 IO」,就是你發起一個 IO 操作,卻不用等它結束,你可以繼續做其他事情,當它結束時,你會得到通知。

Asyncio 是併發(concurrency)的一種方式。對 Python 來說,併發還可以通過執行緒(threading)和多程序(multiprocessing)來實現。

Asyncio 並不能帶來真正的並行(parallelism)。當然,因為 GIL(全域性直譯器鎖)的存在,Python 的多執行緒也不能帶來真正的並行。

可交給 asyncio 執行的任務,稱為協程(coroutine)。一個協程可以放棄執行,把機會讓給其它協程(即 yield from

await)。

定義協程

協程的定義,需要使用 async def 語句。

1 async def do_some_work(x):pass

do_some_work 便是一個協程。
準確來說,do_some_work 是一個協程函式,可以通過 asyncio.iscoroutinefunction 來驗證:

1 print(asyncio.iscoroutinefunction(do_some_work))# True

這個協程什麼都沒做,我們讓它睡眠幾秒,以模擬實際的工作量 :

123 async def do_some_work(x):print("Waiting "+str(x))await asyncio.sleep(x)

在解釋 await 之前,有必要說明一下協程可以做哪些事。協程可以:

12345 *等待一個future結束*等待另一個協程(產生一個結果,或引發一個異常)*產生一個結果給正在等它的協程*引發一個異常給正在等它的協程

asyncio.sleep 也是一個協程,所以 await asyncio.sleep(x) 就是等待另一個協程。可參見 asyncio.sleep 的文件:

12 sleep(delay,result=None,*,loop=None)Coroutine that completes afteragiven time(inseconds).

執行協程

呼叫協程函式,協程並不會開始執行,只是返回一個協程物件,可以通過 asyncio.iscoroutine 來驗證:

1 print(asyncio.iscoroutine(do_some_work(3)))# True

此處還會引發一條警告:

12 async1.py:16:RuntimeWarning:coroutine'do_some_work'was never awaitedprint(asyncio.iscoroutine(do_some_work(3)))

要讓這個協程物件執行的話,有兩種方式:

123 *在另一個已經執行的協程中用`await`等待它*通過`ensure_future`函式計劃它的執行

簡單來說,只有 loop 運行了,協程才可能執行。
下面先拿到當前執行緒預設的 loop ,然後把協程物件交給 loop.run_until_complete,協程物件隨後會在 loop 裡得到執行。

12 loop=asyncio.get_event_loop()loop.run_until_complete(do_some_work(3))

run_until_complete 是一個阻塞(blocking)呼叫,直到協程執行結束,它才返回。這一點從函式名不難看出。
run_until_complete 的引數是一個 future,但是我們這裡傳給它的卻是協程物件,之所以能這樣,是因為它在內部做了檢查,通過 ensure_future 函式把協程物件包裝(wrap)成了 future。所以,我們可以寫得更明顯一些:

1 loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))

完整程式碼:

12345678 import asyncioasync def do_some_work(x):print("Waiting "+str(x))await asyncio.sleep(x)loop=asyncio.get_event_loop()loop.run_until_complete(do_some_work(3))

執行結果:

Python
12 Waiting3<三秒鐘後程序結束>

回撥

假如協程是一個 IO 的讀操作,等它讀完資料後,我們希望得到通知,以便下一步資料的處理。這一需求可以通過往 future 添加回調來實現。

1234567 def done_callback(futu):print('Done')futu=asyncio.ensure_future(do_some_work(3))futu.add_done_callback(done_callback)loop.run_until_complete(futu)

多個協程

實際專案中,往往有多個協程,同時在一個 loop 裡執行。為了把多個協程交給 loop,需要藉助 asyncio.gather 函式。

1 loop.run_until_complete(asyncio.gather(do_some_work(1),do_some_work(3)))

或者先把協程存在列表裡:

12 coros=[do_some_work(1),do_some_work(3)]loop.run_until_complete(asyncio.gather(*coros))

執行結果:

Python
1234 Waiting3Waiting1<等待三秒鐘>Done

這兩個協程是併發執行的,所以等待的時間不是 1 + 3 = 4 秒,而是以耗時較長的那個協程為準。

參考函式 gather 的文件:

gather(*coros_or_futures, loop=None, return_exceptions=False)
Return a future aggregating results from the given coroutines or futures.

發現也可以傳 futures 給它:

1234 futus=[asyncio.ensure_future(do_some_work(1)),asyncio.ensure_future(do_some_work(3))]loop.run_until_complete(asyncio.gather(*futus))

gather 起聚合的作用,把多個 futures 包裝成單個 future,因為 loop.run_until_complete 只接受單個 future。

run_until_complete 和 run_forever

我們一直通過 run_until_complete 來執行 loop ,等到 future 完成,run_until_complete 也就返回了。

123456789 async def do_some_work(x):print('Waiting '+str(x))await asyncio.sleep(x)print('Done')loop=asyncio.get_event_loop()coro=do_some_work(3)loop.run_until_complete(coro)

輸出:

Python
1234 Waiting3<等待三秒鐘>Done<程式退出>

現在改用 run_forever

1234567891011 async def do_some_work(x):print('Waiting '+str(x))await asyncio.sleep(x)print('Done')loop=asyncio.get_event_loop()coro=do_some_work(3)asyncio.ensure_future(coro)loop.run_forever()

輸出:

Python
1234 Waiting3<等待三秒鐘>Done<程式沒有退出>

三秒鐘過後,future 結束,但是程式並不會退出。run_forever 會一直執行,直到 stop 被呼叫,但是你不能像下面這樣調 stop

12 loop.run_forever()loop.stop()

run_forever 不返回,stop 永遠也不會被呼叫。所以,只能在協程中調 stop

12345 async def do_some_work(loop,x):print('Waiting '+str(x))await asyncio.sleep(x)print('Done')loop.stop()

這樣並非沒有問題,假如有多個協程在 loop 裡執行:

1234 asyncio.ensure_future(do_some_work(loop,1))asyncio.ensure_future(do_some_work(loop,3))loop.run_forever()

第二個協程沒結束,loop 就停止了——被先結束的那個協程給停掉的。
要解決這個問題,可以用 gather 把多個協程合併成一個 future,並添加回調,然後在回撥裡再去停止 loop。