Python asyncio 非同步IO的理解
1、理解概念
asyncio 是用來編寫併發程式碼的庫,使用async/await語法。
(1)何為併發:
併發就是在一個時間段內,同時做多個事情。
比如在單CPU的機器中(只有一個CPU的機器),我們可以一邊聽歌,一邊鬥地主,一邊聊QQ。
在我們看來,我們是同時在做這三件事,可是在單CPU中,這並不是同時進行的。
單CPU把自己分成了一段一段的時間片,每個時間片用來做一個事情。
如果該時間片用光了,但是事件還沒有做完,那就先不做了。CPU先去做其他的,做完其他的事情,再利用新的時間片,來接著做這個事情。
因為時間片之間的切換速度是很快的,使用者根本體驗不出來,所以就感覺是同時在做這些事情。
以上就可以理解為併發。
(2)何為並行
基本上提到了併發,總有人要問什麼是並行,以及並行與併發的區別。
並行就是同一時間點,同時做多個事情。聽起來和併發差不多,但是還是有區別的,看完下面的解釋就容易理解了。
並行在單CPU中是不可能存在的,因為並行是真真實實存在的。而併發是巨集觀上存在的,
比如在多CPU的機器中(有多個CPU的機器,與單CPU對立),我們可以一邊聽歌,一邊鬥地主,一邊聊QQ。
在我們看來,我們是同時在做這三件事,在多CPU中,其實也是同時在做這三個事情
一個CPU負責聽歌的程序,一個CPU負責鬥地主的程序,一個CPU負責QQ的程序。
在物理時間上,CPU真的就是在同時做這三個事情,只是每個事件用的CPU不同而已。
以上可以理解為並行。
備註:我這裡所說的多CPU,一個CPU負責一個程序,其實也可以理解為一個CPU有多個核,一個核負責一個程序,這樣也是說的通的。
因為我對多CPU以及多核CPU還沒有理解清楚,所以上面的舉例就用了多CPU。本文主要是記錄自己的理解,所以難免有不懂的地方。請包涵。
(3)併發與並行的共同點與區別
共同點:
我們可以理解併發與並行都是同一個時間,做多個事情,執行多個程式。
不同點:
併發是在巨集觀上存在的,是在邏輯上存在的,理解了(1),應該就能理解這句話了。這也是為什麼用單CPU舉例的意義所在。
並行是在物理上存在的,是真實存在的。理解了(2),應該就能理解這句話了。
單核CPU是不可能存在並行的。
單CPU中程序只能是併發,多CPU計算機中程序可以並行。
單CPU單核中執行緒只能併發,單CPU多核中執行緒可以並行。
(4)併發與並行學習連結
https://cloud.tencent.com/developer/article/1424249 圖形並茂的,助於理解!
https://laike9m.com/blog/huan-zai-yi-huo-bing-fa-he-bing-xing,61/ 義正言辭的!
https://sunweiguo.github.io/2019/11/17/miscellany/20%E5%85%B3%E4%BA%8E%E5%A4%9ACPU%E5%92%8C%E5%A4%9A%E6%A0%B8CPU%E7%9A%84%E5%8C%BA%E5%88%AB/ CPU知識點!
(5)協程理解
一般實現併發的方式就是使用多程序,多執行緒。這裡加入一個協程概念,協程也可以實現併發。
asyncio就是通過協程,實現的併發。與本文第一句相互呼應,嘿嘿!
我們編寫程式,正常執行是序列的,即從上而下依次執行。
比如定義一個函式,執行函式就是從函式的第一行,依次執行第二行,再執行第三行......最後執行最後一行,然後結束函式的執行。
協程就是執行一個函式,到某一行,然後轉去執行其他函式,執行其他函式到某一行,又去執行了其他函式,最後在合適的時間點,又轉回來執行最初的函式
在合適的時間點,指的是其他函式執行結束,或者其他函式主動釋放CPU。
比如某個函式中有sleep操作,這個時間,等待睡眠是浪費CPU的,這個時候可以跳出該函式,轉去執行其他函式,當其他函式執行完,再接著執行這個sleep的後一句
以上就是個人理解協程,比較空洞,結合下面的程式碼,就能容易理解一點了。
2、使用async關鍵字定義協程
def function(name): print('這是正常的函式,name:{}'.format(name)) async def coroutines_function(name): print('這是協程函式,name:{}'.format(name)) if __name__ == '__main__': function('hello') # 正常執行 coroutines_function('hello') # 報錯 coroutine 'coroutines_function' was never awaited
3、通過asyncio執行協程(開始了asycnio的使用)
協程物件不能直接執行,需要新增到事件迴圈中,在合適的時間點,事件迴圈自動執行
合適的時間點指的是CPU分配時間片給他,其他程式不佔用CPU
import asyncio # 匯入非同步io庫 async def coroutines_function(name): print('這是協程函式,name:{}'.format(name)) if __name__ == '__main__': obj = coroutines_function('墨玉麒麟') # 返回協程物件 print(type(obj)) # <class 'coroutine'> loop = asyncio.get_event_loop() # 獲取事件迴圈 loop.run_until_complete(obj) # 註冊到事件迴圈中 loop.close() # 關閉事件迴圈
執行
4、建立task物件
上面3中的示例程式碼 loop.run_until_complete(obj)
傳入的引數是一個協程物件,傳入後協程物件會被包裝成task物件,這樣執行的就是task物件,而不是協程物件
可以這麼寫,建立一個task物件,然後傳給run_until_complete函式
import asyncio # 匯入非同步io庫 async def coroutines_function(name): print('這是協程函式,name:{}'.format(name)) if __name__ == '__main__': obj = coroutines_function('墨玉麒麟') # 返回協程物件 print(type(obj)) # <class 'coroutine'> loop = asyncio.get_event_loop() # 獲取事件迴圈 task = loop.create_task(obj) # 建立task print(type(task)) # <class '_asyncio.Task'> loop.run_until_complete(task) # 註冊到事件迴圈中 loop.close() # 關閉事件迴圈
執行
5、task物件的狀態
task物件被創建出來時,不會被執行,這時候狀態是pending
之後被新增到事件迴圈中,等待執行
執行結束後狀態為finished
import asyncio # 匯入非同步io庫 async def coroutines_function(name): print('這是協程函式,name:{}'.format(name)) if __name__ == '__main__': obj = coroutines_function('墨玉麒麟') # 返回協程物件 print(type(obj)) # <class 'coroutine'> loop = asyncio.get_event_loop() # 獲取事件迴圈 task = loop.create_task(obj) # 建立task print(type(task)) # <class '_asyncio.Task'> print(task) # <Task pending coro=<coroutines_function() loop.run_until_complete(task) # 註冊到事件迴圈中 print(task) # <Task finished coro=<coroutines_function() done loop.close() # 關閉事件迴圈
執行
6、回撥函式
可以為task物件新增一個回撥函式,執行完task後,自動執行繫結的回撥函式
import asyncio # 匯入非同步io庫 async def coroutines_function(name): print('這是協程函式,name:{}'.format(name)) def callback(function): print('協程函式本身:{}'.format(function)) print('回撥函式完成 ---> coroutines_function') if __name__ == '__main__': obj = coroutines_function('墨玉麒麟') # 返回協程物件 loop = asyncio.get_event_loop() # 獲取事件迴圈 task = loop.create_task(obj) # 建立task task.add_done_callback(callback) loop.run_until_complete(task) # 註冊到事件迴圈中 loop.close() # 關閉事件迴圈
執行
7、獲取協程函式的返回值
只有當協程函式執行完畢,狀態為finished時,才可以獲取返回值,提前獲取報錯
import asyncio # 匯入非同步io庫 async def coroutines_function(name): print('這是協程函式,name:{}'.format(name)) return 'coroutines_function-->{}'.format(name) def callback(function): print('協程函式本身:{}'.format(function)) print('回撥函式完成 ---> coroutines_function') if __name__ == '__main__': obj = coroutines_function('墨玉麒麟') # 返回協程物件 loop = asyncio.get_event_loop() # 獲取事件迴圈 task = loop.create_task(obj) # 建立task # print(task.result()) # 報錯 Result is not set. task.add_done_callback(callback) loop.run_until_complete(task) # 註冊到事件迴圈中 print('task result:{}'.format(task.result())) # coroutines_function-->墨玉麒麟 loop.close() # 關閉事件迴圈
執行
8、await讓出CPU控制器
以上的例子都不能演示併發的情景,因為以上的例子沒有用到await關鍵字
我們可以通過使用await關鍵字,來讓出CPU的控制器,去執行其他函式的語句,從而實現併發機制
當事件迴圈看到await關鍵字時,事件迴圈會掛起該協程,從而去執行其他協程。其他協程掛起時,或者其他協程執行完畢,事件迴圈再繼續執行最初的協程
(1)舉個睡眠的例子
import asyncio # 匯入非同步io庫 import time async def sleep_second(second): print('將要睡眠{}秒'.format(second)) await asyncio.sleep(second) print('睡眠{}秒完成,醒過來了'.format(second)) return '休息了{}秒'.format(second) if __name__ == '__main__': start = time.time() obj1 = sleep_second(5) # 返回協程物件 obj3 = sleep_second(3) # 返回協程物件 obj5 = sleep_second(1) # 返回協程物件 loop = asyncio.get_event_loop() # 獲取事件迴圈 task1 = loop.create_task(obj1) # 建立task task3 = loop.create_task(obj3) # 建立task task5 = loop.create_task(obj5) # 建立task loop.run_until_complete(task1) # 註冊到事件迴圈中 loop.run_until_complete(task3) # 註冊到事件迴圈中 loop.run_until_complete(task5) # 註冊到事件迴圈中 loop.close() # 關閉事件迴圈 end = time.time() print('耗時:{}'.format(end - start))
執行
可以看出,當執行到睡眠5秒的時候,程式並沒有等待,而是去執行睡眠3秒,再去執行睡眠1秒,最後1秒先執行完畢,先返回,再是睡眠3秒返回,再是睡眠5秒返回
用時是5秒,而不是5 + 3 + 1 = 9 秒
備註 await後面需要跟協程函式,而不能是普通函式,這裡不能用time.sleep()代替asyncio.sleep(),用了會報錯,可以自己換了嘗試一下看看
(2)演示協程巢狀
import asyncio # 匯入非同步io庫 import time async def fun1(name): print('fun1 name begin:{}'.format(name)) print('fun1 name end:{}'.format(name)) async def fun2(name): print('fun2 name begin:{}'.format(name)) await fun1('墨玉麒麟') print('fun2 name end:{}'.format(name)) async def fun3(name): print('fun3 name begin:{}'.format(name)) await fun2('張龍趙虎') print('fun3 name end:{}'.format(name)) if __name__ == '__main__': start = time.time() obj3 = fun3('神龍吸水') # 返回協程物件 loop = asyncio.get_event_loop() # 獲取事件迴圈 task3 = loop.create_task(obj3) # 建立task loop.run_until_complete(task3) # 註冊到事件迴圈中 loop.close() # 關閉事件迴圈 end = time.time() print('耗時:{}'.format(end - start))
執行
備註:實際專案中,一般asyncio.sleep()都是用其他協程的庫來代替,比如aiohttp、aiomysq等
學習連結:https://www.jianshu.com/p/b5e347b3a17c