1. 程式人生 > 實用技巧 >Python協程與非同步IO

Python協程與非同步IO

協程的概念

什麼是協程?

協程,又稱微執行緒、纖程。英文名為Coroutine,是一種使用者態的輕量級執行緒。

子程式,或者稱為函式,在所有語言中都是層級呼叫的。比如A呼叫B,B在執行過程中又呼叫了C,C執行完畢返回,B執行完畢返回,最後是A執行完畢。所以子程式呼叫是通過棧實現的,一個執行緒就是執行一個子程式。子程式呼叫總是一個入口,一次返回,呼叫順序是明確的。而協程的呼叫和子程式不同。

執行緒是系統級別的,通過作業系統來排程,而協程則是程式級別的由程式根據需要自己排程。在一個執行緒中會有很多函式,我們把這些函式稱為子程式,在子程式執行過程中可以中斷去執行別的子程式,而別的子程式也可以中斷回來繼續執行之前的子程式,這個過程就稱為協程。也就是說在同一執行緒內一段程式碼在執行過程中會中斷然後跳轉執行別的程式碼,接著在之前中斷的地方繼續開始執行。

協程擁有自己的暫存器上下文和棧。協程排程切換時,將暫存器上下文和棧儲存到其他地方,在切回來的時候,恢復先前儲存的暫存器上下文和棧。因此,協程能保留上一次呼叫時的狀態(即所有區域性狀態的一個特定組合),每次過程重入時,就相當於進入上一次呼叫的狀態。換種說法就是,進入上一次離開時所處邏輯流的位置。

協程的優缺點

優點
(1)無需花費執行緒上下文切換的開銷,協程避免了無意義的排程,從而提高效能。
(2)無需原子操作鎖定及同步的開銷
(3)方便切換控制流,簡化程式設計模型
(4)高併發+高擴充套件性+低成本,一個CPU可以支援上萬個協程,所以很適合用於高併發處理。

缺點
(1)無法利用多核資源。協程的本質是個單執行緒,它不能同時將單個CPU的多個核用上,協程需要和程序配合才能執行在多CPU上,當然我們日常所編寫的絕大部分應用都沒這個必要,除非是CPU密集型應用。
(2)進行阻塞操作(如IO時)會阻塞掉整個程式。

python協程的簡單實現

Python可以通過生成器(generator)來實現對協程的支援。在generator中,不但可以通過for迴圈來迭代,還可以不斷呼叫next()函式獲取由yield語句返回的下一個值。

yield的使用
def func():
    print('starting...')
    while True:
        res = yield 6
        print('res:', res)


if __name__ == '__main__':
    g = func()  # generator
    print(next(g))
    print('------')
    print(next(g))

執行結果如下

starting...
6
------
res: None
6

執行的具體過程如下:

  1. 程式開始執行以後,因為func函式中含有yield關鍵字,所以func函式並不會真正執行,而是得到一個生成器g
  2. 直到呼叫next方法,func函式才正式開始執行,先列印"starting...",然後進入while迴圈中
  3. 程式遇到yield關鍵字,程式暫時掛起並返回一個6(注:這裡並沒有執行賦值給res的操作),此時n第一個next(g)語句執行完成,列印返回的6,並執行往下的語句
  4. 程式執行print("------")
  5. 直到程式遇到第二next(g)語句,這個時候從上次執行next語句停止的地方開始執行,即要執行res的賦值操作。需要注意的是,這個時候賦值操作的右邊是沒有值的,所以這個時候res賦值為None,接著打印出"res:None"
  6. 程式會繼續在while裡執行,當再次遇到yield關鍵字的時候返回6,然後程式掛起,next(g)語句執行完成,並打印出返回的6

帶yield的函式是一個生成器,這個生成器有一個函式就是next函式,next就相當於“下一步”生成哪個數,這一次的next開始的地方是接著上一次的next停止的地方執行的,所以呼叫next的時候,生成器並不會從func函式的開始執行,而是接著上一步停止的地方開始,然後遇到yield後,返回出要生成的值,此步就結束。

yield實現簡單的協程

在上面例子,我們已經瞭解了yield語句的原理和使用,現在我們來嘗試實現一個簡單協程

import time


def a():
    while True:
        print('---A---')
        yield
        time.sleep(0.5)


def b(c):
    while True:
        print('---B---')
        c.__next__()
        time.sleep(0.5)


if __name__ == '__main__':
    a = a()
    b(a)

執行結果如下

---B---
---A---
---B---
---A---
---B---
---A---
---B---
...
send傳送資料

send是傳送一個引數給res,第一個例子中,在yield的時候,並沒有將6賦值給res,下次執行的時候,繼續執行賦值操作就會賦值為None。而使用send方法可以在接著上一次執行的時候,先把send()中的值賦值給res,然後執行next操作,直到遇到下一個yield使程式掛起,返回結果後結束。

def func():
    print('starting...')
    while True:
        res = yield 6
        print('res:', res)


if __name__ == '__main__':
    g = func()
    print(next(g))
    print('------')
    print(g.send(10))  # 傳入10後執行next

執行結果如下

starting...
6
------
res: 10
6
協程實現生產者消費者
import time


def producer(c):
    """生產者"""
    c.send(None)
    for i in range(1, 6):
        print('生產者生產%d產品' % i)
        c.send(str(i))
        time.sleep(1)


def customer():
    """消費者"""
    res = ''
    while True:
        data = yield res
        if not data:
            return
        print('消費者消費%s產品' % data)


if __name__ == '__main__':
    c = customer()
    producer(c)

執行結果如下

生產者生產1產品
消費者消費1產品
生產者生產2產品
消費者消費2產品
生產者生產3產品
消費者消費3產品
生產者生產4產品
消費者消費4產品
生產者生產5產品
消費者消費5產品

非同步IO協程

使用非同步IO,無非是提高我們寫的軟體的併發。這個軟體系統,可以是網路爬蟲,也可以是Web服務等。

併發的方式有很多,如多執行緒、多程序、非同步IO等。多執行緒和多程序更多應用於CPU密集型的場景,比如科學計算的時間都耗費在CPU上,利用多核CPU來分擔計算任務。多執行緒和多程序之間的場景切換和通訊代價很高,不適合IO密集型的場景。而非同步IO就是非常適合IO密集型的場景,比如網路爬蟲和Web服務。

IO就是讀寫磁碟、讀寫網路的操作,這種讀寫速度比讀寫記憶體、CPU快取慢得多,前者的耗時是後者的成千上萬倍甚至更多。這就導致,IO密集型的場景99%以上的時間都花費在IO等待的時間上。非同步IO就是把CPU從漫長的等待中解放出來的方法。

asyncio簡介

基於生成器(使用yield from語句建立的python生成器)的協程將會在python3.10中被移除,取而代之的是asyncio中的async/await語法。

asyncio是Python3.4版本引入的標準庫,直接內建了對非同步IO的支援。asyncio的程式設計模型就是一個訊息迴圈。我們從asyncio模組中直接獲取一個EventLoop的引用,然後把需要執行的協程扔到EventLoop中執行,就實現了非同步IO。
(1)event_loop事件迴圈:程式開啟一個無限的迴圈,程式會把一些函式註冊到事件上。當滿足事件發生的時候,呼叫相應的協程函式。
(2)coroutine協程:協程物件,指一個使用async關鍵字定義的函式,它的呼叫不會立即執行函式,而是會返回一個協程物件。協程物件需要註冊到事件迴圈,由事件迴圈呼叫。
(3)task任務:一個協程物件就是一個原生可以掛起的函式,任務則是對協程進一步封裝,其中包含任務的各種狀態。
(4)future:代表將來執行或沒有執行的任務的結果,它和task上沒有本質的區別。
(5)async/await關鍵字:python3.5用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的非同步呼叫介面。

使用asyncio定義一個協程

協程通過async/await語法進行宣告,是編寫asyncio應用的推薦方式。下面來簡單編寫一個協程(注:需要Python3.7+)

import asyncio


async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')


if __name__ == '__main__':
    asyncio.run(main())

執行結果如下

hello
world

上面例子中,我們用async def定義了一個協程main()。如果直接呼叫main()協程,並不會將其加入到執行日程,要真正執行一個協程,需要使用asyncio.run()方法。

可等待物件

如果一個物件可以在await語句中使用,那麼它就是可等待物件,可等待物件主要有三種,分別是協程任務Future

import asyncio
import time


async def do_something(job, cost_time):
    print(f"--- started {job} at {time.strftime('%X')} ---")
    await asyncio.sleep(cost_time)
    print(f"--- finished {job} at {time.strftime('%X')} ---")


async def main():
    start_time = time.time()
    print(f"started at {time.strftime('%X')}")

    await do_something("eating food", 3)
    await do_something("watching TV", 5)

    end_time = time.time()
    print(f"finished at {time.strftime('%X')}, it cost {int(end_time-start_time)}s.")


if __name__ == '__main__':
    asyncio.run(main())

執行結果如下

started at 18:31:20
--- started eating food at 18:31:20 ---
--- finished eating food at 18:31:23 ---
--- started watching TV at 18:31:23 ---
--- finished watching TV at 18:31:28 ---
finished at 18:31:28, it cost 8s.

上面例子中的do_something()協程就是一個可等待物件,使用await掛起do_something()協程(此時主協程main()處於一個阻塞狀態),直到完成do_something()中的任務才會接著執行,總耗時為8s。

asyncio任務

任務被用來設定日程以便併發執行協程

當一個協程通過asyncio.create_task()等函式被打包為一個任務,該協程將自動排入日程準備立即執行,我們可以通過該函式來併發執行作為asyncio任務的多個協程。

我們修改以上例項,併發執行兩個協程

import asyncio
import time


async def do_something(job, cost_time):
    print(f"--- started {job} at {time.strftime('%X')} ---")
    await asyncio.sleep(cost_time)
    print(f"--- finished {job} at {time.strftime('%X')} ---")


async def main():
    start_time = time.time()
    task1 = asyncio.create_task(do_something("eating food", 3))
    task2 = asyncio.create_task(do_something("watching TV", 5))
    print(f"started at {time.strftime('%X')}")

    await task1
    await task2

    end_time = time.time()
    print(f"finished at {time.strftime('%X')}, it cost {int(end_time-start_time)}s.")


if __name__ == '__main__':
    asyncio.run(main())

執行結果如下

started at 18:42:41
--- started eating food at 18:42:41 ---
--- started watching TV at 18:42:41 ---
--- finished eating food at 18:42:44 ---
--- finished watching TV at 18:42:46 ---
finished at 18:42:46, it cost 5s.

可以看出,輸出結果的執行時間比之前快了3秒.

Future物件

Future是一種特殊的低層級可等待物件,表示一個非同步操作的最終結果

當一個Future物件被等待,這意味著協程將保持等待直到該Future物件在其他地方操作完畢。在asyncio中需要Future物件以便允許通過async/await使用基於回撥的程式碼,它和task上沒有本質的區別。

import asyncio
import time


async def add(a, b, cost_time):
    await asyncio.sleep(cost_time)
    return a + b


async def main():
    start_time = time.time()
    res = await asyncio.gather(
        add(1, 2, 2),
        add(5, 6, 4)
    )
    print(res)
    end_time = time.time()
    print(f'it cost {int(end_time - start_time)}s.')


if __name__ == '__main__':
    asyncio.run(main())

執行結果如下

[3, 11]
it cost 4s.

上面例子中使用asyncio.gather()方法掛起兩個協程,每個協程add()都會返回一個值(a+b的和),最終res會接收所有協程返回的值並存放到一個列表中,程式的執行是併發的,這個列表就是最終結果。