Python 的非同步 IO:Asyncio 簡介(一)
Python 的 asyncio 類似於 C++ 的 Boost.Asio。
所謂「非同步 IO」,就是你發起一個 IO 操作,卻不用等它結束,你可以繼續做其他事情,當它結束時,你會得到通知。
Asyncio 是併發(concurrency)的一種方式。對 Python 來說,併發還可以通過執行緒(threading)和多程序(multiprocessing)來實現。
Asyncio 並不能帶來真正的並行(parallelism)。當然,因為 GIL(全域性直譯器鎖)的存在,Python 的多執行緒也不能帶來真正的並行。
可交給 asyncio 執行的任務,稱為協程(coroutine)。一個協程可以放棄執行,把機會讓給其它協程(即 yield from
await
)。
定義協程
協程的定義,需要使用 async def
語句。
1 | async def do_some_work(x):pass |
do_some_work
便是一個協程。
準確來說,do_some_work
是一個協程函式,可以通過 asyncio.iscoroutinefunction
來驗證:
1 | print(asyncio.iscoroutinefunction(do_some_work))# True |
這個協程什麼都沒做,我們讓它睡眠幾秒,以模擬實際的工作量 :
123 | async def do_some_work(x):print("Waiting "+str(x))await asyncio.sleep(x) |
在解釋 await
之前,有必要說明一下協程可以做哪些事。協程可以:
12345 | *等待一個future結束*等待另一個協程(產生一個結果,或引發一個異常)*產生一個結果給正在等它的協程*引發一個異常給正在等它的協程 |
asyncio.sleep
也是一個協程,所以 await asyncio.sleep(x)
就是等待另一個協程。可參見 asyncio.sleep
的文件:
12 | sleep(delay,result=None,*,loop=None)Coroutine that completes afteragiven time(inseconds). |
執行協程
呼叫協程函式,協程並不會開始執行,只是返回一個協程物件,可以通過 asyncio.iscoroutine
來驗證:
1 | print(asyncio.iscoroutine(do_some_work(3)))# True |
此處還會引發一條警告:
12 | async1.py:16:RuntimeWarning:coroutine'do_some_work'was never awaitedprint(asyncio.iscoroutine(do_some_work(3))) |
要讓這個協程物件執行的話,有兩種方式:
123 | *在另一個已經執行的協程中用`await`等待它*通過`ensure_future`函式計劃它的執行 |
簡單來說,只有 loop 運行了,協程才可能執行。
下面先拿到當前執行緒預設的 loop ,然後把協程物件交給 loop.run_until_complete
,協程物件隨後會在 loop 裡得到執行。
12 | loop=asyncio.get_event_loop()loop.run_until_complete(do_some_work(3)) |
run_until_complete
是一個阻塞(blocking)呼叫,直到協程執行結束,它才返回。這一點從函式名不難看出。
run_until_complete
的引數是一個 future,但是我們這裡傳給它的卻是協程物件,之所以能這樣,是因為它在內部做了檢查,通過 ensure_future
函式把協程物件包裝(wrap)成了 future。所以,我們可以寫得更明顯一些:
1 | loop.run_until_complete(asyncio.ensure_future(do_some_work(3))) |
完整程式碼:
12345678 | import asyncioasync def do_some_work(x):print("Waiting "+str(x))await asyncio.sleep(x)loop=asyncio.get_event_loop()loop.run_until_complete(do_some_work(3)) |
執行結果:
Python12 | Waiting3<三秒鐘後程序結束> |
回撥
假如協程是一個 IO 的讀操作,等它讀完資料後,我們希望得到通知,以便下一步資料的處理。這一需求可以通過往 future 添加回調來實現。
1234567 | def done_callback(futu):print('Done')futu=asyncio.ensure_future(do_some_work(3))futu.add_done_callback(done_callback)loop.run_until_complete(futu) |
多個協程
實際專案中,往往有多個協程,同時在一個 loop 裡執行。為了把多個協程交給 loop,需要藉助 asyncio.gather
函式。
1 | loop.run_until_complete(asyncio.gather(do_some_work(1),do_some_work(3))) |
或者先把協程存在列表裡:
12 | coros=[do_some_work(1),do_some_work(3)]loop.run_until_complete(asyncio.gather(*coros)) |
執行結果:
Python1234 | Waiting3Waiting1<等待三秒鐘>Done |
這兩個協程是併發執行的,所以等待的時間不是 1 + 3 = 4 秒,而是以耗時較長的那個協程為準。
參考函式 gather
的文件:
gather(*coros_or_futures, loop=None, return_exceptions=False)
Return a future aggregating results from the given coroutines or futures.
發現也可以傳 futures 給它:
1234 | futus=[asyncio.ensure_future(do_some_work(1)),asyncio.ensure_future(do_some_work(3))]loop.run_until_complete(asyncio.gather(*futus)) |
gather
起聚合的作用,把多個 futures 包裝成單個 future,因為 loop.run_until_complete
只接受單個 future。
run_until_complete 和 run_forever
我們一直通過 run_until_complete
來執行 loop ,等到 future 完成,run_until_complete
也就返回了。
123456789 | async def do_some_work(x):print('Waiting '+str(x))await asyncio.sleep(x)print('Done')loop=asyncio.get_event_loop()coro=do_some_work(3)loop.run_until_complete(coro) |
輸出:
Python1234 | Waiting3<等待三秒鐘>Done<程式退出> |
現在改用 run_forever
:
1234567891011 | async def do_some_work(x):print('Waiting '+str(x))await asyncio.sleep(x)print('Done')loop=asyncio.get_event_loop()coro=do_some_work(3)asyncio.ensure_future(coro)loop.run_forever() |
輸出:
Python1234 | Waiting3<等待三秒鐘>Done<程式沒有退出> |
三秒鐘過後,future 結束,但是程式並不會退出。run_forever
會一直執行,直到 stop
被呼叫,但是你不能像下面這樣調 stop
:
12 | loop.run_forever()loop.stop() |
run_forever
不返回,stop
永遠也不會被呼叫。所以,只能在協程中調 stop
:
12345 | async def do_some_work(loop,x):print('Waiting '+str(x))await asyncio.sleep(x)print('Done')loop.stop() |
這樣並非沒有問題,假如有多個協程在 loop 裡執行:
1234 | asyncio.ensure_future(do_some_work(loop,1))asyncio.ensure_future(do_some_work(loop,3))loop.run_forever() |
第二個協程沒結束,loop 就停止了——被先結束的那個協程給停掉的。
要解決這個問題,可以用 gather
把多個協程合併成一個 future,並添加回調,然後在回撥裡再去停止 loop。