1. 程式人生 > 其它 >Python asyncio 非同步IO的理解

Python asyncio 非同步IO的理解

1、理解概念

asyncio 是用來編寫併發程式碼的庫,使用async/await語法。

(1)何為併發:

併發就是在一個時間段內,同時做多個事情。

比如在單CPU的機器中(只有一個CPU的機器),我們可以一邊聽歌,一邊鬥地主,一邊聊QQ。

在我們看來,我們是同時在做這三件事,可是在單CPU中,這並不是同時進行的。

單CPU把自己分成了一段一段的時間片,每個時間片用來做一個事情。

如果該時間片用光了,但是事件還沒有做完,那就先不做了。CPU先去做其他的,做完其他的事情,再利用新的時間片,來接著做這個事情。

因為時間片之間的切換速度是很快的,使用者根本體驗不出來,所以就感覺是同時在做這些事情。

以上就可以理解為併發。

(2)何為並行

基本上提到了併發,總有人要問什麼是並行,以及並行與併發的區別。

並行就是同一時間點,同時做多個事情。聽起來和併發差不多,但是還是有區別的,看完下面的解釋就容易理解了。

並行在單CPU中是不可能存在的,因為並行是真真實實存在的。而併發是巨集觀上存在的,

比如在多CPU的機器中(有多個CPU的機器,與單CPU對立),我們可以一邊聽歌,一邊鬥地主,一邊聊QQ。

在我們看來,我們是同時在做這三件事,在多CPU中,其實也是同時在做這三個事情

一個CPU負責聽歌的程序,一個CPU負責鬥地主的程序,一個CPU負責QQ的程序。

在物理時間上,CPU真的就是在同時做這三個事情,只是每個事件用的CPU不同而已。

以上可以理解為並行。

備註:我這裡所說的多CPU,一個CPU負責一個程序,其實也可以理解為一個CPU有多個核,一個核負責一個程序,這樣也是說的通的。

因為我對多CPU以及多核CPU還沒有理解清楚,所以上面的舉例就用了多CPU。本文主要是記錄自己的理解,所以難免有不懂的地方。請包涵。

(3)併發與並行的共同點與區別

共同點:

我們可以理解併發與並行都是同一個時間,做多個事情,執行多個程式。

不同點:

併發是在巨集觀上存在的,是在邏輯上存在的,理解了(1),應該就能理解這句話了。這也是為什麼用單CPU舉例的意義所在。

並行是在物理上存在的,是真實存在的。理解了(2),應該就能理解這句話了。

單核CPU是不可能存在並行的。

單CPU中程序只能是併發,多CPU計算機中程序可以並行。

單CPU單核中執行緒只能併發,單CPU多核中執行緒可以並行。

(4)併發與並行學習連結

https://cloud.tencent.com/developer/article/1424249 圖形並茂的,助於理解!

https://laike9m.com/blog/huan-zai-yi-huo-bing-fa-he-bing-xing,61/ 義正言辭的!

https://sunweiguo.github.io/2019/11/17/miscellany/20%E5%85%B3%E4%BA%8E%E5%A4%9ACPU%E5%92%8C%E5%A4%9A%E6%A0%B8CPU%E7%9A%84%E5%8C%BA%E5%88%AB/ CPU知識點!

(5)協程理解

一般實現併發的方式就是使用多程序,多執行緒。這裡加入一個協程概念,協程也可以實現併發。

asyncio就是通過協程,實現的併發。與本文第一句相互呼應,嘿嘿!

我們編寫程式,正常執行是序列的,即從上而下依次執行。

比如定義一個函式,執行函式就是從函式的第一行,依次執行第二行,再執行第三行......最後執行最後一行,然後結束函式的執行。

協程就是執行一個函式,到某一行,然後轉去執行其他函式,執行其他函式到某一行,又去執行了其他函式,最後在合適的時間點,又轉回來執行最初的函式

在合適的時間點,指的是其他函式執行結束,或者其他函式主動釋放CPU。

比如某個函式中有sleep操作,這個時間,等待睡眠是浪費CPU的,這個時候可以跳出該函式,轉去執行其他函式,當其他函式執行完,再接著執行這個sleep的後一句

以上就是個人理解協程,比較空洞,結合下面的程式碼,就能容易理解一點了。

2、使用async關鍵字定義協程

def function(name):
    print('這是正常的函式,name:{}'.format(name))
async def coroutines_function(name):
    print('這是協程函式,name:{}'.format(name))
if __name__ == '__main__':
    function('hello')  # 正常執行
    coroutines_function('hello')  # 報錯 coroutine 'coroutines_function' was never awaited

3、通過asyncio執行協程(開始了asycnio的使用)

協程物件不能直接執行,需要新增到事件迴圈中,在合適的時間點,事件迴圈自動執行

合適的時間點指的是CPU分配時間片給他,其他程式不佔用CPU

 import asyncio  # 匯入非同步io庫


async def coroutines_function(name):
    print('這是協程函式,name:{}'.format(name))


if __name__ == '__main__':
    obj = coroutines_function('墨玉麒麟')  # 返回協程物件
    print(type(obj))  # <class 'coroutine'>
    loop = asyncio.get_event_loop()  # 獲取事件迴圈
    loop.run_until_complete(obj)  # 註冊到事件迴圈中
    loop.close()  # 關閉事件迴圈

執行

4、建立task物件

上面3中的示例程式碼 loop.run_until_complete(obj)

傳入的引數是一個協程物件,傳入後協程物件會被包裝成task物件,這樣執行的就是task物件,而不是協程物件

可以這麼寫,建立一個task物件,然後傳給run_until_complete函式

import asyncio  # 匯入非同步io庫


async def coroutines_function(name):
    print('這是協程函式,name:{}'.format(name))


if __name__ == '__main__':
    obj = coroutines_function('墨玉麒麟')  # 返回協程物件
    print(type(obj))  # <class 'coroutine'>
    loop = asyncio.get_event_loop()  # 獲取事件迴圈
    task = loop.create_task(obj)  # 建立task
    print(type(task))  # <class '_asyncio.Task'>
    loop.run_until_complete(task)  # 註冊到事件迴圈中
    loop.close()  # 關閉事件迴圈

執行

5、task物件的狀態

task物件被創建出來時,不會被執行,這時候狀態是pending

之後被新增到事件迴圈中,等待執行

執行結束後狀態為finished

import asyncio  # 匯入非同步io庫


async def coroutines_function(name):
    print('這是協程函式,name:{}'.format(name))


if __name__ == '__main__':
    obj = coroutines_function('墨玉麒麟')  # 返回協程物件
    print(type(obj))  # <class 'coroutine'>
    loop = asyncio.get_event_loop()  # 獲取事件迴圈
    task = loop.create_task(obj)  # 建立task
    print(type(task))  # <class '_asyncio.Task'>
    print(task)  # <Task pending coro=<coroutines_function()
    loop.run_until_complete(task)  # 註冊到事件迴圈中
    print(task)  # <Task finished coro=<coroutines_function() done
    loop.close()  # 關閉事件迴圈

執行

6、回撥函式

可以為task物件新增一個回撥函式,執行完task後,自動執行繫結的回撥函式

import asyncio  # 匯入非同步io庫


async def coroutines_function(name):
    print('這是協程函式,name:{}'.format(name))


def callback(function):
    print('協程函式本身:{}'.format(function))
    print('回撥函式完成 ---> coroutines_function')


if __name__ == '__main__':
    obj = coroutines_function('墨玉麒麟')  # 返回協程物件
    loop = asyncio.get_event_loop()  # 獲取事件迴圈
    task = loop.create_task(obj)  # 建立task
    task.add_done_callback(callback)
    loop.run_until_complete(task)  # 註冊到事件迴圈中
    loop.close()  # 關閉事件迴圈

執行

7、獲取協程函式的返回值

只有當協程函式執行完畢,狀態為finished時,才可以獲取返回值,提前獲取報錯

import asyncio  # 匯入非同步io庫


async def coroutines_function(name):
    print('這是協程函式,name:{}'.format(name))
    return 'coroutines_function-->{}'.format(name)


def callback(function):
    print('協程函式本身:{}'.format(function))
    print('回撥函式完成 ---> coroutines_function')


if __name__ == '__main__':
    obj = coroutines_function('墨玉麒麟')  # 返回協程物件
    loop = asyncio.get_event_loop()  # 獲取事件迴圈
    task = loop.create_task(obj)  # 建立task
    # print(task.result())  # 報錯 Result is not set.
    task.add_done_callback(callback)
    loop.run_until_complete(task)  # 註冊到事件迴圈中
    print('task result:{}'.format(task.result()))  # coroutines_function-->墨玉麒麟
    loop.close()  # 關閉事件迴圈

執行

8、await讓出CPU控制器

以上的例子都不能演示併發的情景,因為以上的例子沒有用到await關鍵字

我們可以通過使用await關鍵字,來讓出CPU的控制器,去執行其他函式的語句,從而實現併發機制

當事件迴圈看到await關鍵字時,事件迴圈會掛起該協程,從而去執行其他協程。其他協程掛起時,或者其他協程執行完畢,事件迴圈再繼續執行最初的協程

(1)舉個睡眠的例子

import asyncio  # 匯入非同步io庫
import time


async def sleep_second(second):
    print('將要睡眠{}秒'.format(second))
    await asyncio.sleep(second)
    print('睡眠{}秒完成,醒過來了'.format(second))
    return '休息了{}秒'.format(second)


if __name__ == '__main__':
    start = time.time()
    obj1 = sleep_second(5)  # 返回協程物件
    obj3 = sleep_second(3)  # 返回協程物件
    obj5 = sleep_second(1)  # 返回協程物件
    loop = asyncio.get_event_loop()  # 獲取事件迴圈
    task1 = loop.create_task(obj1)  # 建立task
    task3 = loop.create_task(obj3)  # 建立task
    task5 = loop.create_task(obj5)  # 建立task
    loop.run_until_complete(task1)  # 註冊到事件迴圈中
    loop.run_until_complete(task3)  # 註冊到事件迴圈中
    loop.run_until_complete(task5)  # 註冊到事件迴圈中
    loop.close()  # 關閉事件迴圈
    end = time.time()
    print('耗時:{}'.format(end - start))

執行

可以看出,當執行到睡眠5秒的時候,程式並沒有等待,而是去執行睡眠3秒,再去執行睡眠1秒,最後1秒先執行完畢,先返回,再是睡眠3秒返回,再是睡眠5秒返回

用時是5秒,而不是5 + 3 + 1 = 9 秒

備註 await後面需要跟協程函式,而不能是普通函式,這裡不能用time.sleep()代替asyncio.sleep(),用了會報錯,可以自己換了嘗試一下看看

(2)演示協程巢狀

import asyncio  # 匯入非同步io庫
import time


async def fun1(name):
    print('fun1 name begin:{}'.format(name))
    print('fun1 name end:{}'.format(name))


async def fun2(name):
    print('fun2 name begin:{}'.format(name))
    await fun1('墨玉麒麟')
    print('fun2 name end:{}'.format(name))


async def fun3(name):
    print('fun3 name begin:{}'.format(name))
    await fun2('張龍趙虎')
    print('fun3 name end:{}'.format(name))


if __name__ == '__main__':
    start = time.time()
    obj3 = fun3('神龍吸水')  # 返回協程物件
    loop = asyncio.get_event_loop()  # 獲取事件迴圈
    task3 = loop.create_task(obj3)  # 建立task
    loop.run_until_complete(task3)  # 註冊到事件迴圈中
    loop.close()  # 關閉事件迴圈
    end = time.time()
    print('耗時:{}'.format(end - start))

執行

備註:實際專案中,一般asyncio.sleep()都是用其他協程的庫來代替,比如aiohttp、aiomysq等

學習連結:https://www.jianshu.com/p/b5e347b3a17c