1. 程式人生 > 實用技巧 >【Python】協程,看了就會

【Python】協程,看了就會

一、基本介紹

從 Python 3.4 開始,Python 中加入了協程的概念,但這個版本的協程還是以生成器物件為基礎的,在 Python 3.5 則增加了 async/await,使得協程的實現更加方便。
Python 中使用協程最常用的庫莫過於 asyncio,所以本文會以 asyncio 為基礎來介紹協程的使用。
首先我們需要了解下面幾個概念。

  • event_loop:事件迴圈,相當於一個無限迴圈,我們可以把一些函式註冊到這個事件迴圈上,當滿足條件發生的時候,就會呼叫對應的處理方法。
  • coroutine:中文翻譯叫協程,在 Python 中常指代為協程物件型別,我們可以將協程物件註冊到時間迴圈中,它會被事件迴圈呼叫。我們可以使用 async 關鍵字來定義一個方法,這個方法在呼叫時不會立即
    被執行,而是返回一個協程物件。
  • task:任務,它是對協程物件的進一步封裝,包含了任務的各個狀態。
  • future:代表將來執行或沒有執行的任務的結果,實際上和 task 沒有本質區別。

二、攜程用法

定義攜程

首先我們來定義一個協程,體驗一下它和普通程序在實現上的不同之處,程式碼如下:

import asyncio

async def execute(x):
    print('Number:', x)

coroutine1 = execute(1)
print('Coroutine:', coroutine)  # Coroutine: <coroutine object execute at 0x000001ECF6762DC8>
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)  # Number: 1

首先我們引入了 asyncio 這個包,這樣我們才可以使用 asyncawait,然後我們使用 async 定義了一個 execute 方法,方法接收一個數字引數,方法執行之後會列印這個數字。

隨後我們直接呼叫了這個方法,然而這個方法並沒有執行,而是返回了一個 coroutine 協程物件。隨後我們使用 get_event_loop 方法建立了一個事件迴圈 loop,並呼叫了 loop 物件的 run_until_complete 方法將協程注
冊到事件迴圈 loop 中,然後啟動。最後我們才看到了 execute 方法列印了輸出結果。

可見,async 定義的方法就會變成一個無法直接執行的 coroutine

協程物件,必須將其註冊到事件迴圈中才可以執行。

上面我們還提到了 task,它是對 coroutine 物件的進一步封裝,它裡面相比 coroutine 物件多了執行狀態,比如 runningfinished 等,我們可以用這些狀態來獲取協程物件的執行情況。

在上面的例子中,當我們將 coroutine 物件傳遞給 run_until_complete 方法的時候,實際上它進行了一個操作就是將 coroutine 封裝成了 task 物件,我們也可以顯式地進行宣告,如下所示:

async def execute(x):
    print('Number:', x)
    return x

coroutine = execute(1)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)  # Task: <Task pending coro=<execute() running at C:/。。。:27>>
loop.run_until_complete(task)  # Number: 1
print('Task:', task)  # Task: <Task finished coro=<execute2() done, defined at C:/。。。:27> result=1>

這裡我們定義了 loop 物件之後,接著呼叫了它的 create_task 方法將 coroutine 物件轉化為了 task 物件,隨後我們列印輸出一下,發現它是 pending 狀態。接著我們將 task 物件新增到事件迴圈中得到執行,隨後我們再列印輸出一下 task 物件,發現它的狀態就變成了 finished,同時還可以看到其 result 變成了 1,也就是我們定義的 execute 方法的返回結果。

另外定義 task 物件還有一種方式,就是直接通過 asyncioensure_future 方法,返回結果也是 task 物件,這樣的話我們就可以不借助於 loop 來定義,即使我們還沒有宣告 loop 也可以提前定義好 task 物件,寫法如下:

async def execute(x):
    print('Number:', x)
    return x

coroutine = execute(1)
task = asyncio.ensure_future(coroutine)
print('Task:', task)  # Task: <Task pending coro=<execute() running at C:/。。。:50>>
loop = asyncio.get_event_loop()
loop.run_until_complete(task)  # Number: 1
print('Task:', task)  # Task: <Task finished coro=<execute() done, defined at C:/。。。:50> result=1>

發現其執行效果都是一樣的。

繫結回撥

另外我們也可以為某個 task 繫結一個回撥方法,比如我們來看下面的例子:

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

def callback(_task):
    print('Task Result:', _task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)

task.add_done_callback(callback)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)  # Task Result: <Response [200]>

在這裡我們定義了一個 request 方法,請求了百度,獲取其響應狀態,但是這個方法裡面我們沒有任何 print 語句。隨後我們定義了一個 callback 方法,這個方法接收一個引數,是 task 物件,然後呼叫 print 方法列印
task 物件的結果。這樣我們就定義好了一個 coroutine 物件和一個回撥方法,我們現在希望的效果是,當 coroutine 物件執行完畢之後,就去執行宣告的 callback 方法。
那麼它們二者怎樣關聯起來呢?很簡單,只需要呼叫 add_done_callback 方法即可,我們將 callback 方法傳遞給了封裝好的 task 物件,這樣當 task 執行完畢之後就可以呼叫 callback 方法了,同時 task 物件還會作為引數傳遞給 callback 方法,呼叫 task 物件的 result 方法就可以獲取返回結果了。

實際上不用回撥方法,直接在 task 執行完畢之後也可以直接呼叫 result 方法獲取結果,如下所示:

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

coroutine = request()
task = asyncio.ensure_future(coroutine)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task Result:', task.result())  # Task Result: <Response [200]>

執行結果是一樣的。

多工協程

上面的例子我們只執行了一次請求,如果我們想執行多次請求應該怎麼辦呢?我們可以定義一個 task 列表,然後使用 asynciowait 方法即可執行,看下面的例子:

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task Result:', task.result())

這裡我們使用一個 for 迴圈建立了五個 task,組成了一個列表,然後把這個列表首先傳遞給了 asynciowait 方法,然後再將其註冊到時間迴圈中,就可以發起五個任務了。最後我們再將任務的執行結果輸出出來,執行結果如下:

Tasks: [<Task pending . . . >>, . . . , <Task pending . . . >>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>

可以看到五個任務被順次執行了,並得到了執行結果。

三、協程實現

前面講了這麼多,又是 async,又是 coroutine,又是 task,又是 callback,但似乎並沒有看出協程的優勢啊?反而寫法上更加奇怪和麻煩了,別急,上面的案例只是為後面的使用作鋪墊,接下來我們正式來看下協程在解決 IO 密集型任務上有怎樣的優勢吧!

上面的程式碼中,我們用一個網路請求作為示例,這就是一個耗時等待的操作,因為我們請求網頁之後需要等待頁面響應並返回結果。耗時等待的操作一般都是 IO 操作,比如檔案讀取、網路請求等等。協程對於處理這種操作是有很大優勢的,當遇到需要等待的情況的時候,程式可以暫時掛起,轉而去執行其他的操作,從而避免一直等待一個程式而耗費過多的時間,充分利用資源。

為了表現出協程的優勢,我們拿 https://static4.scrape.cuiqingcai.com/ 這個網站為例來進行演示,因為該網站響應比較慢,所以我們可以通過爬取時間來直觀地感受到爬取速度的提升。

為了讓你更好地理解協程的正確使用方法,這裡我們先來看看使用協程時常犯的錯誤,後面再給出正確的例子來對比一下。

首先,我們還是拿之前的 requests 來進行網頁請求,接下來我們再重新使用上面的方法請求一遍:

import asyncio
import requests
import time

start = time.time()

async def request():
    url = 'https://static4.scrape.cuiqingcai.com/'
    print('Waiting for', url)
    response = requests.get(url)
    print('Get response from', url, 'response', response)

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

在這裡我們還是建立了 10 個 task,然後將 task 列表傳給 wait 方法並註冊到時間迴圈中執行。

執行結果如下:

Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
. . .
Cost time: 49.008238554000854

可以發現和正常的請求並沒有什麼兩樣,依然還是順次執行的,耗時 49 秒,平均一個請求耗時 5 秒,說好的非同步處理呢?

其實,要實現非同步處理,我們得先要有掛起的操作,當一個任務需要等待 IO 結果的時候,可以掛起當前任務,轉而去執行其他任務,這樣我們才能充分利用好資源,上面方法都是一本正經的序列走下來,連個掛起都沒有,怎麼可能實現非同步?想太多了。

要實現非同步,接下來我們需要了解一下 await 的用法,使用 await 可以將耗時等待的操作掛起,讓出控制權。當協程執行的時候遇到 await,事件迴圈就會將本協程掛起,轉而去執行別的協程,直到其他的協程掛起或執行完畢。

所以,我們可能會將程式碼中的 request 方法改成如下的樣子:

async def request():
    url = 'https://static4.scrape.cuiqingcai.com/'
    print('Waiting for', url)
    response = await requests.get(url)
    print('Get response from', url, 'response', response)

僅僅是在 requests 前面加了一個 await,然而執行以下程式碼,會得到如下報錯:

Waiting for https://static4.scrape.cuiqingcai.com/
. . .
Task exception was never retrieved
future: <Task finished coro=<request() done, defined at C:/Users/Administrator/Desktop/spider/16.async_theory.py:128> exception=TypeError("object Response can't be used in 'await' expression")>
Traceback (most recent call last):
  File "C:/Users/Administrator/Desktop/spider/16.async_theory.py", line 131, in request
    response = await requests.get(url)
TypeError: object Response can't be used in 'await' expression

這次它遇到 await 方法確實掛起了,也等待了,但是最後卻報了這麼個錯,這個錯誤的意思是 requests 返回的 Response 物件不能和 await 一起使用,為什麼呢?因為根據官方文件說明,await 後面的物件必須是如下格式之一:

  • 一個原生 coroutine 物件。
  • 一個由 types.coroutine 修飾的生成器,這個生成器可以返回 coroutine 物件。
  • 一個包含 await 方法的物件返回的一個迭代器。

參考:https://www.python.org/dev/peps/pep-0492/#await-expression。

requests 返回的 Response 不符合上面任一條件,因此就會報上面的錯誤了。

那麼你可能會發現,既然 await 後面可以跟一個 coroutine 物件,那麼我用 async 把請求的方法改成 coroutine 物件不就可以了嗎?所以就改寫成如下的樣子:

import asyncio
import requests
import time

start = time.time()

async def get(url):
    return requests.get(url)

async def request():
    url = 'https://static4.scrape.cuiqingcai.com/'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'response', response)

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

這裡我們將請求頁面的方法獨立出來,並用 async 修飾,這樣就得到了一個 coroutine 物件,我們執行一下看看:

Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
. . .
Cost time: 53.78607153892517

還是不行,它還不是非同步執行,也就是說我們僅僅將涉及 IO 操作的程式碼封裝到 async 修飾的方法裡面是不可行的!我們必須要使用支援非同步操作的請求方式才可以實現真正的非同步,所以這裡就需要 aiohttp 派上用場了。

四、使用 aiohttp

aiohttp 是一個支援非同步請求的庫,利用它和 asyncio 配合我們可以非常方便地實現非同步請求操作。

安裝方式:pip3 install aiohttp

官方文件連結為:https://aiohttp.readthedocs.io/,它分為兩部分,一部分是 Client,一部分是 Server,詳細的內容可以參考官方文件。

下面我們將 aiohttp 用上來,將程式碼改成如下樣子:

import asyncio
import aiohttp
import time

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    await response.text()
    await session.close()
    return response

async def request():
    url = 'https://static4.scrape.cuiqingcai.com/'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'response', response)

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

在這裡我們將請求庫由 requests 改成了 aiohttp,通過 aiohttpClientSession 類的 get 方法進行請求,結果如下:

Waiting for https://static4.scrape.cuiqingcai.com/
. . .
Waiting for https://static4.scrape.cuiqingcai.com/
. . .
Get response from https://static4.scrape.cuiqingcai.com/ response <ClientResponse(https://static4.scrape.cuiqingcai.com/) [200 OK]>
<CIMultiDictProxy('Server': 'nginx/1.17.8', 'Date': 'Wed, 14 Oct 2020 15:52:30 GMT', 'Content-Type': 'text/html; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'Vary': 'Accept-Encoding', 'X-Frame-Options': 'DENY', 'X-Content-Type-Options': 'nosniff', 'Referrer-Policy': 'same-origin', 'Strict-Transport-Security': 'max-age=15724800; includeSubDomains', 'Content-Encoding': 'gzip')>
. . .
Get response from https://static4.scrape.cuiqingcai.com/ response <ClientResponse(https://static4.scrape.cuiqingcai.com/) [200 OK]>
<CIMultiDictProxy('Server': 'nginx/1.17.8', 'Date': 'Wed, 14 Oct 2020 15:52:31 GMT', 'Content-Type': 'text/html; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'Vary': 'Accept-Encoding', 'X-Frame-Options': 'DENY', 'X-Content-Type-Options': 'nosniff', 'Referrer-Policy': 'same-origin', 'Strict-Transport-Security': 'max-age=15724800; includeSubDomains', 'Content-Encoding': 'gzip')>
Cost time: 5.596387624740601

成功了!我們發現這次請求的耗時由 50 秒變直接成了 5 秒多,耗費時間減少了非常非常多。

程式碼裡面我們使用了 await,後面跟了 get 方法,在執行這 10 個協程的時候,如果遇到了 await,那麼就會將當前協程掛起,轉而去執行其他的協程,直到其他的協程也掛起或執行完畢,再進行下一個協程的執行。

開始執行時,時間迴圈會執行第一個 task,針對第一個 task 來說,當執行到第一個 await 跟著的 get 方法時,它被掛起,但這個 get 方法第一步的執行是非阻塞的,掛起之後立馬被喚醒,所以立即又進入執行,建立了 ClientSession 物件,接著遇到了第二個 await,呼叫了 session.get 請求方法,然後就被掛起了,由於請求需要耗時很久,所以一直沒有被喚醒。

當第一個 task 被掛起了,那接下來該怎麼辦呢?事件迴圈會尋找當前未被掛起的協程繼續執行,於是就轉而執行第二個 task 了,也是一樣的流程操作,直到執行了第十個 tasksession.get 方法之後,全部的 task 都被掛起了。所有 task 都已經處於掛起狀態,怎麼辦?只好等待了。5 秒之後,幾個請求幾乎同時都有了響應,然後幾個 task 也被喚醒接著執行,輸出請求結果,最後總耗時,5.6 秒!

怎麼樣?這就是非同步操作的便捷之處,當遇到阻塞式操作時,任務被掛起,程式接著去執行其他的任務,而不是傻傻地等待,這樣可以充分利用 CPU 時間,而不必把時間浪費在等待 IO 上。

你可能會說,既然這樣的話,在上面的例子中,在發出網路請求後,既然接下來的 5 秒都是在等待的,在 5 秒之內,CPU 可以處理的 task 數量遠不止這些,那麼豈不是我們放 10 個、20 個、50 個、100 個、1000 個 task 一起執行,最後得到所有結果的耗時不都是差不多的嗎?因為這幾個任務被掛起後都是一起等待的。

理論來說確實是這樣的,不過有個前提,那就是伺服器在同一時刻接受無限次請求都能保證正常返回結果,也就是伺服器無限抗壓,另外還要忽略 IO 傳輸時延,確實可以做到無限 task 一起執行且在預想時間內得到結果。但由於不同伺服器處理的實現機制不同,可能某些伺服器並不能承受這麼高的併發,因此響應速度也會減慢。

在這裡我們以百度為例,來測試下併發數量為 1、3、5、10、...、500 的情況下的耗時情況,程式碼如下:

import asyncio
import aiohttp
import time

def test(num):
    start = time.time()

    async def get(url):
        session = aiohttp.ClientSession()
        response = await session.get(url)
        await response.text()
        await session.close()
        return response

    async def request():
        url = 'https://www.baidu.com/'
        await get(url)

    tasks = [asyncio.ensure_future(request()) for _ in range(num)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

    end = time.time()
    print('Number:', num, 'Cost time:', end - start)

for number in [1, 3, 5, 10, 15, 30, 50, 75, 100, 200, 500]:
    test(number)

執行結果如下:

Number: 1 Cost time: 0.05885505676269531
Number: 3 Cost time: 0.05773782730102539
Number: 5 Cost time: 0.05768704414367676
Number: 10 Cost time: 0.15174412727355957
Number: 15 Cost time: 0.09603095054626465
Number: 30 Cost time: 0.17843103408813477
Number: 50 Cost time: 0.3741800785064697
Number: 75 Cost time: 0.2894289493560791
Number: 100 Cost time: 0.6185381412506104
Number: 200 Cost time: 1.0894129276275635
Number: 500 Cost time: 1.8213098049163818

可以看到,即使我們增加了併發數量,但在伺服器能承受高併發的前提下,其爬取速度幾乎不太受影響。

綜上所述,使用了非同步請求之後,我們幾乎可以在相同的時間內實現成百上千倍次的網路請求,把這個運用在爬蟲中,速度提升是非常可觀的。

以上便是 Python 中協程的基本原理和用法。