Python的異步編程[0] -> 協程[1] -> 使用協程建立自己的異步非阻塞模型
使用協程建立自己的異步非阻塞模型
接下來例子中,將使用純粹的Python編碼搭建一個異步模型,相當於自己構建的一個asyncio模塊,這也許能對asyncio模塊底層實現的理解有更大的幫助。主要參考為文末的鏈接,以及自己的補充理解。
完整代碼
1 #!/usr/bin/python 2 # ============================================================= 3 # File Name: async_base.py 4 # Author: LI Ke 5 # Created Time: 1/29/2018 09:18:50 6View Code# ============================================================= 7 8 9 import types 10 import time 11 12 13 @types.coroutine 14 def switch(): 15 print(‘Switch: Start‘) 16 yield 17 print(‘Switch: Done‘) 18 19 async def coro_1(): 20 print(‘C1: Start‘) 21 await switch()22 print(‘C1: Stop‘) 23 24 25 async def coro_2(): 26 print(‘C2: Start‘) 27 print(‘C2: 1‘) 28 print(‘C2: 2‘) 29 print(‘C2: 3‘) 30 print(‘C2: Stop‘) 31 32 c_1 = coro_1() 33 c_2 = coro_2() 34 35 try: 36 c_1.send(None) 37 except StopIteration: 38 pass 39try: 40 c_2.send(None) 41 except StopIteration: 42 pass 43 try: 44 c_1.send(None) 45 except StopIteration: 46 pass 47 48 print(‘--------------------------------‘) 49 50 def run(coros): 51 coros = list(coros) 52 53 while coros: 54 # Duplicate list for iteration so we can remove from original list 55 for coro in list(coros): 56 try: 57 coro.send(None) 58 except StopIteration: 59 coros.remove(coro) 60 61 c_1 = coro_1() 62 c_2 = coro_2() 63 run([c_1, c_2]) 64 65 print(‘--------------------------------‘) 66 67 @types.coroutine 68 def action(t): 69 trace=[] 70 while True: 71 trace.append(time.time()) 72 if trace[-1] - trace[0] > t: 73 break # This break will end this function and raise a StopIteration 74 yield 75 76 async def coro_1(): 77 print(‘C1: Start‘) 78 await action(2) 79 print(‘C1: Stop‘) 80 81 82 async def coro_2(): 83 print(‘C2: Start‘) 84 await action(3) 85 print(‘C2: Stop‘) 86 87 def timeit(f): 88 def _wrapper(*args, **kwargs): 89 start = time.time() 90 re = f(*args, **kwargs) 91 end = time.time() 92 print(‘Time cost:‘, f.__name__, end-start) 93 return re 94 return _wrapper 95 96 c_1 = coro_1() 97 c_2 = coro_2() 98 timeit(run)([c_1]) 99 timeit(run)([c_2]) 100 101 print(‘--------------------------------‘) 102 103 c_1 = coro_1() 104 c_2 = coro_2() 105 timeit(run)([c_1, c_2])
分段解釋
首先會導入需要的模塊,這裏僅僅使用types和time兩個模塊,放棄異步I/O的asyncio模塊。
1 import types 2 import time
接下來定義一個switch函數,利用types.coroutine裝飾器將switch裝飾成一個協程,這個協程將完成一個切換功能。
1 @types.coroutine 2 def switch(): 3 print(‘Switch: Start‘) 4 yield 5 print(‘Switch: Done‘)
隨後定義第一個協程,協程啟動後,會進入一個await,即切入剛才的switch協程,這裏使用async和await關鍵字完成對協程的定義。
1 async def coro_1(): 2 print(‘C1: Start‘) 3 await switch() 4 print(‘C1: Stop‘)
同樣的,再定義第二個協程,第二個協程將從頭到尾順序執行。
1 async def coro_2(): 2 print(‘C2: Start‘) 3 print(‘C2: 1‘) 4 print(‘C2: 2‘) 5 print(‘C2: 3‘) 6 print(‘C2: Stop‘)
有了上面的兩個協程,但我們在異步時,希望在執行完C_1的start後,切換進協程C_2,執行完成後再切換回來。那麽此時就需要一個對協程切換進行控制的程序,具體順序如下,
- 啟動協程c_1,啟動後會切換進switch函數,
- Switch中由於yield而切出,並保留上下文環境
- c_1.send()將獲得返回結果(如果有的話),並繼續執行
- 此時c_1已經被中止,啟動c_2,則完成所有執行步驟,捕獲生成器的中止異常
- 這時c_2以執行完畢,再次切回c_1(此時會從switch yield之後開始執行)繼續執行。
1 c_1 = coro_1() 2 c_2 = coro_2() 3 4 try: 5 c_1.send(None) 6 except StopIteration: 7 pass 8 try: 9 c_2.send(None) 10 except StopIteration: 11 pass 12 try: 13 c_1.send(None) 14 except StopIteration: 15 pass
最終得到結果如下,可以看到,整個過程完全按期望的流程進行,
C1: Start Switch: Start C2: Start C2: 1 C2: 2 C2: 3 C2: Stop Switch: Done C1: Stop
但是這裏的協程運行部分仍需改善,於是接下來便定義一個run函數用於執行一個協程列表。
run函數首先會遍歷協程列表的副本,並不斷嘗試啟動列表中的協程,當協程結束後便將協程從協程列表中刪除,直到所有的協程都執行完畢為止。
1 def run(coros): 2 coros = list(coros) 3 4 while coros: 5 # Duplicate list for iteration so we can remove from original list 6 for coro in list(coros): 7 try: 8 coro.send(None) 9 except StopIteration: 10 coros.remove(coro) 11 12 c_1 = coro_1() 13 c_2 = coro_2() 14 run([c_1, c_2])
測試一下run函數,得到結果與前面相同,
C1: Start Switch: Start C2: Start C2: 1 C2: 2 C2: 3 C2: Stop Switch: Done C1: Stop
到目前為止,完成了一個簡單的異步模型的搭建,即c_2無需等待c_1執行完成再繼續執行,而是由c_1交出了控制權進行協作完成,同時也不存在多線程的搶占式任務,因為由始至終都只有一個線程在運行,而且也沒有混亂的回調函數存在。
但是,還存在一個阻塞問題沒有解決,也就是說,如果c_1中的switch函數是一個耗時的I/O操作或其他阻塞型操作,則此時需要等待switch的阻塞操作完成才能交出控制權,可如果希望在等待這個耗時操作時,先去執行c_2的任務,再回來檢測c_1中的耗時操作是否完成,則需要使用非阻塞的方式。
首先,對剛才的switch進行改造,完成一個action協程,這個協程會根據傳入的參數,執行對應時間後,再退出協程引發StopIteration,實現方式如下,每次切換進action中都會記錄下時間,然後將時間和第一次進入的時間進行對比,如果超過了設置的時間便退出,如果沒超過限制時間,則切出協程交還出控制權。
1 @types.coroutine 2 def action(t): 3 trace=[] 4 while True: 5 trace.append(time.time()) 6 if trace[-1] - trace[0] > t: 7 break # This break will end this function and raise a StopIteration 8 yield
接著定義兩個協程,分別執行action時間為2秒和3秒,同時定義一個計算時間的裝飾器,用於時間記錄。
1 async def coro_1(): 2 print(‘C1: Start‘) 3 await action(2) 4 print(‘C1: Stop‘) 5 6 7 async def coro_2(): 8 print(‘C2: Start‘) 9 await action(3) 10 print(‘C2: Stop‘) 11 12 def timeit(f): 13 def _wrapper(*args, **kwargs): 14 start = time.time() 15 re = f(*args, **kwargs) 16 end = time.time() 17 print(‘Time cost:‘, f.__name__, end-start) 18 return re 19 return _wrapper
然後我們先分別運行兩個協程進行一個實驗,
1 c_1 = coro_1() 2 c_2 = coro_2() 3 timeit(run)([c_1]) 4 timeit(run)([c_2])
從輸出的結果可以看到兩個協程的耗時與action執行的時間基本相同,且順序執行的時間為兩者之和,
C1: Start C1: Stop Time cost: run 2.030202865600586 C2: Start C2: Stop Time cost: run 3.0653066635131836
接下來,利用異步非阻塞的方式來執行這兩個協程,
1 c_1 = coro_1() 2 c_2 = coro_2() 3 timeit(run)([c_1, c_2])
最後得到結果
C1: Start
C2: Start
C1: Stop
C2: Stop
Time cost: run 3.0743072032928467
從結果中可以看到,此時的運行方式是異步的形式,c_1啟動後由於進入一個耗時action,且action被我們設置為非阻塞形式,因此c_1交出了控制權,控制權回到run函數後,啟動了c_2,而c_2同樣也進入到action中,這時兩個協程都在等待任務完成,而監視run則在兩個協程中不停輪詢,不斷進入action中查看各自的action操作是否完成,當有協程完成後,將繼續啟動這個協程的後續操作,直到最終所有協程結束。
按照非阻塞異步協程的方式,可以以單線程運行,避免資源鎖的建立,也消除了線程切換的開銷,並且最終獲得了類似多線程運行的時間性能。
相關閱讀
1. 協程和 async / await
參考鏈接
http://www.oschina.net/translate/playing-around-with-await-async-in-python-3-5
Python的異步編程[0] -> 協程[1] -> 使用協程建立自己的異步非阻塞模型