Python協程相關整理
協程
筆記整理,記錄一下
asyncio
是Python 3.4版本後引入的標準庫,內建了對非同步IO的支援。 asyncio
的程式設計模型就是一個訊息迴圈。我們從asyncio
模組中直接獲取一個EventLoop
的引用,然後把需要執行的協程扔到EventLoop
中執行,就實現了非同步IO。
定義協程
import asyncio
@asyncio.coroutine
def aaa():
print("hello...")
print(asyncio.iscoroutinefunction(aaa)) # 判斷函式是否是協程
print(asyncio. iscoroutine(aaa())) # 判斷是否是協程
# 在3.5過後,我們可以使用async修飾將普通函式和生成器函式包裝成非同步函式和非同步生成器。
async def bbb():
print("hello2...")
print(asyncio.iscoroutinefunction(bbb)) # True
print(asyncio.iscoroutine(bbb())) # True
上面的協程只是列印了一句hello...
, 現在模擬協程去執行io操作
import asyncio
@asyncio.coroutine
def ccc():
print ("hello...")
asyncio.sleep(3) # 看成是一個耗時3秒的IO操作,主執行緒並未等待,而是去執行EventLoop中其他可以執行的coroutine了,因此可以實現併發執行
一個協程可以:
* 等待一個 future 結束
* 等待另一個協程(產生一個結果,或引發一個異常)
* 產生一個結果給正在等它的協程
* 引發一個異常給正在等它的協程
注意:
asyncio.sleep
也是一個協程。 可參見asyncio.sleep
的文件
sleep(delay, result=None, *, loop=None) Coroutine that completes after a given time (in seconds).
執行協程
呼叫協程函式,協程並不會開始執行,只是返回一個協程物件。
協程的執行有兩種方式:
- 在另一個已經執行的協程中用
yield from
或await
它 - 通過
ensure_future
函式計劃它的執行
我們從asyncio
模組中直接獲取一個EventLoop
的引用,然後把需要執行的協程扔到EventLoop
中執行。簡單來說,只有 loop 運行了,協程才可能執行。下面先拿到當前執行緒的 loop
,然後把協程物件交給 loop.run_until_complete
。
loop = asyncio.get_event_loop()
loop.run_until_complete(aaa()) # 列印 hello...
注意:
run_until_complete
的引數是一個 future
,但是我們這裡傳給它的卻是協程物件,之所以能這樣,是因為它在內部做了檢查,通過 ensure_future
函式把協程物件包裝(wrap)成了 future 。下面是在這部分原始碼:
In [120]: loop.run_until_complete??
Signature: loop.run_until_complete(future)
Source:
def run_until_complete(self, future):
"""Run until the Future is done.
If the argument is a coroutine, it is wrapped in a Task.
WARNING: It would be disastrous to call run_until_complete()
with the same coroutine twice -- it would wrap it in two
different Tasks and that can't be good.
Return the Future's result, or raise its exception.
"""
self._check_closed()
new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
if new_task:
# An exception is raised if the future didn't complete, so there
# is no need to log the "destroy pending task" message
future._log_destroy_pending = False
future.add_done_callback(_run_until_complete_cb)
我們也可以這樣寫:
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(aaa())) # 列印 hello...
協程去執行另一個協程
@asyncio.coroutine
def aaa():
print("hello...")
yield from bbb()
# await bbb() # 同上, 但必須要這樣定義協程async def aaa(): pass 否則報語法錯誤
async def bbb(): # 同aaa
print("hello2...")
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(aaa()))
# 列印結果
hello...
hello2...
[Finished in 0.5s]
關於執行順序:
import asyncio
async def aaa():
print("aaaa")
r = await bbb("send content")
print(r)
print("aaa end")
async def bbb(c):
print("bbbb")
print(c)
return 'response content'
def main():
print("start")
loop = asyncio.get_event_loop()
loop.run_until_complete(aaa())
print("End")
if __name__ == '__main__':
main()
# 列印結果
start
aaaa
bbbb
send content
response content
aaa end
End
[Finished in 0.5s]
添加回調
可以通過給future
添加回調方法,以便執行完Io操作, 希望得到通知
async def aaa():
print("hello...")
await bbb()
def cb(fu):
print("done")
print(fu)
loop = asyncio.get_event_loop()
fu = asyncio.ensure_future(aaa())
fu.add_done_callback(cb) # 添加回調方法
loop.run_until_complete(fu)
多個協程
async def aaa():
print("hello...")
await asyncio.sleep(3)
print("hello again...")
async def bbb():
print("hello2...")
await asyncio.sleep(3)
print("hello2 again...")
loop = asyncio.get_event_loop()
# 法一:
fu = asyncio.gather(aaa(), bbb())
# 法二:
tasks = [aaa(), bbb()]
fu = asyncio.gather(*tasks)
# 法三:
tasks = [asyncio.ensure_future(aaa()),
asyncio.ensure_future(bbb())]
fu = asyncio.gather(*tasks) # gather方法,把多個 futures 包裝成單個 future
loop.run_until_complete(fu) # 兩個協程是併發執行的
loop.close()
注意:
asyncio.wait()
也可以將多個協程聚合成一個future
。
具體差別可請參見 StackOverflow 的討論:Asyncio.gather vs asyncio.wait。
參考文件:
https://segmentfault.com/a/1190000008814676
https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001432090954004980bd351f2cd4cc18c9e6c06d855c498000
https://zhuanlan.zhihu.com/p/27258289