Python協程 & 非同步程式設計(asyncio) 入門介紹
本文首發於:行者AI
在近期的編碼工作過程中遇到了async和await裝飾的函式,查詢資料後瞭解到這種函式是基於協程的非同步函式。這類程式設計方式稱為非同步程式設計,常用在IO較頻繁的系統中,如:Tornado web框架、檔案下載、網路爬蟲等應用。協程能夠在IO等待時間就去切換執行其他任務,當IO操作結束後再自動回撥,那麼就會大大節省資源並提供效能。接下來便簡單的講解一下非同步程式設計相關概念以及案例演示。
1. 協程簡介
1.1 協程的含義及實現方法
協程(Coroutine),也可以被稱為微執行緒,是一種使用者態內的上下文切換技術。簡而言之,其實就是通過一個執行緒實現程式碼塊相互切換執行。例如:
def func1():
print(1)
... # 協程介入
print(2)
def func2():
print(3)
... # 協程介入
print(4)
func1()
func2()
上述程式碼是普通的函式定義和執行,按流程分別執行兩個函式中的程式碼,並先後會輸出:1、2、3、4
。但如果介入協程技術那麼就可以實現函式見程式碼切換執行,最終輸入:1、3、2、4
。
在Python中有多種方式可以實現協程,例如:
-
greenlet,是一個第三方模組,用於實現協程程式碼(Gevent協程就是基於greenlet實現);
-
yield,生成器,藉助生成器的特點也可以實現協程程式碼;
-
asyncio,在Python3.4中引入的模組用於編寫協程程式碼;
-
async & awiat,在Python3.5中引入的兩個關鍵字,結合asyncio模組可以更方便的編寫協程程式碼。
前兩種實現方式較為老舊,所以重點關注後面的方式
標準庫實現方法
asyncio是Python 3.4版本引入的標準庫,直接內建了對非同步IO的支援。
import asyncio @asyncio.coroutine def func1(): print(1) yield from asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務 print(2) @asyncio.coroutine def func2(): print(3) yield from asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務 print(4) tasks = [ asyncio.ensure_future( func1() ), asyncio.ensure_future( func2() ) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
關鍵字實現方法
async & await
關鍵字在Python3.5版本中正式引入,代替了asyncio.coroutine
裝飾器,基於他編寫的協程程式碼其實就是上一示例的加強版,讓程式碼可以更加簡便可讀。
import asyncio
async def func1():
print(1)
await asyncio.sleep(2) # 耗時操作
print(2)
async def func2():
print(3)
await asyncio.sleep(2) # 耗時操作
print(4)
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
1.2 案例演示
例如:用程式碼實現下載 url_list
中的圖片。
- 方式一:同步程式設計實現
# requests庫僅支援同步的http網路請求
import requests
def download_image(url):
print("開始下載:",url)
# 傳送網路請求,下載圖片
response = requests.get(url)
# 圖片儲存到本地檔案
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(response.content)
print("下載完成")
if __name__ == '__main__':
url_list = [
'https://www.1.jpg',
'https://www.2.jpg',
'https://www.3.jpg'
]
for item in url_list:
download_image(item)
輸出:按順序傳送請求,請求一次下載一張圖片,假如每次下載花費1s,完成任務需要3s 以上。
- 方式二:基於協程的程實現
# aiohttp 為支援非同步程式設計的http請求庫
import aiohttp
import asyncio
async def fetch(session, url):
print("傳送請求:", url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www.1.jpg',
'https://www.2.jpg',
'https://www.3.jpg'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
輸出:一次傳送三個下載請求,同時下載,假如每次下載花費1s,完成任務僅需要1s 左右,第一種方法的耗時為第二種的三倍。
1.3 小結
協程可以讓原來要使用非同步+回撥方式寫的非人類程式碼,用看似同步的方式寫出來。
2. 非同步程式設計簡介
2.1 同步和非同步的區別
同步 :循序漸進執行操作、請求
非同步:無需等待上一步操作、請求完成,就開始下一步(每個操作仍然有先後順序)
目前python非同步相關的主流技術是通過包含關鍵字async&await的async模組實現。
2.2 非同步程式設計-事件迴圈
事件迴圈,可以把他當做是一個while迴圈,這個while迴圈在週期性的執行並執行一些任務,在特定條件下終止迴圈。
# 虛擬碼
任務列表 = [ 任務1, 任務2, 任務3,... ]
while True:
可執行的任務列表,已完成的任務列表 = 去任務列表中檢查所有的任務,將'可執行'和'已完成'的任務返回
for 就緒任務 in 已準備就緒的任務列表:
執行已就緒的任務
for 已完成的任務 in 已完成的任務列表:
在任務列表中移除 已完成的任
如果 任務列表 中的任務都已完成,則終止迴圈
在編寫程式時候可以通過如下程式碼來獲取和建立事件迴圈。
# 方式一:
import asyncio
# 生成或獲取一個事件迴圈
loop = asyncio.get_event_loop()
# 將任務新增到事件迴圈中
loop.run_until_complete(任務)
# 方式二(python3.7及以上版本支援):
asyncio.run( 任務 )
2.3 非同步程式設計-快速上手
async 關鍵字
- 協程函式:定義函式時候由async關鍵字裝飾的函式
async def 函式名
- 協程物件:執行協程函式()得到的協程物件。
# 協程函式
async def func():
pass
# 協程物件
result = func()
注意:執行協程函式只會建立協程物件,函式內部程式碼不會執行。如果想要執行協程函式內部程式碼,必須要將協程物件交給事件迴圈來處理。
import asyncio
async def func():
print("執行協程函式內部程式碼!")
result = func()
# 呼叫方法1:
# loop = asyncio.get_event_loop()
# loop.run_until_complete( result )
# 呼叫方法2:
asyncio.run( result )
await 關鍵字
await + 可等待的物件(協程物件、Future、Task物件 -> IO等待),遇到IO操作掛起當前協程(任務),等IO操作完成之後再繼續往下執行。當前協程掛起時,事件迴圈可以去執行其他協程(任務)。
import asyncio
async def others():
print("start")
await asyncio.sleep(2)
print('end')
return '返回值'
async def func():
print("執行協程函式內部程式碼")
# await等待物件的值得到結果之後再繼續向下走
response = await others()
print("IO請求結束,結果為:", response)
asyncio.run( func() )
Task 物件
Task物件的作用是在事件迴圈中新增多個任務,用於併發排程協程,通過asyncio.create_task(協程物件)
的方式建立Task物件,這樣可以讓協程加入事件迴圈中等待被排程執行。
async def module_a():
print("start module_a")
await asyncio.sleep(2) # 模擬 module_a 的io操作
print('end module_a')
return 'module_a 完成'
async def module_b():
print("start module_b")
await asyncio.sleep(1) # 模擬 module_a 的io操作
print('end module_b')
return 'module_b 完成'
task_list = [
module_a(),
module_b(),
]
done,pending = asyncio.run( asyncio.wait(task_list) )
print(done)
2.4 案例演示
例如:用程式碼實現連線並查詢資料庫的同時,下載一個APK檔案到本地。
import asyncio
import aiomysql
import os
import aiofiles as aiofiles
from aiohttp import ClientSession
async def get_app():
url = "http://www.123.apk"
async with ClientSession() as session:
# 網路IO請求,獲取響應
async with session.get(url)as res:
if res.status == 200:
print("下載成功", res)
# 磁碟IO請求,讀取響應資料
apk = await res.content.read()
async with aiofiles.open("demo2.apk", "wb") as f:
# 磁碟IO請求,資料寫入本地磁碟
await f.write(apk)
else:
print("下載失敗")
async def excute_sql(sql):
# 網路IO操作:連線MySQL
conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql', )
# 網路IO操作:建立CURSOR
cur = await conn.cursor()
# 網路IO操作:執行SQL
await cur.execute(sql)
# 網路IO操作:獲取SQL結果
result = await cur.fetchall()
print(result)
# 網路IO操作:關閉連結
await cur.close()
conn.close()
task_list = [get_app(), execute_sql(sql="SELECT Host,User FROM user")]
asyncio.run(asyncio.wait(task_list))
程式碼邏輯分析:
【step1】asyncio.run()
建立了事件迴圈。wait()
方法將task任務列表加入到當前的事件迴圈中;(注意:必須先建立事件迴圈,後加入任務列表,否則會報錯)
【step2】事件迴圈監聽事件狀態,開始執行程式碼,先執行列表中的get_app()
方法,當代碼執行到async with session.get(url)as res:
時,遇到await關鍵字表示有IO耗時操作,執行緒會將該任務掛起在後臺執行,並切換到另外一個非同步函式excute_sql()
;
【step3】當代碼執行到excute_sql()
的第一個IO耗時操作後,執行緒會重複先前的操作,將該任務掛起,去執行其他可執行程式碼。假如此時事件迴圈監聽到get_app()
中的第一IO耗時操作已經執行完成,那麼執行緒會切換到該方法第一個IO操作後的程式碼,並按順序執行直到遇上下一個await裝飾的IO操作;假如事件迴圈監聽到excute_sql()
中的第一個IO操作先於get_app()
的第一個IO操作完成,那麼執行緒會繼續執行excute_sql
的後續程式碼;
【step4】執行緒會重複進行上述第3點中的步驟,直到程式碼全部執行完成,事件迴圈也會隨之停止。
2.5 小節
一般來說CPU的耗時運算方式有:
計算密集型的操作:計算密集型任務的特點是要進行大量的計算、邏輯判斷,消耗CPU資源,比如計算圓周率、對視訊進行高清解碼等等。
IO密集型的操作:涉及到網路、磁碟IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和記憶體的速度)。
非同步程式設計基於協程實現,如果利用協程實現計算密集型操作,因為執行緒在上下文之間的來回切換總會經歷類似於”計算“-->”儲存“-->”建立新環境“ 的一系列操作,導致系統的整體效能反而會下降。所以非同步程式設計並不適用於計算密集型的程式。然而在IO密集型操作彙總,協程在IO等待時間就去切換執行其他任務,當IO操作結束後再自動回撥,那麼就會大大節省資源並提供效能。
PS:更多技術乾貨,快關注【公眾號 | xingzhe_ai】,與行者一起討論吧!