1. 程式人生 > >asyncio 中給running 的loop 動態新增 Future Task

asyncio 中給running 的loop 動態新增 Future Task

https://my.oschina.net/backbye/blog/1919486

 

asyncio 提供了兩個給執行中的事件迴圈(loop) 新增 事件的方法

call_soon_threadsafe() 、run_coroutine_threadsafe()

因為我們一般的場景中會另起一個執行緒來啟動loop( loop.run_forever() ),所以上面兩個方法設計成執行緒安全

但是上面兩個方法並不能滿足我們全部的需要:

1. run_coroutine_threadsafe 只能用於傳入一個協程,而不能直接傳入task物件(這樣才方便我們在主程序中隨時檢視任務的執行情況和獲取結果),當然可以去修改原始碼,但是不建議這麼做

2. run_forever 雖然解決了可以隨時加入任務的問題,但是帶來一個新的問題:這個執行緒會一直執行下去,除非ctrl-C, 因為1,所以我們無法根據任務的執行情況來loop.stop, 而且由於執行緒安全,即使你執行了loop.stop() 也不會起任何作用

所以源頭要回到執行緒安全上去解決,檢視原始碼發現是通過一個管道檔案來實現的,可以理解為內建了一個佇列,一切對loop的修改都要通過傳送一個訊號到這個佇列,才能同步到running loop的那個執行緒中,從而產生效果。

傳送訊號程式碼:loop._csock.send(b'\0')

下面給個示例,自己去體會

 

import asyncio
import threading

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
    print(asyncio.Task.all_tasks().pop().result())
    
    
async def do_some_work(name):
    for i in range(5):
        print(f"{name}: is working")
        await asyncio.sleep(1)
    return True
        
loop = asyncio.new_event_loop()

threading.Thread(target=start_loop, args=(loop,)).start()

task = loop.create_task(do_some_work("Lili"))

loop._csock.send(b'\0')

while True:
    if task.done():
        loop.stop()
        loop._csock.send(b'\0')
        break