【Python】協程,看了就會
一、基本介紹
從 Python 3.4 開始,Python 中加入了協程的概念,但這個版本的協程還是以生成器物件為基礎的,在 Python 3.5 則增加了 async/await,使得協程的實現更加方便。
Python 中使用協程最常用的庫莫過於 asyncio,所以本文會以 asyncio 為基礎來介紹協程的使用。
首先我們需要了解下面幾個概念。
- event_loop:事件迴圈,相當於一個無限迴圈,我們可以把一些函式註冊到這個事件迴圈上,當滿足條件發生的時候,就會呼叫對應的處理方法。
- coroutine:中文翻譯叫協程,在 Python 中常指代為協程物件型別,我們可以將協程物件註冊到時間迴圈中,它會被事件迴圈呼叫。我們可以使用 async 關鍵字來定義一個方法,這個方法在呼叫時不會立即
被執行,而是返回一個協程物件。 - task:任務,它是對協程物件的進一步封裝,包含了任務的各個狀態。
- future:代表將來執行或沒有執行的任務的結果,實際上和 task 沒有本質區別。
二、攜程用法
定義攜程
首先我們來定義一個協程,體驗一下它和普通程序在實現上的不同之處,程式碼如下:
import asyncio async def execute(x): print('Number:', x) coroutine1 = execute(1) print('Coroutine:', coroutine) # Coroutine: <coroutine object execute at 0x000001ECF6762DC8> loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) # Number: 1
首先我們引入了 asyncio
這個包,這樣我們才可以使用 async
和 await
,然後我們使用 async
定義了一個 execute 方法,方法接收一個數字引數,方法執行之後會列印這個數字。
隨後我們直接呼叫了這個方法,然而這個方法並沒有執行,而是返回了一個 coroutine 協程物件。隨後我們使用 get_event_loop
方法建立了一個事件迴圈 loop
,並呼叫了 loop
物件的 run_until_complete
方法將協程注
冊到事件迴圈 loop
中,然後啟動。最後我們才看到了 execute 方法列印了輸出結果。
可見,async
定義的方法就會變成一個無法直接執行的 coroutine
上面我們還提到了 task
,它是對 coroutine
物件的進一步封裝,它裡面相比 coroutine
物件多了執行狀態,比如 running
、finished
等,我們可以用這些狀態來獲取協程物件的執行情況。
在上面的例子中,當我們將 coroutine
物件傳遞給 run_until_complete
方法的時候,實際上它進行了一個操作就是將 coroutine
封裝成了 task
物件,我們也可以顯式地進行宣告,如下所示:
async def execute(x):
print('Number:', x)
return x
coroutine = execute(1)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task) # Task: <Task pending coro=<execute() running at C:/。。。:27>>
loop.run_until_complete(task) # Number: 1
print('Task:', task) # Task: <Task finished coro=<execute2() done, defined at C:/。。。:27> result=1>
這裡我們定義了 loop
物件之後,接著呼叫了它的 create_task
方法將 coroutine
物件轉化為了 task
物件,隨後我們列印輸出一下,發現它是 pending
狀態。接著我們將 task
物件新增到事件迴圈中得到執行,隨後我們再列印輸出一下 task
物件,發現它的狀態就變成了 finished
,同時還可以看到其 result
變成了 1
,也就是我們定義的 execute
方法的返回結果。
另外定義 task
物件還有一種方式,就是直接通過 asyncio
的 ensure_future
方法,返回結果也是 task
物件,這樣的話我們就可以不借助於 loop
來定義,即使我們還沒有宣告 loop
也可以提前定義好 task
物件,寫法如下:
async def execute(x):
print('Number:', x)
return x
coroutine = execute(1)
task = asyncio.ensure_future(coroutine)
print('Task:', task) # Task: <Task pending coro=<execute() running at C:/。。。:50>>
loop = asyncio.get_event_loop()
loop.run_until_complete(task) # Number: 1
print('Task:', task) # Task: <Task finished coro=<execute() done, defined at C:/。。。:50> result=1>
發現其執行效果都是一樣的。
繫結回撥
另外我們也可以為某個 task 繫結一個回撥方法,比如我們來看下面的例子:
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
def callback(_task):
print('Task Result:', _task.result())
coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop = asyncio.get_event_loop()
loop.run_until_complete(task) # Task Result: <Response [200]>
在這裡我們定義了一個 request
方法,請求了百度,獲取其響應狀態,但是這個方法裡面我們沒有任何 print
語句。隨後我們定義了一個 callback
方法,這個方法接收一個引數,是 task
物件,然後呼叫 print
方法列印
了 task
物件的結果。這樣我們就定義好了一個 coroutine
物件和一個回撥方法,我們現在希望的效果是,當 coroutine
物件執行完畢之後,就去執行宣告的 callback
方法。
那麼它們二者怎樣關聯起來呢?很簡單,只需要呼叫 add_done_callback
方法即可,我們將 callback
方法傳遞給了封裝好的 task
物件,這樣當 task
執行完畢之後就可以呼叫 callback
方法了,同時 task
物件還會作為引數傳遞給 callback
方法,呼叫 task
物件的 result
方法就可以獲取返回結果了。
實際上不用回撥方法,直接在 task 執行完畢之後也可以直接呼叫 result
方法獲取結果,如下所示:
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task Result:', task.result()) # Task Result: <Response [200]>
執行結果是一樣的。
多工協程
上面的例子我們只執行了一次請求,如果我們想執行多次請求應該怎麼辦呢?我們可以定義一個 task
列表,然後使用 asyncio
的 wait
方法即可執行,看下面的例子:
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task Result:', task.result())
這裡我們使用一個 for 迴圈建立了五個 task
,組成了一個列表,然後把這個列表首先傳遞給了 asyncio
的 wait
方法,然後再將其註冊到時間迴圈中,就可以發起五個任務了。最後我們再將任務的執行結果輸出出來,執行結果如下:
Tasks: [<Task pending . . . >>, . . . , <Task pending . . . >>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
可以看到五個任務被順次執行了,並得到了執行結果。
三、協程實現
前面講了這麼多,又是 async
,又是 coroutine
,又是 task
,又是 callback
,但似乎並沒有看出協程的優勢啊?反而寫法上更加奇怪和麻煩了,別急,上面的案例只是為後面的使用作鋪墊,接下來我們正式來看下協程在解決 IO
密集型任務上有怎樣的優勢吧!
上面的程式碼中,我們用一個網路請求作為示例,這就是一個耗時等待
的操作,因為我們請求網頁之後需要等待頁面響應並返回結果。耗時等待
的操作一般都是 IO
操作,比如檔案讀取、網路請求等等。協程對於處理這種操作是有很大優勢的,當遇到需要等待的情況的時候,程式可以暫時掛起,轉而去執行其他的操作,從而避免一直等待一個程式而耗費過多的時間,充分利用資源。
為了表現出協程的優勢,我們拿 https://static4.scrape.cuiqingcai.com/ 這個網站為例來進行演示,因為該網站響應比較慢,所以我們可以通過爬取時間來直觀地感受到爬取速度的提升。
為了讓你更好地理解協程的正確使用方法,這裡我們先來看看使用協程時常犯的錯誤,後面再給出正確的例子來對比一下。
首先,我們還是拿之前的 requests 來進行網頁請求,接下來我們再重新使用上面的方法請求一遍:
import asyncio
import requests
import time
start = time.time()
async def request():
url = 'https://static4.scrape.cuiqingcai.com/'
print('Waiting for', url)
response = requests.get(url)
print('Get response from', url, 'response', response)
tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
在這裡我們還是建立了 10 個 task
,然後將 task
列表傳給 wait
方法並註冊到時間迴圈中執行。
執行結果如下:
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
. . .
Cost time: 49.008238554000854
可以發現和正常的請求並沒有什麼兩樣,依然還是順次執行的,耗時 49 秒,平均一個請求耗時 5 秒,說好的非同步處理呢?
其實,要實現非同步處理,我們得先要有掛起
的操作,當一個任務需要等待 IO
結果的時候,可以掛起當前任務,轉而去執行其他任務,這樣我們才能充分利用好資源,上面方法都是一本正經的序列走下來,連個掛起都沒有,怎麼可能實現非同步?想太多了。
要實現非同步,接下來我們需要了解一下 await
的用法,使用 await
可以將耗時等待的操作掛起,讓出控制權。當協程執行的時候遇到 await
,事件迴圈就會將本協程掛起,轉而去執行別的協程,直到其他的協程掛起或執行完畢。
所以,我們可能會將程式碼中的 request
方法改成如下的樣子:
async def request():
url = 'https://static4.scrape.cuiqingcai.com/'
print('Waiting for', url)
response = await requests.get(url)
print('Get response from', url, 'response', response)
僅僅是在 requests
前面加了一個 await
,然而執行以下程式碼,會得到如下報錯:
Waiting for https://static4.scrape.cuiqingcai.com/
. . .
Task exception was never retrieved
future: <Task finished coro=<request() done, defined at C:/Users/Administrator/Desktop/spider/16.async_theory.py:128> exception=TypeError("object Response can't be used in 'await' expression")>
Traceback (most recent call last):
File "C:/Users/Administrator/Desktop/spider/16.async_theory.py", line 131, in request
response = await requests.get(url)
TypeError: object Response can't be used in 'await' expression
這次它遇到 await
方法確實掛起了,也等待了,但是最後卻報了這麼個錯,這個錯誤的意思是 requests
返回的 Response
物件不能和 await
一起使用,為什麼呢?因為根據官方文件說明,await
後面的物件必須是如下格式之一:
- 一個原生
coroutine
物件。 - 一個由
types.coroutine
修飾的生成器,這個生成器可以返回coroutine
物件。 - 一個包含
await
方法的物件返回的一個迭代器。
參考:https://www.python.org/dev/peps/pep-0492/#await-expression。
requests
返回的 Response
不符合上面任一條件,因此就會報上面的錯誤了。
那麼你可能會發現,既然 await
後面可以跟一個 coroutine
物件,那麼我用 async
把請求的方法改成 coroutine
物件不就可以了嗎?所以就改寫成如下的樣子:
import asyncio
import requests
import time
start = time.time()
async def get(url):
return requests.get(url)
async def request():
url = 'https://static4.scrape.cuiqingcai.com/'
print('Waiting for', url)
response = await get(url)
print('Get response from', url, 'response', response)
tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
這裡我們將請求頁面的方法獨立出來,並用 async
修飾,這樣就得到了一個 coroutine
物件,我們執行一下看看:
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
. . .
Cost time: 53.78607153892517
還是不行,它還不是非同步執行,也就是說我們僅僅將涉及 IO
操作的程式碼封裝到 async
修飾的方法裡面是不可行的!我們必須要使用支援非同步操作的請求方式
才可以實現真正的非同步,所以這裡就需要 aiohttp
派上用場了。
四、使用 aiohttp
aiohttp
是一個支援非同步請求的庫,利用它和 asyncio
配合我們可以非常方便地實現非同步請求操作。
安裝方式:pip3 install aiohttp
官方文件連結為:https://aiohttp.readthedocs.io/,它分為兩部分,一部分是 Client
,一部分是 Server
,詳細的內容可以參考官方文件。
下面我們將 aiohttp
用上來,將程式碼改成如下樣子:
import asyncio
import aiohttp
import time
start = time.time()
async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
await response.text()
await session.close()
return response
async def request():
url = 'https://static4.scrape.cuiqingcai.com/'
print('Waiting for', url)
response = await get(url)
print('Get response from', url, 'response', response)
tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
在這裡我們將請求庫由 requests
改成了 aiohttp
,通過 aiohttp
的 ClientSession
類的 get
方法進行請求,結果如下:
Waiting for https://static4.scrape.cuiqingcai.com/
. . .
Waiting for https://static4.scrape.cuiqingcai.com/
. . .
Get response from https://static4.scrape.cuiqingcai.com/ response <ClientResponse(https://static4.scrape.cuiqingcai.com/) [200 OK]>
<CIMultiDictProxy('Server': 'nginx/1.17.8', 'Date': 'Wed, 14 Oct 2020 15:52:30 GMT', 'Content-Type': 'text/html; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'Vary': 'Accept-Encoding', 'X-Frame-Options': 'DENY', 'X-Content-Type-Options': 'nosniff', 'Referrer-Policy': 'same-origin', 'Strict-Transport-Security': 'max-age=15724800; includeSubDomains', 'Content-Encoding': 'gzip')>
. . .
Get response from https://static4.scrape.cuiqingcai.com/ response <ClientResponse(https://static4.scrape.cuiqingcai.com/) [200 OK]>
<CIMultiDictProxy('Server': 'nginx/1.17.8', 'Date': 'Wed, 14 Oct 2020 15:52:31 GMT', 'Content-Type': 'text/html; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'Vary': 'Accept-Encoding', 'X-Frame-Options': 'DENY', 'X-Content-Type-Options': 'nosniff', 'Referrer-Policy': 'same-origin', 'Strict-Transport-Security': 'max-age=15724800; includeSubDomains', 'Content-Encoding': 'gzip')>
Cost time: 5.596387624740601
成功了!我們發現這次請求的耗時由 50 秒變直接成了 5 秒多,耗費時間減少了非常非常多。
程式碼裡面我們使用了 await
,後面跟了 get
方法,在執行這 10 個協程的時候,如果遇到了 await
,那麼就會將當前協程掛起,轉而去執行其他的協程,直到其他的協程也掛起或執行完畢,再進行下一個協程的執行。
開始執行時,時間迴圈會執行第一個 task
,針對第一個 task
來說,當執行到第一個 await
跟著的 get
方法時,它被掛起,但這個 get
方法第一步的執行是非阻塞的,掛起之後立馬被喚醒,所以立即又進入執行,建立了 ClientSession
物件,接著遇到了第二個 await
,呼叫了 session.get
請求方法,然後就被掛起了,由於請求需要耗時很久,所以一直沒有被喚醒。
當第一個 task
被掛起了,那接下來該怎麼辦呢?事件迴圈會尋找當前未被掛起
的協程繼續執行,於是就轉而執行第二個 task
了,也是一樣的流程操作,直到執行了第十個 task
的 session.get
方法之後,全部的 task
都被掛起了。所有 task
都已經處於掛起狀態,怎麼辦?只好等待了。5 秒之後,幾個請求幾乎同時都有了響應,然後幾個 task
也被喚醒接著執行,輸出請求結果,最後總耗時,5.6 秒!
怎麼樣?這就是非同步操作的便捷之處,當遇到阻塞式操作時,任務被掛起,程式接著去執行其他的任務,而不是傻傻地等待,這樣可以充分利用 CPU 時間,而不必把時間浪費在等待 IO 上。
你可能會說,既然這樣的話,在上面的例子中,在發出網路請求後,既然接下來的 5 秒都是在等待的,在 5 秒之內,CPU 可以處理的 task
數量遠不止這些,那麼豈不是我們放 10 個、20 個、50 個、100 個、1000 個 task
一起執行,最後得到所有結果的耗時不都是差不多的嗎?因為這幾個任務被掛起後都是一起等待的。
理論來說確實是這樣的,不過有個前提,那就是伺服器在同一時刻接受無限次請求都能保證正常返回結果,也就是伺服器無限抗壓,另外還要忽略 IO 傳輸時延,確實可以做到無限 task
一起執行且在預想時間內得到結果。但由於不同伺服器處理的實現機制不同,可能某些伺服器並不能承受這麼高的併發,因此響應速度也會減慢。
在這裡我們以百度為例,來測試下併發數量為 1、3、5、10、...、500 的情況下的耗時情況,程式碼如下:
import asyncio
import aiohttp
import time
def test(num):
start = time.time()
async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
await response.text()
await session.close()
return response
async def request():
url = 'https://www.baidu.com/'
await get(url)
tasks = [asyncio.ensure_future(request()) for _ in range(num)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Number:', num, 'Cost time:', end - start)
for number in [1, 3, 5, 10, 15, 30, 50, 75, 100, 200, 500]:
test(number)
執行結果如下:
Number: 1 Cost time: 0.05885505676269531
Number: 3 Cost time: 0.05773782730102539
Number: 5 Cost time: 0.05768704414367676
Number: 10 Cost time: 0.15174412727355957
Number: 15 Cost time: 0.09603095054626465
Number: 30 Cost time: 0.17843103408813477
Number: 50 Cost time: 0.3741800785064697
Number: 75 Cost time: 0.2894289493560791
Number: 100 Cost time: 0.6185381412506104
Number: 200 Cost time: 1.0894129276275635
Number: 500 Cost time: 1.8213098049163818
可以看到,即使我們增加了併發數量,但在伺服器能承受高併發的前提下,其爬取速度幾乎不太受影響。
綜上所述,使用了非同步請求之後,我們幾乎可以在相同的時間內實現成百上千倍次的網路請求,把這個運用在爬蟲中,速度提升是非常可觀的。
以上便是 Python 中協程的基本原理和用法。