【Python】【控制流程】【生成器 | 協程 | 期物 | 任務】對比與聯系
Python 的 asyncio 類似於 C++ 的 Boost.Asio。
所謂「異步 IO」,就是你發起一個 IO 操作,卻不用等它結束,你可以繼續做其他事情,當它結束時,你會得到通知。
Asyncio 是並發(concurrency)的一種方式。對 Python 來說,並發還可以通過線程(threading)和多進程(multiprocessing)來實現。
Asyncio 並不能帶來真正的並行(parallelism)。當然,因為 GIL(全局解釋器鎖)的存在,Python 的多線程也不能帶來真正的並行。
可交給 asyncio 執行的任務,稱為協程(coroutine)。一個協程可以放棄執行,把機會讓給其它協程(即 yield from 或 await)。
定義協程
協程的定義,需要使用 async def 語句。
?
code
1
async def do_some_work(x): pass
do_some_work 便是一個協程。
準確來說,do_some_work 是一個協程函數,可以通過 asyncio.iscoroutinefunction 來驗證:
?
code
1
print(asyncio.iscoroutinefunction(do_some_work)) # True
這個協程什麽都沒做,我們讓它睡眠幾秒,以模擬實際的工作量 :
?
code
1
2
3
async def do_some_work(x):
print("Waiting " + str(x))
await asyncio.sleep(x)
在解釋 await 之前,有必要說明一下協程可以做哪些事。協程可以:
?
code
1
2
3
4
5
* 等待一個 future 結束
- 等待另一個協程(產生一個結果,或引發一個異常)
- 產生一個結果給正在等它的協程
引發一個異常給正在等它的協程
asyncio.sleep 也是一個協程,所以 await asyncio.sleep(x) 就是等待另一個協程。可參見 asyncio.sleep 的文檔:
?
code
1
2
sleep(delay, result=None, *, loop=None)
Coroutine that completes after a given time (in seconds).
運行協程
調用協程函數,協程並不會開始運行,只是返回一個協程對象,可以通過 asyncio.iscoroutine 來驗證:
?
code
1
print(asyncio.iscoroutine(do_some_work(3))) # True
此處還會引發一條警告:
?
code
1
2
async1.py:16: RuntimeWarning: coroutine ‘do_some_work‘ was never awaited
print(asyncio.iscoroutine(do_some_work(3)))
要讓這個協程對象運行的話,有兩種方式:
?
code
1
2
3
* 在另一個已經運行的協程中用
await
等待它
通過
ensure_future
函數計劃它的執行
簡單來說,只有 loop 運行了,協程才可能運行。
下面先拿到當前線程缺省的 loop ,然後把協程對象交給 loop.run_until_complete,協程對象隨後會在 loop 裏得到運行。
?
code
1
2
loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))
run_until_complete 是一個阻塞(blocking)調用,直到協程運行結束,它才返回。這一點從函數名不難看出。
run_until_complete 的參數是一個 future,但是我們這裏傳給它的卻是協程對象,之所以能這樣,是因為它在內部做了檢查,通過 ensure_future 函數把協程對象包裝(wrap)成了 future。所以,我們可以寫得更明顯一些:
?
code
1
loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))
完整代碼:
?
code
1
2
3
4
5
6
7
8
import asyncio
async def do_some_work(x):
print("Waiting " + str(x))
await asyncio.sleep(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))
運行結果:
?
code
1
2
Waiting 3
<三秒鐘後程序結束>
回調
假如協程是一個 IO 的讀操作,等它讀完數據後,我們希望得到通知,以便下一步數據的處理。這一需求可以通過往 future 添加回調來實現。
?
code
1
2
3
4
5
6
7
def done_callback(futu):
print(‘Done‘)
futu = asyncio.ensure_future(do_some_work(3))
futu.add_done_callback(done_callback)
loop.run_until_complete(futu)
多個協程
實際項目中,往往有多個協程,同時在一個 loop 裏運行。為了把多個協程交給 loop,需要借助 asyncio.gather 函數。
?
code
1
loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))
或者先把協程存在列表裏:
?
code
1
2
coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))
運行結果:
?
code
1
2
3
4
Waiting 3
Waiting 1
<等待三秒鐘>
Done
這兩個協程是並發運行的,所以等待的時間不是 1 + 3 = 4 秒,而是以耗時較長的那個協程為準。
參考函數 gather 的文檔:
gather(*coros_or_futures, loop=None, return_exceptions=False)
Return a future aggregating results from the given coroutines or futures.
發現也可以傳 futures 給它:
?
code
1
2
3
4
futus = [asyncio.ensure_future(do_some_work(1)),
asyncio.ensure_future(do_some_work(3))]
loop.run_until_complete(asyncio.gather(*futus))
gather 起聚合的作用,把多個 futures 包裝成單個 future,因為 loop.run_until_complete 只接受單個 future。
run_until_complete 和 run_forever
我們一直通過 run_until_complete 來運行 loop ,等到 future 完成,run_until_complete 也就返回了。
?
code
1
2
3
4
5
6
7
8
9
async def do_some_work(x):
print(‘Waiting ‘ + str(x))
await asyncio.sleep(x)
print(‘Done‘)
loop = asyncio.get_event_loop()
coro = do_some_work(3)
loop.run_until_complete(coro)
輸出:
?
code
1
2
3
4
Waiting 3
<等待三秒鐘>
Done
<程序退出>
現在改用 run_forever:
?
code
1
2
3
4
5
6
7
8
9
10
11
async def do_some_work(x):
print(‘Waiting ‘ + str(x))
await asyncio.sleep(x)
print(‘Done‘)
loop = asyncio.get_event_loop()
coro = do_some_work(3)
asyncio.ensure_future(coro)
loop.run_forever()
輸出:
?
code
1
2
3
4
Waiting 3
<等待三秒鐘>
Done
<程序沒有退出>
三秒鐘過後,future 結束,但是程序並不會退出。run_forever 會一直運行,直到 stop 被調用,但是你不能像下面這樣調 stop:
?
code
1
2
loop.run_forever()
loop.stop()
run_forever 不返回,stop 永遠也不會被調用。所以,只能在協程中調 stop:
?
code
1
2
3
4
5
async def do_some_work(loop, x):
print(‘Waiting ‘ + str(x))
await asyncio.sleep(x)
print(‘Done‘)
loop.stop()
這樣並非沒有問題,假如有多個協程在 loop 裏運行:
?
code
1
2
3
4
asyncio.ensure_future(do_some_work(loop, 1))
asyncio.ensure_future(do_some_work(loop, 3))
loop.run_forever()
第二個協程沒結束,loop 就停止了——被先結束的那個協程給停掉的。
要解決這個問題,可以用 gather 把多個協程合並成一個 future,並添加回調,然後在回調裏再去停止 loop。
?
code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def do_some_work(loop, x):
print(‘Waiting ‘ + str(x))
await asyncio.sleep(x)
print(‘Done‘)
def done_callback(loop, futu):
loop.stop()
loop = asyncio.get_event_loop()
futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))
futus.add_done_callback(functools.partial(done_callback, loop))
loop.run_forever()
其實這基本上就是 run_until_complete 的實現了,run_until_complete 在內部也是調用 run_forever。
Close Loop?
以上示例都沒有調用 loop.close,好像也沒有什麽問題。所以到底要不要調 loop.close 呢?
簡單來說,loop 只要不關閉,就還可以再運行。:
?
code
1
2
3
loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()
但是如果關閉了,就不能再運行了:
?
code
1
2
3
loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3)) # 此處異常
建議調用 loop.close,以徹底清理 loop 對象防止誤用。
gather vs. wait
asyncio.gather 和 asyncio.wait 功能相似。
?
code
1
2
coros = [do_some_work(loop, 1), do_some_work(loop, 3)]
loop.run_until_complete(asyncio.wait(coros))
具體差別可請參見 StackOverflow 的討論:Asyncio.gather vs asyncio.wait。
Timer
C++ Boost.Asio 提供了 IO 對象 timer,但是 Python 並沒有原生支持 timer,不過可以用 asyncio.sleep 模擬。
?
code
1
2
3
4
5
6
7
async def timer(x, cb):
futu = asyncio.ensure_future(asyncio.sleep(x))
futu.add_done_callback(cb)
await futu
t = timer(3, lambda futu: print(‘Done‘))
loop.run_until_complete(t)
第一部分完。
一直對asyncio這個庫比較感興趣,畢竟這是官網也非常推薦的一個實現高並發的一個模塊,python也是在python 3.4中引入了協程的概念。也通過這次整理更加深刻理解這個模塊的使用
asyncio 是幹什麽的?
異步網絡操作
並發
協程
python3.0時代,標準庫裏的異步網絡模塊:select(非常底層) python3.0時代,第三方異步網絡庫:Tornado python3.4時代,asyncio:支持TCP,子進程
現在的asyncio,有了很多的模塊已經在支持:aiohttp,aiodns,aioredis等等 https://github.com/aio-libs 這裏列出了已經支持的內容,並在持續更新
當然到目前為止實現協程的不僅僅只有asyncio,tornado和gevent都實現了類似功能
關於asyncio的一些關鍵字的說明:
event_loop 事件循環:程序開啟一個無限循環,把一些函數註冊到事件循環上,當滿足事件發生的時候,調用相應的協程函數
coroutine 協程:協程對象,指一個使用async關鍵字定義的函數,它的調用不會立即執行函數,而是會返回一個協程對象。協程對象需要註冊到事件循環,由事件循環調用。
task 任務:一個協程對象就是一個原生可以掛起的函數,任務則是對協程進一步封裝,其中包含了任務的各種狀態
future: 代表將來執行或沒有執行的任務的結果。它和task上沒有本質上的區別
async/await 關鍵字:python3.5用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口。
看了上面這些關鍵字,你可能扭頭就走了,其實一開始了解和研究asyncio這個模塊有種抵觸,自己也不知道為啥,這也導致很長一段時間,這個模塊自己也基本就沒有關註和使用,但是隨著工作上用python遇到各種性能問題的時候,自己告訴自己還是要好好學習學習這個模塊。
定義一個協程
復制代碼
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print("waiting:", x)
start = now()
這裏是一個協程對象,這個時候do_some_work函數並沒有執行
coroutine = do_some_work(2)
print(coroutine)
創建一個事件loop
loop = asyncio.get_event_loop()
將協程加入到事件循環loop
loop.run_until_complete(coroutine)
print("Time:",now()-start)
復制代碼
在上面帶中我們通過async關鍵字定義一個協程(coroutine),當然協程不能直接運行,需要將協程加入到事件循環loop中
asyncio.get_event_loop:創建一個事件循環,然後使用run_until_complete將協程註冊到事件循環,並啟動事件循環
創建一個task
協程對象不能直接運行,在註冊事件循環的時候,其實是run_until_complete方法將協程包裝成為了一個任務(task)對象. task對象是Future類的子類,保存了協程運行後的狀態,用於未來獲取協程的結果
復制代碼
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print("waiting:", x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print("Time:",now()-start)
復制代碼
結果為:
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex2.py:13>>
waiting: 2
<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex2.py:13> result=None>
Time: 0.0003514289855957031
創建task後,在task加入事件循環之前為pending狀態,當完成後,狀態為finished
關於上面通過loop.create_task(coroutine)創建task,同樣的可以通過 asyncio.ensure_future(coroutine)創建task
關於這兩個命令的官網解釋: https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future
asyncio.ensure_future(coro_or_future, *, loop=None)?
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.
If the argument is a Future, it is returned directly.
https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task
復制代碼
AbstractEventLoop.create_task(coro)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.
Third-party event loops can use their own subclass of Task for interoperability. In this case, the result type is a subclass of Task.
This method was added in Python 3.4.2. Use the async() function to support also older Python versions.
復制代碼
綁定回調
綁定回調,在task執行完成的時候可以獲取執行的結果,回調的最後一個參數是future對象,通過該對象可以獲取協程返回值。
復制代碼
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print("waiting:",x)
return "Done after {}s".format(x)
def callback(future):
print("callback:",future.result())
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)
print("Time:", now()-start)
復制代碼
結果為:
復制代碼
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13>>
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13> cb=[callback() at /app/py_code/study_asyncio/simple_ex3.py:18]>
waiting: 2
callback: Done after 2s
Time: 0.00039196014404296875
復制代碼
通過add_done_callback方法給task任務添加回調函數,當task(也可以說是coroutine)執行完成的時候,就會調用回調函數。並通過參數future獲取協程執行的結果。這裏我們創建 的task和回調裏的future對象實際上是同一個對象
阻塞和await
使用async可以定義協程對象,使用await可以針對耗時的操作進行掛起,就像生成器裏的yield一樣,函數讓出控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其他的協程也掛起或者執行完畢,再進行下一個協程的執行
耗時的操作一般是一些IO操作,例如網絡請求,文件讀取等。我們使用asyncio.sleep函數來模擬IO操作。協程的目的也是讓這些IO操作異步化。
復制代碼
import asyncio
import time
now = lambda :time.time()
async def do_some_work(x):
print("waiting:",x)
# await 後面就是調用耗時的操作
await asyncio.sleep(x)
return "Done after {}s".format(x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)
print("Task ret:", task.result())
print("Time:", now() - start)
復制代碼
在await asyncio.sleep(x),因為這裏sleep了,模擬了阻塞或者耗時操作,這個時候就會讓出控制權。 即當遇到阻塞調用的函數的時候,使用await方法將協程的控制權讓出,以便loop調用其他的協程。
並發和並行
並發指的是同時具有多個活動的系統
並行值得是用並發來使一個系統運行的更快。並行可以在操作系統的多個抽象層次進行運用
所以並發通常是指有多個任務需要同時進行,並行則是同一個時刻有多個任務執行
下面這個例子非常形象:
並發情況下是一個老師在同一時間段輔助不同的人功課。並行則是好幾個老師分別同時輔助多個學生功課。簡而言之就是一個人同時吃三個饅頭還是三個人同時分別吃一個的情況,吃一個饅頭算一個任務
復制代碼
import asyncio
import time
now = lambda :time.time()
async def do_some_work(x):
print("Waiting:",x)
await asyncio.sleep(x)
return "Done after {}s".format(x)
start = now()
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print("Task ret:",task.result())
print("Time:",now()-start)
復制代碼
運行結果:
復制代碼
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004154920578003
復制代碼
總時間為4s左右。4s的阻塞時間,足夠前面兩個協程執行完畢。如果是同步順序的任務,那麽至少需要7s。此時我們使用了aysncio實現了並發。asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一個task列表,後者接收一堆task。
關於asyncio.gather和asyncio.wait官網的說明:
https://docs.python.org/3/library/asyncio-task.html#asyncio.gather
復制代碼
Return a future aggregating results from the given coroutine objects or futures.
All futures must share the same event loop. If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.
復制代碼
https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
復制代碼
Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).
The sequence futures must not be empty.
timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
return_when indicates when this function should return.
復制代碼
協程嵌套
使用async可以定義協程,協程用於耗時的io操作,我們也可以封裝更多的io操作過程,這樣就實現了嵌套的協程,即一個協程中await了另外一個協程,如此連接起來。
復制代碼
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print("waiting:",x)
await asyncio.sleep(x)
return "Done after {}s".format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print("Task ret:", task.result())
# results = await asyncio.gather(*tasks)
# for result in results:
# print("Task ret:",result)
start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)
復制代碼
如果我們把上面代碼中的:
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print("Task ret:", task.result())
替換為:
results = await asyncio.gather(*tasks)
for result in results:
print("Task ret:",result)
這樣得到的就是一個結果的列表
不在main協程函數裏處理結果,直接返回await的內容,那麽最外層的run_until_complete將會返回main協程的結果。 將上述的代碼更改為:
復制代碼
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print("waiting:",x)
await asyncio.sleep(x)
return "Done after {}s".format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
return await asyncio.gather(*tasks)
start = now()
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
print("Task ret:",result)
print("Time:", now()-start)
復制代碼
或者返回使用asyncio.wait方式掛起協程。
將代碼更改為:
復制代碼
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print("waiting:",x)
await asyncio.sleep(x)
return "Done after {}s".format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
return await asyncio.wait(tasks)
start = now()
loop = asyncio.get_event_loop()
done,pending = loop.run_until_complete(main())
for task in done:
print("Task ret:",task.result())
print("Time:", now()-start)
復制代碼
也可以使用asyncio的as_completed方法
復制代碼
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print("waiting:",x)
await asyncio.sleep(x)
return "Done after {}s".format(x)
async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
for task in asyncio.as_completed(tasks):
result = await task
print("Task ret: {}".format(result))
start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)
復制代碼
從上面也可以看出,協程的調用和組合非常靈活,主要體現在對於結果的處理:如何返回,如何掛起
協程的停止
future對象有幾個狀態:
Pending
Running
Done
Cacelled
創建future的時候,task為pending,事件循環調用執行的時候當然就是running,調用完畢自然就是done,如果需要停止事件循環,就需要先把task取消。可以使用asyncio.Task獲取事件循環的task
import asyncio
import time
now = lambda :time.time()
async def do_some_work(x):
print("Waiting:",x)
await asyncio.sleep(x)
return "Done after {}s".format(x)
coroutine1 =do_some_work(1)
coroutine2 =do_some_work(2)
coroutine3 =do_some_work(2)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3),]
start = now()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
for task in asyncio.Task.all_tasks():
print(task.cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
print("Time:",now()-start)
啟動事件循環之後,馬上ctrl+c,會觸發run_until_complete的執行異常 KeyBorardInterrupt。然後通過循環asyncio.Task取消future。可以看到輸出如下:
Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result=‘Done after 1s‘>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=
循環task,逐個cancel是一種方案,可是正如上面我們把task的列表封裝在main函數中,main函數外進行事件循環的調用。這個時候,main相當於最外出的一個task,那麽處理包裝的main函數即可。
不同線程的事件循環
很多時候,我們的事件循環用於註冊協程,而有的協程需要動態的添加到事件循環中。一個簡單的方式就是使用多線程。當前線程創建一個事件循環,然後在新建一個線程,在新線程中啟動事件循環。當前線程不會被block。
import asyncio
from threading import Thread
import time
now = lambda :time.time()
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
def more_work(x):
print(‘More work {}‘.format(x))
time.sleep(x)
print(‘Finished more work {}‘.format(x))
start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print(‘TIME: {}‘.format(time.time() - start))
new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
復制代碼
啟動上述代碼之後,當前線程不會被block,新線程中會按照順序執行call_soon_threadsafe方法註冊的more_work方法, 後者因為time.sleep操作是同步阻塞的,因此運行完畢more_work需要大致6 + 3
新線程協程
復制代碼
import asyncio
import time
from threading import Thread
now = lambda :time.time()
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def do_some_work(x):
print(‘Waiting {}‘.format(x))
await asyncio.sleep(x)
print(‘Done after {}s‘.format(x))
def more_work(x):
print(‘More work {}‘.format(x))
time.sleep(x)
print(‘Finished more work {}‘.format(x))
start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print(‘TIME: {}‘.format(time.time() - start))
asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
上述的例子,主線程中創建一個new_loop,然後在另外的子線程中開啟一個無限事件循環。 主線程通過run_coroutine_threadsafe新註冊協程對象。這樣就能在子線程中進行事件循環的並發操作,同時主線程又不會被block。一共執行的時間大概在6s左右。
【Python】【控制流程】【生成器 | 協程 | 期物 | 任務】對比與聯系