asyncio系列之抽絲剝繭分析事件排程的核心原理
先來看一下一個簡單的例子
例1:
async def foo():
print('enter foo ...')
await bar()
print('exit foo ...')
async def bar():
print('enter bar ...')
print('exit bar ...')
f = foo()
try:
f.send(None)
except StopIteration as e:
print(e.value)
例2:
async def foo(): print('enter foo ...') try: bar().send(None) except StopIteration as e: pass print('exit foo ...') async def bar(): print('enter bar ...') print('exit bar ...') f = foo() try: f.send(None) except StopIteration as e: print(e.value)
也就是說 await bar()
等價於這個
try:
bar().send(None)
except StopIteration as e:
pass
更進一步來講,await 協程的巢狀就跟函式呼叫一樣,沒什麼兩樣。
def foo():
print('enter foo ...')
bar()
print('exit foo ...')
def bar():
print('enter bar ...')
print('exit bar ...')
foo()
理解了跟函式呼叫一樣就可以看看成是這樣:
執行f.send(None)時其實就是執行
print('enter foo ...')
print('enter bar ...')
print('exit bar ...')
print('exit foo ...')
例3:
class Future: def __iter__(self): print('enter Future ...') yield self print('foo 恢復執行') print('exit Future ...') __await__ = __iter__ async def foo(): print('enter foo ...') await bar() print('exit foo ...') async def bar(): future = Future() print('enter bar ...') await future print('exit bar ...') f = foo() try: f.send(None) print('foo 掛起在yield處 ') print('--'*10) f.send(None) except StopIteration as e: print(e.value)
執行結果:
enter foo ...
enter bar ...
enter Future ...
foo 掛起在yield處
--------------------
foo 恢復執行
exit Future ...
exit bar ...
exit foo ...
None
Future是一個Awaitable物件,實現了__await__方法,await future 實際上是會進入到future.__await__方法中也就是future.__iter__方法中的邏輯,執行到 yield self 處foo協程才真正被掛起,返回future物件本身,f.send(None)才真正的執行完畢,
第一次呼叫f.send(None),執行:
print('enter foo ...') print('enter bar ...') print('enter Future ...')
被掛起
第二次呼叫f.send(None),執行:
print('exit Future ...') print('exit bar ...') print('exit foo ...')
也就是說這樣一個foo協程完整的呼叫過程就是如下過程:
- foo print('enter foo ...')
- bar print('enter bar ...')
- future print('enter Future ...') # 以上是第一次f.send(None)執行的邏輯,命名為part1
- future yield self ---------------------------------------------------------------
- print('exit Future ...') # 一下是第二次f.send(None)執行的邏輯,命名為part2
- bar print('exit bar ...')
- foo print('exit foo ...')
加入我們把這兩次f.send(None)呼叫的邏輯分別命名成part1和part2,那也就是說,通過future這個物件,準確的說是yield關鍵字,真正的把foo協程要執行的完整邏輯分成了兩部分part1和patr2。並且foo的協程狀態會被掛起在yield處,這樣就要呼叫兩次f.send(None)才能,執行完foo協程,而不是在例2中,直接只調用一次f.send(None)就執行完了foo協程。這就是Future物件的作用。
這裡小結一下Future的作用
yield起到了掛起協程的作用。
通過yield把foo協程的執行邏輯真正的分成了part1和part2兩部分。
例4:
class Future:
def __iter__(self):
print('enter Future ...')
print('foo 掛起在yield處 ')
yield self
print('foo 恢復執行')
print('enter Future ...')
return 'future'
__await__ = __iter__
class Task:
def __init__(self, cor):
self.cor = cor
def _step(self):
cor = self.cor
try:
result = cor.send(None)
except Exception as e:
pass
async def foo():
print('enter foo ...')
await bar()
print('exit foo ...')
async def bar():
future = Future()
print('enter bar ...')
await future
print('exit bar ...')
f = foo()
task = Task(f)
task._step()
print('--' * 10)
task._step()
執行結果:
enter foo ...
enter bar ...
enter Future ...
foo 掛起在yield處
--------------------
foo 恢復執行
exit Future ...
exit bar ...
exit foo ...
這個例子與例3不同在於,現在有一個Task類,我們把f.send(None)d的操作,封裝在了Task的_step方法中,呼叫task._step()等於是執行part1中的邏輯,再次呼叫task._step()等於是執行part2中的邏輯。現在不想手動的task._step()這樣,在看下面的例子
例5:
class Future:
def __iter__(self):
print('enter Future ...')
print('foo 掛起在yield處 ')
yield self
print('foo 恢復執行')
print('enter Future ...')
return 'future'
__await__ = __iter__
class Task:
def __init__(self, cor, *, loop=None):
self.cor = cor
self._loop = loop
def _step(self):
cor = self.cor
try:
result = cor.send(None)
except StopIteration as e:
self._loop.close()
except Exception as e:
pass
class Loop:
def __init__(self):
self._stop = False
def create_task(self, cor):
task = Task(cor, loop = self)
return task
def run_until_complete(self, task):
while not self._stop:
task._step()
def close(self):
self._stop = True
async def foo():
print('enter foo ...')
await bar()
print('exit foo ...')
async def bar():
future = Future()
print('enter bar ...')
await future
print('exit bar ...')
if __name__ == '__main__':
f = foo()
loop = Loop()
task = loop.create_task(f)
loop.run_until_complete(task)
執行結果:
enter foo ...
enter bar ...
enter Future ...
foo 掛起在yield處
foo 恢復執行
exit Future ...
exit bar ...
exit foo ...
例4中我們實現了一個簡單Loop類,在while迴圈中呼叫task._step方法。
例5:
class Future:
def __init__(self, *, loop=None):
self._result = None
self._callbacks = []
def set_result(self, result):
self._result = result
callbacks = self._callbacks[:]
self._callbacks = []
for callback in callbacks:
loop._ready.append(callback)
def add_callback(self, callback):
self._callbacks.append(callback)
def __iter__(self):
print('enter Future ...')
print('foo 掛起在yield處 ')
yield self
print('foo 恢復執行')
print('enter Future ...')
return 'future'
__await__ = __iter__
class Task:
def __init__(self, cor, *, loop=None):
self.cor = cor
self._loop = loop
def _step(self):
cor = self.cor
try:
result = cor.send(None)
# 1. cor 協程執行完畢時,會丟擲StopIteration,說明cor執行完畢了,這是關閉loop
except StopIteration as e:
self._loop.close()
# 2. 有異常時
except Exception as e:
"""處理異常邏輯"""
# 3. result為Future物件時
else:
if isinstance(result, Future):
result.add_callback(self._wakeup)
# 立即呼叫,讓下一loop輪迴圈中立馬執行self._wakeup
result.set_result(None)
def _wakeup(self):
self._step()
class Loop:
def __init__(self):
self._stop = False
self._ready = []
def create_task(self, cor):
task = Task(cor, loop = self)
self._ready.append(task._step)
return task
def run_until_complete(self, task):
assert isinstance(task, Task)
while not self._stop:
n = len(self._ready)
for i in range(n):
step = self._ready.pop()
step()
def close(self):
self._stop = True
async def foo():
print('enter foo ...')
await bar()
print('exit foo ...')
async def bar():
future = Future(loop=loop)
print('enter bar ...')
await future
print('exit bar ...')
if __name__ == '__main__':
f = foo()
loop = Loop()
task = loop.create_task(f)
loop.run_until_complete(task)
執行結果:
enter foo ...
enter bar ...
enter Future ...
foo 掛起在yield處
foo 恢復執行
exit Future ...
exit bar ...
exit foo ...
到此為止,我們構建了三個稍微複雜點的Loop類,Task, Future類,這3個類在整個協程執行流程的排程過程中有很強的相互作用關係。
Future
掛起協程的執行流程,把協程的邏輯分為part1和part2兩部分。
Task
把協程的part1和part2邏輯封裝到task._step和task._wakeup方法中,在不同的時機分別把它們註冊到loop物件中,task._step是建立task例項的時候就註冊到了loop中,task._wakeup則是在task._setp執行完掛在yield future處,由於有await future語句的存在,必然是返回一個future物件,判斷確實是一個future物件,就把task._wakeup註冊到future中,future.set_result()則會在合適的時機被呼叫,一旦它被呼叫,就會把future中註冊的task._wakeup註冊到loop中,然後就會在loop迴圈中呼叫task._wakeup,協程的part2的邏輯才得以執行,最後丟擲StopIteration異常。
Loop
在一個死迴圈中執行註冊到loop中的task._step和task._wakeup方法,完成對協程完整邏輯的執行。
雖然我們自己構建的這三個類的實現很簡單,但是這體現asyncio實現事件迴圈的核心原理,我們實現loop中並沒有模擬耗時等待以及對真正IO事件的監聽,對應於asyncio來說,它也是構建了Future, Task, Loop這3個類,只是功能要比我們自己構建的要複雜得多,loop物件的while中通過select(timeout)函式的呼叫實現模擬耗時操作和實現了對網路IO事件的監聽,這樣我們只要在寫了一個執行一個IO操作時,都會有一個future物件 await future,通過future來掛起當前的協程,比如想進行一個socket連線,協程的虛擬碼如下:
future = Future
# 非阻塞呼叫,需要try...except...
socket.connect((host, port))
# 註冊一個回撥函式到write_callbackselect中,只要socket發生可寫事件,就執行回撥
add_writer(write_callback, future)
await future
...
當我們在呼叫socket.connect((host, port)),因為是非阻塞socket,會立馬返回,然後把這個write_callback, future註冊成select的可寫事件的回撥函式,這個回撥函式什麼時候被執行呢,就是在loop迴圈的select(timeout)返回了可寫事件時才會觸發,回撥函式中會呼叫future.set_result(),也就是說future.set_result的觸發時機是在socket連線成功時,select(timeout)返回了可寫事件時,future.set_result的作用就是把協程的part2部分註冊到loop,然後在下一輪的迴圈中立即呼叫,使得協程的await future下面的語句得以繼續執行。
由於我這裡沒有貼asyncio的loop,task,future物件的原始碼,所以這個例子看起來會很抽象,在上一篇asyncio中貼了這幾個類的原始碼,想詳細瞭解的可以檢視我的上一篇文章《asyncio系列之簡單協程的基本執行流程分析》。小夥伴們也可以對照著asyncio的原始碼來debug,這樣再來理解這裡說的這個例子就比較容易了。
下一篇將介紹asyncio.sleep()的實現機制