深入Asyncio(十一)優雅地開始與結束
Startup and Shutdown Graceful
大部分基於asyncio的程序都是需要長期運行、基於網絡的應用,處理這種應用的正確開啟與關閉存在驚人的復雜性。
開啟相對來說更簡單點,常規做法是創建一個task,然後調用loop.run_forever(),就如第三章QuickStart中的例子一樣。
一個例外是當啟動監聽服務器時需要經過兩個階段:
- 為服務器的啟動創建一個coroutine,然後調用
run_until_complete()
來初始化並啟動服務器本身; - 通過調用
loop.run_forever()
來調用main函數。
通常啟動是很簡單的,碰到上述例外情況,查看官方示例。
關閉就要復雜得多,之前講過run_forever()
調用會阻塞主線程,當執行關閉時,會解除阻塞並執行後續代碼,此時就需要:
- 收集所有尚未完成的task對象;
- 將他們聚集到一個group任務中;
- 取消group任務(需要捕捉CancelledError);
- 通過
run_until_complete()
來等待執行完畢。
在這之後關閉才算完成,初學者在寫異步代碼時總是極力擺脫的一些錯誤信息比如task還未等待就被關閉了,主要原因就是遺失了上述步驟中的一個或多個,用個例子來說明。
import asyncio async def f(delay): await asyncio.sleep(delay) loop = asyncio.get_event_loop() t1 = loop.create_task(f(1)) # 任務1執行1秒 t2 = loop.create_task(f(2)) # 任務2執行2秒 loop.run_until_complete(t1) # 只有任務1被執行完成 loop.close()
λ python3 taskwaring.py
Task was destroyed but it is pending!
task: <Task pending coro=<f() running at taskwaring.py:4> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0312D6D0>()]>>
這個錯誤是說有些任務在loop關閉時還沒完成,這也就是為什麽規範的關閉過程要將所有的task收集到一個task中,取消它們然後在loop關閉之前等待取消完成。
再多看些比QuickStart代碼更細節的例子,這次用官方文檔中的echo服務器代碼作為服務器,通過客戶端代碼來深入學習。
from asyncio import (
get_event_loop,
start_server,
CancelledError,
StreamReader,
StreamWriter,
Task,
gather
)
async def echo(reader: StreamReader, writer: StreamWriter): # 1
print(‘New connection.‘)
try:
while True: # 2
data: bytes = await reader.readlines() # 3
if data in [b‘‘, b‘quit‘]:
break
writer.write(data.upper()) # 4
await writer.drain()
print(‘Leaving Connection.‘)
except CancelledError: # 5
writer.write_eof()
print(‘Cancelled‘)
finally:
writer.close()
loop = get_event_loop()
coro = start_server(echo, ‘127.0.0.1‘, 8888, loop=loop) # 6
server = loop.run_until_complete(coro) # 7
try:
loop.run_forever() # 8
except KeyboardInterrupt:
print(‘Shutting Down!‘)
server.close() # 9
loop.run_until_complete(server.wait_closed()) # 10
tasks = Task.all_tasks() # 11
group = gather(*tasks, return_exceptions=True) # 12
group.cancel()
loop.run_until_complete(group) # 13
loop.close()
這個協程用於為每個建立的連接創建一個協程,使用了Stream的API;
為了保持連接,用死循環獲取消息;
從服務器獲取信息;
將消息的字符全部大寫返回;
此處處理退出,進行環境退出的清理工作;
這裏是程序開始的地方,服務器需要單獨循行,start_server方法返回一個corountine,必須在run_until_complete中執行;
運行coroutine來啟動TCP服務器;
現在才開始程序的監聽部分,為連接到服務器的每個TCP生成一個coroutine來執行echo例程函數,唯一能打斷loop的只能是KeyboardInterrupt異常;
程序運行到這裏的話,關閉操作已經開始,從現在開始要讓服務器停止接受新的連接,第一步是調用server.close();
第二步是調用server.wait_closed()來關閉那些仍在等待連接建立的socket,仍處於活躍狀態的連接不會受影響;
開始關閉task,先收集當前所有等待狀態的task;
將task聚集到一個group中,然後調用cancel方法,此處的return_exceptions參數後面講;
運行group這個協程。
要註意的一點是,如果在一個coroutine內部捕捉了一個CancelledError,要註意在異常捕捉代碼中不要創建任何coroutine,all_tasks()
無法感知在run_until_complete()
運行階段創建的任何新任務。
return_exceptions=True
參數是幹什麽的?
gather()
方法有個默認參數是return_exceptions=False,通過默認設置來關閉異常處理是有問題的,很難直接解釋清楚,可以通過一系列事實來說明:
1. run_until_complete()
方法執行Future對象,在關閉期間,執行由gather()
方法返回的Future對象;
2. 如果這個Future對象拋出了一個異常,那麽這個異常會繼續向上拋出,導致loop停止;
3. 如果run_until_complete()
被用來執行一個group Future對象,任何group內子任務未處理而拋出的異常都會被向上拋出,也包含CancelledError;
4. 如果一部分子任務處理了CancelledError異常,另一部分未處理,則未處理的那部分的異常也會導致loop停止,這意味著loop在所有tasks完成前就停止了;
5. 在關閉loop時,不希望上述特性被觸發,只是想要所有在group中的task盡快執行結束,也不理會某些task是否拋出異常;
6. 使用gather(*, return_exceptions=True)
可以讓group將子任務中的異常當作返回值處理,因此不會影響run_until_complete()
的執行。
關於捕獲異常不合人意的一點就是某些異常在group內被處理了而沒有被拋出,這對通過結果查找異常、寫logging造成了困難。
import asyncio
async def f(delay):
await asyncio.sleep(1/delay) # 傳入值是0就很惡心了
return delay
loop = asyncio.get_event_loop()
for i in range(10):
loop.create_task(f(i))
pending = asyncio.Task.all_tasks()
group = asyncio.gather(*pending, return_exceptions=True)
results = loop.run_until_complete(group)
print(f‘Results: {results}‘)
loop.close()
不設置參數的話就會導致異常被向上拋出,然後loop停止並導致其他task無法完成。安全退出是網絡編程最難的問題之一,這對asyncio也是一樣的。
Signals
在上一個例子中演示了如何通過KeyboardInterrupt
來退出loop,這個異常有效地結束了run_forever()
的阻塞,並允許後續代碼得以執行。
KeyboardInterrupt
異常等同於SIGINT
信號,在網絡服務中最常用的停止信號其實是SIGTERM
,並且也是在UNIX shell環境中使用kill
指令發出的默認信號。
在UNIX系統中kill
指令其實就是發送信號給進程,不加參數地調用就會發送TERM
信號使進程安全退出或被忽視掉,通常這不是個好辦法,因為如果進程沒有退出,kill
就會發送KILL信號來強制退出,這會導致你的程序無法可控地結束。
asyncio原生支持處理進程信號,但處理一般信號的復雜度太高(不是針對asyncio),本文不會深入講解,只會挑一些常見信號來舉例。先看下例:
# shell_signal01.py
import asyncio
async def main(): # 這裏是應用的主體部分,簡單的用一個死循環來表示程序運行
while True:
print(‘<Your app is running>‘)
await asyncio.sleep(1)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.create_task(main()) # 這裏與前幾個例子一樣,將coroutine添加到loop中
try:
loop.run_forever()
except KeyboardInterrupt: # 在本例中,只有Ctrl-C會終止loop,然後像前例中進行善後工作
print(‘<Got signal: SIGINT, shutting down.>‘)
tasks = asyncio.Task.all_tasks()
group = asyncio.gather(*tasks, return_exceptions=True)
group.cancel()
loop.run_until_complete(group)
loop.close()
這些很簡單,下面思考一些復雜的功能:
1. 產品需要將SIGINT和SIGTERM都當作停止信號;
2. 需要在應用的main()
中處理CancelledError
,並且處理異常的代碼也需要一小段時間來運行(例如有一堆網絡連接需要關閉);
3. 應用多次接收停止信號不會出現異常,在接收到一次停止信號後,後續的信號都不作處理。
asyncio提供了足夠粒度的API來處理這些場景。
# shell_signal02.py
import asyncio
from signal import SIGINT, SIGTERM # 從標準庫中導入信號值
async def main():
try:
while True:
print(‘<Your app is running>‘)
await asyncio.sleep(1)
except asyncio.CancelledError: # 1
for i in range(3):
print(‘<Your app is shtting down...>‘)
await asyncio.sleep(1)
def handler(sig): # 2
loop.stop() # 3
print(f‘Got signal: {sig}, shtting down.‘)
loop.remove_signal_handler(SIGTERM) # 4
loop.add_signal_handler(SIGINT, lambda: None) # 5
if __name__ == "__main__":
loop = asyncio.get_event_loop()
for sig in (SIGINT, SIGTERM): # 6
loop.add_signal_handler(sig, handler, sig)
loop.create_task(main())
loop.run_forever()
tasks = asyncio.Task.all_tasks()
group = asyncio.gather(*tasks, return_exceptions=True)
group.cancel()
loop.run_until_complete(group)
loop.close()
現在在coroutine內部處理停止業務,在調用group.cancel()時收到取消信號,在處理關閉loop的run_until_complete階段,main將繼續運行一段時間;
這是收到信號後的回調函數,它通過add_signal_handler()修改了loop的配置;
在回調函數開始執行時,首先要停止loop,這使得關閉業務代碼開始執行;
此時已經開始停止代碼業務,因此移除SIGTERM來忽視後續的停止信號,否則會使停止代碼業務也被終止;
原理與上面類似,但SIGINT不能簡單地remove,因為KeyboardInterrupt默認是SIGINT信號的handler,需要將SIGINT的handler置空;
在這裏配置信號的回調函數,都指向handler,因此配置了SIGINT的handler,會覆蓋掉默認的KeyboardInterrupt。
在關閉過程中等待Executor執行
在QuickStart中有一段代碼使用了阻塞的sleep()
調用,當時說明了一個情況即如果該阻塞調用耗時比loop的執行耗時長時會發生什麽,現在來討論,先放結論,如果不進行人工幹預將會得到一系列errors。
import time
import asyncio
async def main():
print(f‘{time.ctime()} Hello!‘)
await asyncio.sleep(1.0)
print(f‘{time.ctime()} Goodbye!‘)
loop.stop()
def blocking():
time.sleep(1.5)
print(f"{time.ctime()} Hello from a thread!")
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_in_executor(None, blocking)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()
λ python3 quickstart.py
Sun Sep 30 14:11:57 2018 Hello!
Sun Sep 30 14:11:58 2018 Goodbye!
Sun Sep 30 14:11:59 2018 Hello from a thread!
exception calling callback for <Future at 0x36cff70 state=finished returned NoneType>
Traceback (most recent call last):
...
raise RuntimeError(‘Event loop is closed‘)
RuntimeError: Event loop is closed
來看下背後發生了什麽,run_in_executor()
返回的是Future而不是Task,這說明它不能被asyncio.Task.all_tasks()
感知,所以後續的run_until_complete()
也就不會等待這個Future執行完畢。
有三個解決思路,都經過了不同程度的權衡,下面逐個過一遍,從不同視角觀察事件loop的內涵,思考在程序中相互調用的所有coroutine、線程、子進程的生命周期管理。
第一個思路,將executor放到coroutine中並以此建立一個task。
# OPTION-A
import time
import asyncio
async def main():
print(f‘{time.ctime()} Hello!‘)
await asyncio.sleep(1.0)
print(f‘{time.ctime()} Goodbye!‘)
loop.stop()
def blocking():
time.sleep(2.0)
print(f"{time.ctime()} Hello from a thread!")
async def run_blocking(): # 1
await loop.run_in_executor(None, blocking)
loop = asyncio.get_event_loop()
loop.create_task(main())
loop.create_task(run_blocking()) # 2
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=False)
loop.run_until_complete(group)
loop.close()
這個想法是run_in_executor返回的Future而不是task,雖然無法用all_tasks()捕獲,但可以用await等待一個Future,所以用一個新的coroutine來await在executor中的阻塞調用,這個新的coroutine將被作為task添加到loop;
就像運行main一樣將這個coroutine添加到loop中。
上述代碼看起來不錯,除了不能執行任務取消。可以發現代碼中少了group.cancel()
,倘若加回來又會得到Event loop is closed
錯誤,甚至不能在run_blocking()
中處理CancelledError以便重新await Future,無論做什麽該task都會被取消,但executor會將其內部的sleep執行完。
第二個思路,收集尚未完成的task,僅取消它們,但在調用run_until_complete()
之前要將run_in_executor()
生成的Future添加進去。
# OPTION-B
import time
import asyncio
async def main():
print(f‘{time.ctime()} Hello!‘)
await asyncio.sleep(1.0)
print(f‘{time.ctime()} Goodbye!‘)
loop.stop()
def blocking():
time.sleep(2.0)
print(f"{time.ctime()} Hello from a thread!")
loop = asyncio.get_event_loop()
loop.create_task(main())
future = loop.run_in_executor(None, blocking) # 1
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop) # 2
group_tasks = asyncio.gather(*tasks, return_exceptions=True)
group_tasks.cancel() # 取消tasks
group = asyncio.gather(group_task, future) # 3
loop.run_until_complete(group)
loop.close()
記錄返回的Future;
此處loop已停止,先獲得所有task,註意這裏面沒有executor的Future;
創建了一個新的group來合並tasks和Future,在這種情況下executor也能正常退出,而tasks仍然通過正常的cancel來取消。
這個解決辦法在關閉時比較友好,但仍然有缺陷。通常來說,在整個程序中通過某種方式收集所有的executor返回的Future對象,然後與tasks合並,然後等待執行完成,這十分不方便,雖然有效,但還有更好的解決辦法。
# OPTION-C
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor as Executor
async def main():
print(f‘{time.ctime()} Hello!‘)
await asyncio.sleep(1.0)
print(f‘{time.ctime()} Goodbye!‘)
loop.stop()
def blocking():
time.sleep(2.0)
print(f"{time.ctime()} Hello from a thread!")
loop = asyncio.get_event_loop()
executor = Executor() # 1
loop.set_default_executor(executor) # 2
loop.create_task(main())
future = loop.run_in_executor(None, blocking) # 3
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
group.cancel()
loop.run_until_complete(group)
executor.shutdown(wait=True) # 4
loop.close()
建立自己的executor實例;
將其設定為loop的默認executor;
像以前一樣;
明確地在loop關閉前等待executor的所有Future執行完,這可以避免"Event loop is closed"這樣的錯誤信息,能這樣做是因為獲得了使用executor的權限,而asyncio默認的executor沒有開放相應的接口調用。
現在可以在任何地方調用run_in_executor()
,並且程序可以優雅地退出了。
深入Asyncio(十一)優雅地開始與結束