重新談非同步程式設計-協程
首先來說什麼是協程?
協程又被稱之為是微執行緒,或者說是在一個執行緒內實現程式碼塊的相互切換執行。
在《計算機作業系統》中我們學過,一個程序中包含若干個執行緒,一個執行緒中可以包含若干個程序。在Python中,一個執行緒又包含若干個協程。CPU如果在程序和程序之間切換,開銷是比較大的,相對來講,同一程序下的執行緒切換開銷要小很多,因為同一程序下的執行緒是共享該程序的資源的。同理,同一執行緒下的協程切換,也要比執行緒切換開銷小。
協程不像執行緒一樣,需要建立執行緒例項化物件,協程是通過協程函式+將協程調入事件迴圈實現的。
協程函式的定義關鍵詞是asyncio
例如:
import asyncio asyncdef fun(): pass
這就是一個協程函式的定義,asyncio模組是內建的,不用額外下載安裝。但是協程函式不能像普通函式一樣,“fun()”,可以執行,而是需要將協程函式註冊到事件迴圈當中,並且協程函式中必須有“await+可等待物件(協程物件、Future物件、Task物件)”。
例如:
import asyncio async def fun(): print('hello') await asyncio.sleep(2) print('word') loop = asyncio.get_event_loop() loop.run_until_complete(fun())
其中,asyncio.get_event_loop()是建立事件迴圈物件,loop.run_until_complete(fun())是將協程函式fun()註冊到事件迴圈當中,並執行。結果是沒有問題的,輸出'hello',等待2s,在輸出'word‘。
當中的asyncio.sleep(2),是模擬IO工作2s,await是將這個操作掛起2s中,或者說這個操作被阻塞2s的時間。這裡的asyncio.sleep(2)不能寫成time.sleep(2),兩個雖然都是等待2s,但是實質是不一樣的。
再py3.7之後,我們可以將
loop = asyncio.get_event_loop()
loop.run_until_complete(fun())
合併為:
asyncio.run(fun())
============================================================================================================================
現在,我們在建立一個新的協程函式。
import asyncio import time async def fun(): print('hello') await asyncio.sleep(2) print('word') async def fun1(): print('how are you?') await asyncio.sleep(2) print('fine') start_time = time.time() loop = asyncio.get_event_loop() loop.run_until_complete(fun()) loop.run_until_complete(fun1()) end_time = time.time() print('耗時',end_time-start_time)
輸出:
hello
word
how are you?
fine
耗時 4.002251625061035
我們發現,雖然是將兩個協程函式分別註冊到了時間迴圈當中,但是並沒有像我們想象中一樣,交替執行,而是仍然序列執行,消耗4s。我們希望是fun協程先執行print('hello'),遇到了一個2s的IO操作,馬上切換到協程fun1的print('how are you?'),又碰到IO操作,繼續將IO操作掛起。經過不到2s的時間後協程fun的IO已經結束了,繼續執行fun的print('word'),再執行fun1的print('fine')。為什麼是這樣?我個人猜測和理解,並沒有證明,loop.run_until_complete(fun())、loop.run_until_complete(fun1())是先後註冊到協程中的,fun()結束之後,才執行fun1()。如果有其他解釋歡迎留言。這裡我們還要正確理解await後邊的IO操作。IO操作是可以和CPU並行進行的,也就是說在同一時刻,既可以CPU計算,又可以IO工作。如果說在IO的時候,CPU在等待,很明顯浪費時間,所以我們希望有IO和CPU並行進行,也就是有IO的協程之間併發。
============================================================================================================================
那我們如何將兩個協程併發起來呢?依照上邊的思想,我們希望將兩個協程同時註冊到事件迴圈當中。
例:
import asyncio import time async def fun(): print('hello') await asyncio.sleep(2) print('word') async def fun1(): print('how are you?') await asyncio.sleep(2) print('fine') start_time = time.time() task1 = asyncio.ensure_future(fun()) task2 = asyncio.ensure_future(fun1()) task_list = [task1,task2] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(task_list)) end_time = time.time() print('耗時',end_time-start_time)
輸出:
hello
how are you?
word
fine
耗時 2.0022101402282715
我們發現,這是我們需要的。程式碼中的asyncio.ensure_future()用來建立task物件,task_list是一個列表,列表裡邊每個元素是task物件。
我們列印一下asyncio.wait(task_list),輸出:<coroutine object wait at 0x000001E8C9B523C8>,發現這是一個協程物件(注意不是協程函式)。
這裡有同學說了,我們將協程函式轉成協程物件不就可以了?print(fun()),發現fun()確實是協程物件,但是會報警告,“Enable tracemalloc to get the object allocation traceback”。
所以在這裡,實際上是將協程物件先轉成了task物件,再將task物件組成的列表傳入到loop.run_until_complete()裡。
所以到此為止我們知道了,loop.run_until_complete()這個方法的引數必須是一個協程物件。
我們可以將asyncio.wait(task_list)改為,res = asyncio.gather(*task_list)也可以,wait方法要求傳入的是一個列表,gather方法要求傳入的引數是一個列表解包後的結果。所以也可以這樣寫:asyncio.gather(task1,task2)。
import asyncio import time async def fun(): print('hello') await asyncio.sleep(2) print('word') async def fun1(): print('how are you?') await asyncio.sleep(2) print('fine') start_time = time.time() task1 = asyncio.ensure_future(fun()) task2 = asyncio.ensure_future(fun1()) task_list = [task1,task2] loop = asyncio.get_event_loop() res = asyncio.gather(task1,task2) print(res) loop.run_until_complete(res) end_time = time.time() print('耗時',end_time-start_time)
輸出:
<_GatheringFuture pending>
hello
how are you?
word
fine
耗時 2.0014095306396484
這裡我們發現小細節,asyncio.gather(task1,task2)的返回值並不是前邊說的task類物件,而是一個Future類物件,這是因為task是Future的子類,這下就都明白了吧?pending的意思是未開始,列印的時候還沒有開始run呢。所以說gather比wait更高一級。
那麼task物件到底有什麼用?task物件是把協程物件進行封裝,可以追蹤 coroutine協程物件的完成狀態。也就是說儲存了協程執行後的狀態,用於未來獲取協程的結果。
到此,我們知道了loop.run_until_complete()的引數可以是Future物件,也可以是協程物件。回想第一個例子,我們把fun()傳入到這裡不就可以解釋了嗎?
============================================================================================================================
現在,我們來說說ensure_future和create_task區別。
兩個方法都是來建立task物件,但是如果把上邊的ensure_future修改為create_task會報錯。為啥?
注意:如果當前執行緒中沒有正在執行的事件迴圈,asyncio.create_task將會引發RuntimeError異常。可以理解為asyncio.create_task必須寫在一個協程函式中。
將上面的程式碼略微修改就可以:
import asyncio import time async def fun(): print('hello') await asyncio.sleep(2) print('word') async def fun1(): print('how are you?') await asyncio.sleep(2) print('fine') async def main(): start_time = time.time() task1 = asyncio.create_task(fun()) task2 = asyncio.ensure_future(fun1()) await asyncio.gather(task1,task2) end_time = time.time() print('耗時',end_time-start_time) loop = asyncio.get_event_loop() loop.run_until_complete(main())
輸出:
hello
how are you?
word
fine
耗時 2.000105619430542
在main()協程函式中,await是必須要有的,如果沒有await,這就不是一個正確的協程函式,會報錯。後邊的asyncio.gather(task1,task2)前邊介紹過,返回的是一個Future物件,是沒問題的。我是從實用角度解釋的ensure_future和create_task區別,並不全面,後續碰到實際問題再實際分析。
============================================================================================================================
剛才說的到的asyncio.wait 和 asyncio.gather,有什麼區別呢?
前者的返回值是兩個集合結果,第1個集合是已經完成的task,第2個集合是未完成的task。後者直接返回的是多個task的計算結果。
例:
import asyncio import time async def fun(): print('hello') await asyncio.sleep(2) print('word') async def fun1(): print('how are you?') await asyncio.sleep(2) print('fine') async def main(): start_time = time.time() task1 = asyncio.create_task(fun()) task2 = asyncio.ensure_future(fun1()) res1,res2 = await asyncio.wait([task1,task2]) print(res1) print('======') print(res2) end_time = time.time() print('耗時',end_time-start_time) loop = asyncio.get_event_loop() loop.run_until_complete(main())
輸出:
hello how are you? word fine {<Task finished coro=<fun() done, defined at C:/Users/王棟軒/PycharmProjects/pythonProject1/main.py:98> result=None>, <Task finished coro=<fun1() done, defined at C:/Users/王棟軒/PycharmProjects/pythonProject1/main.py:102> result=None>} ====== set() 耗時 2.000847101211548 程序已結束,退出程式碼0
很容看到,返回值的第1個是2個協程物件,第2個是空。我們可以使用task.result來獲取兩個task物件(兩個task物件是由前邊的兩個協程函式封裝好的)返回值。
例:
import asyncio import time async def fun(): print('hello') await asyncio.sleep(2) print('word') return 'fun1' async def fun1(): print('how are you?') await asyncio.sleep(2) print('fine') return 'fun2' async def main(): start_time = time.time() task1 = asyncio.create_task(fun()) task2 = asyncio.ensure_future(fun1()) res1,res2 = await asyncio.wait([task1,task2]) for i in res1: print(i.result()) end_time = time.time() print('耗時',end_time-start_time) loop = asyncio.get_event_loop() loop.run_until_complete(main())
輸出:
hello how are you? word fine <class '_asyncio.Task'> fun2 <class '_asyncio.Task'> fun1 耗時 2.0008704662323 程序已結束,退出程式碼0
例:
import asyncio import time async def fun(): print('hello') await asyncio.sleep(2) print('word') return 'fun1' async def fun1(): print('how are you?') await asyncio.sleep(2) print('fine') return 'fun2' async def main(): start_time = time.time() task1 = asyncio.create_task(fun()) task2 = asyncio.ensure_future(fun1()) res1,res2 = await asyncio.gather(*[task1,task2]) print(res1,res2) end_time = time.time() print('耗時',end_time-start_time) loop = asyncio.get_event_loop() loop.run_until_complete(main())
輸出:
hello
how are you?
word
fine
fun1 fun2
耗時 2.000060558319092
可以看到,程式碼中用res1和res2分別接收task1和task2的返回結果。
ok。到此,協程的又一步學習告一段落。
耗時1個 晚上+一個1下午。
ending……