1. 程式人生 > 其它 >Python協程 & 非同步程式設計(asyncio) 入門介紹

Python協程 & 非同步程式設計(asyncio) 入門介紹

本文首發於:行者AI

在近期的編碼工作過程中遇到了async和await裝飾的函式,查詢資料後瞭解到這種函式是基於協程的非同步函式。這類程式設計方式稱為非同步程式設計,常用在IO較頻繁的系統中,如:Tornado web框架、檔案下載、網路爬蟲等應用。協程能夠在IO等待時間就去切換執行其他任務,當IO操作結束後再自動回撥,那麼就會大大節省資源並提供效能。接下來便簡單的講解一下非同步程式設計相關概念以及案例演示。

1. 協程簡介

1.1 協程的含義及實現方法

協程(Coroutine),也可以被稱為微執行緒,是一種使用者態內的上下文切換技術。簡而言之,其實就是通過一個執行緒實現程式碼塊相互切換執行。例如:

def func1():
	print(1)
    ...   # 協程介入
	print(2)
	
def func2():
	print(3)
    ...   # 協程介入
	print(4)

func1()
func2()

上述程式碼是普通的函式定義和執行,按流程分別執行兩個函式中的程式碼,並先後會輸出:1、2、3、4。但如果介入協程技術那麼就可以實現函式見程式碼切換執行,最終輸入:1、3、2、4

在Python中有多種方式可以實現協程,例如:

  • greenlet,是一個第三方模組,用於實現協程程式碼(Gevent協程就是基於greenlet實現);

  • yield,生成器,藉助生成器的特點也可以實現協程程式碼;

  • asyncio,在Python3.4中引入的模組用於編寫協程程式碼;

  • async & awiat,在Python3.5中引入的兩個關鍵字,結合asyncio模組可以更方便的編寫協程程式碼。

前兩種實現方式較為老舊,所以重點關注後面的方式

標準庫實現方法

asyncio是Python 3.4版本引入的標準庫,直接內建了對非同步IO的支援。

import asyncio

@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(2)  # 遇到IO耗時操作,自動化切換到tasks中的其他任務
    print(2)

@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務
    print(4)

tasks = [
    asyncio.ensure_future( func1() ),
    asyncio.ensure_future( func2() )
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

關鍵字實現方法

async & await 關鍵字在Python3.5版本中正式引入,代替了asyncio.coroutine 裝飾器,基於他編寫的協程程式碼其實就是上一示例的加強版,讓程式碼可以更加簡便可讀。


import asyncio

async def func1():
    print(1)
    await asyncio.sleep(2)  # 耗時操作  
    print(2)

async def func2():
    print(3)
    await asyncio.sleep(2)   # 耗時操作
    print(4)

tasks = [
    asyncio.ensure_future(func1()),
    asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

1.2 案例演示

例如:用程式碼實現下載 url_list 中的圖片。

  • 方式一:同步程式設計實現
# requests庫僅支援同步的http網路請求
import requests

def download_image(url):
  print("開始下載:",url)
  # 傳送網路請求,下載圖片
  response = requests.get(url)
  # 圖片儲存到本地檔案
  file_name = url.rsplit('_')[-1]
  with open(file_name, mode='wb') as file_object:
      file_object.write(response.content)
print("下載完成")


if __name__ == '__main__':
  url_list = [
      'https://www.1.jpg',
      'https://www.2.jpg',
      'https://www.3.jpg'
  ]
  for item in url_list:
      download_image(item)

輸出:按順序傳送請求,請求一次下載一張圖片,假如每次下載花費1s,完成任務需要3s 以上。

  • 方式二:基於協程的程實現
# aiohttp 為支援非同步程式設計的http請求庫
import aiohttp
import asyncio

async def fetch(session, url):
  print("傳送請求:", url)
  async with session.get(url, verify_ssl=False) as response:
      content = await response.content.read()
      file_name = url.rsplit('_')[-1]
      with open(file_name, mode='wb') as file_object:
          file_object.write(content)

async def main():
  async with aiohttp.ClientSession() as session:
      url_list = [
          'https://www.1.jpg',
          'https://www.2.jpg',
          'https://www.3.jpg'
      ]
      tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
      await asyncio.wait(tasks)


if __name__ == '__main__':
  asyncio.run(main())

輸出:一次傳送三個下載請求,同時下載,假如每次下載花費1s,完成任務僅需要1s 左右,第一種方法的耗時為第二種的三倍。

1.3 小結

協程可以讓原來要使用非同步+回撥方式寫的非人類程式碼,用看似同步的方式寫出來。

2. 非同步程式設計簡介

2.1 同步和非同步的區別

同步 :循序漸進執行操作、請求
非同步:無需等待上一步操作、請求完成,就開始下一步(每個操作仍然有先後順序)


目前python非同步相關的主流技術是通過包含關鍵字async&await的async模組實現。

2.2 非同步程式設計-事件迴圈

事件迴圈,可以把他當做是一個while迴圈,這個while迴圈在週期性的執行並執行一些任務,在特定條件下終止迴圈。

# 虛擬碼
任務列表 = [ 任務1, 任務2, 任務3,... ]
while True:
    可執行的任務列表,已完成的任務列表 = 去任務列表中檢查所有的任務,將'可執行'和'已完成'的任務返回
    for 就緒任務 in 已準備就緒的任務列表:
        執行已就緒的任務
        
    for 已完成的任務 in 已完成的任務列表:
        在任務列表中移除 已完成的任
        
	如果 任務列表 中的任務都已完成,則終止迴圈

在編寫程式時候可以通過如下程式碼來獲取和建立事件迴圈。

# 方式一:
import asyncio
# 生成或獲取一個事件迴圈
loop = asyncio.get_event_loop()
# 將任務新增到事件迴圈中
loop.run_until_complete(任務)

# 方式二(python3.7及以上版本支援):
asyncio.run( 任務 )

2.3 非同步程式設計-快速上手

async 關鍵字

  • 協程函式:定義函式時候由async關鍵字裝飾的函式 async def 函式名
  • 協程物件:執行協程函式()得到的協程物件。
# 協程函式
async def func():
    pass
# 協程物件
result = func()

注意:執行協程函式只會建立協程物件,函式內部程式碼不會執行。如果想要執行協程函式內部程式碼,必須要將協程物件交給事件迴圈來處理。

import asyncio 
async def func():
    print("執行協程函式內部程式碼!")
result = func()

# 呼叫方法1:
# loop = asyncio.get_event_loop()
# loop.run_until_complete( result )

# 呼叫方法2:
asyncio.run( result )

await 關鍵字

await + 可等待的物件(協程物件、Future、Task物件 -> IO等待),遇到IO操作掛起當前協程(任務),等IO操作完成之後再繼續往下執行。當前協程掛起時,事件迴圈可以去執行其他協程(任務)。

import asyncio

async def others():
    print("start")
    await asyncio.sleep(2)
    print('end')
    return '返回值'

async def func():
    print("執行協程函式內部程式碼")
    # await等待物件的值得到結果之後再繼續向下走
    response = await others()
    print("IO請求結束,結果為:", response)

asyncio.run( func() )

Task 物件

Task物件的作用是在事件迴圈中新增多個任務,用於併發排程協程,通過asyncio.create_task(協程物件)的方式建立Task物件,這樣可以讓協程加入事件迴圈中等待被排程執行。

async def module_a():
    print("start module_a")
    await asyncio.sleep(2) # 模擬 module_a 的io操作
    print('end module_a')
    return 'module_a 完成'

async def module_b():
    print("start module_b")
    await asyncio.sleep(1) # 模擬 module_a 的io操作
    print('end module_b')
    return 'module_b 完成'  

task_list = [
    module_a(),
	module_b(), 
]

done,pending = asyncio.run( asyncio.wait(task_list) )
print(done)

2.4 案例演示

例如:用程式碼實現連線並查詢資料庫的同時,下載一個APK檔案到本地。

import asyncio
import aiomysql
import os
import aiofiles as aiofiles
from aiohttp import ClientSession

async def get_app():

    url = "http://www.123.apk"
    async with ClientSession() as session:
        # 網路IO請求,獲取響應
        async with session.get(url)as res:
            if res.status == 200:
                print("下載成功", res)
                # 磁碟IO請求,讀取響應資料
                apk = await res.content.read()
                async  with  aiofiles.open("demo2.apk", "wb") as f:
                    # 磁碟IO請求,資料寫入本地磁碟
                    await f.write(apk)
            else:
                print("下載失敗")

async def excute_sql(sql):
    # 網路IO操作:連線MySQL
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql', )
    # 網路IO操作:建立CURSOR
    cur = await conn.cursor()
    # 網路IO操作:執行SQL
    await cur.execute(sql)
    # 網路IO操作:獲取SQL結果
    result = await cur.fetchall()
    print(result)
    # 網路IO操作:關閉連結
    await cur.close()
    conn.close()

task_list = [get_app(), execute_sql(sql="SELECT Host,User FROM user")]
asyncio.run(asyncio.wait(task_list))

程式碼邏輯分析:

【step1】asyncio.run()建立了事件迴圈。wait()方法將task任務列表加入到當前的事件迴圈中;(注意:必須先建立事件迴圈,後加入任務列表,否則會報錯)

【step2】事件迴圈監聽事件狀態,開始執行程式碼,先執行列表中的get_app()方法,當代碼執行到async with session.get(url)as res:時,遇到await關鍵字表示有IO耗時操作,執行緒會將該任務掛起在後臺執行,並切換到另外一個非同步函式excute_sql()

【step3】當代碼執行到excute_sql()的第一個IO耗時操作後,執行緒會重複先前的操作,將該任務掛起,去執行其他可執行程式碼。假如此時事件迴圈監聽到get_app()中的第一IO耗時操作已經執行完成,那麼執行緒會切換到該方法第一個IO操作後的程式碼,並按順序執行直到遇上下一個await裝飾的IO操作;假如事件迴圈監聽到excute_sql()中的第一個IO操作先於get_app()的第一個IO操作完成,那麼執行緒會繼續執行excute_sql的後續程式碼;

【step4】執行緒會重複進行上述第3點中的步驟,直到程式碼全部執行完成,事件迴圈也會隨之停止。

2.5 小節

一般來說CPU的耗時運算方式有:

計算密集型的操作:計算密集型任務的特點是要進行大量的計算、邏輯判斷,消耗CPU資源,比如計算圓周率、對視訊進行高清解碼等等。

IO密集型的操作:涉及到網路、磁碟IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和記憶體的速度)。

非同步程式設計基於協程實現,如果利用協程實現計算密集型操作,因為執行緒在上下文之間的來回切換總會經歷類似於”計算“-->”儲存“-->”建立新環境“ 的一系列操作,導致系統的整體效能反而會下降。所以非同步程式設計並不適用於計算密集型的程式。然而在IO密集型操作彙總,協程在IO等待時間就去切換執行其他任務,當IO操作結束後再自動回撥,那麼就會大大節省資源並提供效能。


PS:更多技術乾貨,快關注【公眾號 | xingzhe_ai】,與行者一起討論吧!