1. 程式人生 > 實用技巧 >Python中協程非同步IO(asyncio)詳解

Python中協程非同步IO(asyncio)詳解

介紹

非同步IO:就是發起一個IO操作(如:網路請求,檔案讀寫等),這些操作一般是比較耗時的,不用等待它結束,可以繼續做其他事情,結束時會發來通知。

協程:又稱為微執行緒,在一個執行緒中執行,執行函式時可以隨時中斷,由程式(使用者)自身控制,執行效率極高,與多執行緒比較,沒有切換執行緒的開銷和多執行緒鎖機制。

python中非同步IO操作是通過asyncio來實現的。

為了更加詳細說明asyncio,我們先從協程的最基礎開始講解。

協程基礎

從語句上看,協程和生成器類似,都是包含了yield關鍵字,不同之處在於協程中yield關鍵詞通常出現在=右邊,可以產出值a(y = yield a)或不產出值時為None(y = yield)。呼叫方可以用send函式傳送值給協程。

啟用協程時在yield處暫停,等待呼叫方傳送資料,下次繼續在yield暫停。從根本上看,yield是流程控制的工具,可以實現協作式多工,這也是後面講解非同步IO的基礎。

最簡單的協程示例

使用協程時需要預啟用(next函式)後才能使用send傳送值。(a = yield b),next時會產出yield右邊的值b,send時接收值的是yield左邊的值a

def coroutine_example(name):
    print('start coroutine...name:', name)
    x = yield name #呼叫next()時,產出yield右邊的值後暫停;呼叫send()時,產出值賦給x,並往下執行
    print('send值:', x)

coro = coroutine_example('Zarten')
print('next的返回值:', next(coro))
print('send的返回值:', coro.send(6))

輸出結果:

必須先呼叫next()函式預啟用協程,不然send()函式無法使用。

呼叫next()時,產出yield右邊的值後暫停不再往yield的下一行執行(一般不需要next產出值),等待send的到來,呼叫send()時,產出值賦給x(可對x作進一步處理),並往下執行。

協程結束時會跟生成器一樣丟擲StopIteration的異常給呼叫方,呼叫方可以捕獲它後處理。

讓協程返回值以及yield from說明

獲取協程的返回值

當結束協程時,會返回返回值,呼叫方會丟擲StopIteration異常,返回值就在異常物件的value屬性中

def coroutine_example(name):
    print('start coroutine...name:', name)

    while True:
        x = yield name #呼叫next()時,產出yield右邊的值後暫停;呼叫send()時,產出值賦給x,並往下執行
        if x is None:
            return 'zhihuID: Zarten'
        print('send值:', x)

coro = coroutine_example('Zarten')
next(coro)
print('send的返回值:', coro.send(6))
try:
    coro.send(None)
except StopIteration as e:
    print('返回值:', e.value)

yield from 說明

yield from跟for迴圈很相似,但功能更多一些,不信你看下面程式碼

def for_test():
    for i in range(3):
        yield i
print(list(for_test()))

def yield_from_test():
    yield from range(3)
print(list(yield_from_test()))

下面是輸出結果:

其實yield from內部會自動捕獲StopIteration異常,並把異常物件的value屬性變成yield from表示式的值。

yield from x 表示式內部首先是呼叫iter(x),然後再呼叫next(),因此x是任何的可迭代物件。yield from 的主要功能就是開啟雙向通道,把最外層的呼叫方和最內層的子生成器連線起來。

下面程式碼展示:呼叫方傳送的值在yield from表示式處直接傳遞給子生成器,並在yield from處等待子生成器的返回

def coroutine_example(name):
    print('start coroutine...name:', name)
    x = yield name #呼叫next()時,產出yield右邊的值後暫停;呼叫send()時,產出值賦給x,並往下執行
    print('send值:', x)
    return 'zhihuID: Zarten'

def grouper2():
    result2 = yield from coroutine_example('Zarten') #在此處暫停,等待子生成器的返回後繼續往下執行
    print('result2的值:', result2)
    return result2

def grouper():
    result = yield from grouper2() #在此處暫停,等待子生成器的返回後繼續往下執行
    print('result的值:', result)
    return result

def main():
    g = grouper()
    next(g)
    try:
        g.send(10)
    except StopIteration as e:
        print('返回值:', e.value)

if __name__ == '__main__':
    main()

輸出結果:

從上面也可看到yield from起到一個雙向通道的作用,同時子生成器也可使用yield from呼叫另一個子生成器,一直巢狀下去直到遇到yield表示式結束鏈式。

yield from一般用於asyncio模組做非同步IO

非同步IO(asyncio)

從上面我們知道了協程的基礎,非同步IO的asyncio庫使用事件迴圈驅動的協程實現併發。使用者可主動控制程式,在認為耗時IO處新增await(yield from)。在asyncio庫中,協程使用@asyncio.coroutine裝飾,使用yield from來驅動,在python3.5中作了如下更改:

@asyncio.coroutine -> async

yield from -> await

asyncio中幾個重要概念

1.事件迴圈

管理所有的事件,在整個程式執行過程中不斷迴圈執行並追蹤事件發生的順序將它們放在佇列中,空閒時呼叫相應的事件處理者來處理這些事件。

2.Future

Future物件表示尚未完成的計算,還未完成的結果

3.Task

是Future的子類,作用是在執行某個任務的同時可以併發的執行多個任務。

asyncio.Task用於實現協作式多工的庫,且Task物件不能使用者手動例項化,通過下面2個函式建立:

asyncio.async()

loop.create_task() 或 asyncio.ensure_future()

最簡單的非同步IO示例

run_until_complete():

阻塞呼叫,直到協程執行結束才返回。引數是future,傳入協程物件時內部會自動變為future

asyncio.sleep():

模擬IO操作,這樣的休眠不會阻塞事件迴圈,前面加上await後會把控制權交給主事件迴圈,在休眠(IO操作)結束後恢復這個協程。

提示:若在協程中需要有延時操作,應該使用 await asyncio.sleep(),而不是使用time.sleep(),因為使用time.sleep()後會釋放GIL,阻塞整個主執行緒,從而阻塞整個事件迴圈。

import asyncio

async def coroutine_example():
    await asyncio.sleep(1)
    print('zhihu ID: Zarten')

coro = coroutine_example()

loop = asyncio.get_event_loop()
loop.run_until_complete(coro)
loop.close()

上面輸出:會暫停1秒,等待 asyncio.sleep(1) 返回後列印

建立Task

loop.create_task():

接收一個協程,返回一個asyncio.Task的例項,也是asyncio.Future的例項,畢竟Task是Future的子類。返回值可直接傳入run_until_complete()

返回的Task物件可以看到協程的執行情況

import asyncio

async def coroutine_example():
    await asyncio.sleep(1)
    print('zhihu ID: Zarten')

coro = coroutine_example()

loop = asyncio.get_event_loop()
task = loop.create_task(coro)
print('執行情況:', task)

loop.run_until_complete(task)
print('再看下執行情況:', task)
loop.close()

輸出結果:

從下圖可看到,當task為finished狀態時,有個result()的方法,我們可以通過這個方法來獲取協程的返回值

獲取協程返回值

有2種方案可以獲取返回值。

  • 第1種方案:通過task.result()

可通過呼叫task.result()方法來獲取協程的返回值,但是隻有執行完畢後才能獲取,若沒有執行完畢,result()方法不會阻塞去等待結果,而是丟擲asyncio.InvalidStateError錯誤

import asyncio

async def coroutine_example():
    await asyncio.sleep(1)
    return 'zhihu ID: Zarten'

coro = coroutine_example()

loop = asyncio.get_event_loop()
task = loop.create_task(coro)
print('執行情況:', task)
try:
    print('返回值:', task.result())
except asyncio.InvalidStateError:
    print('task狀態未完成,捕獲了 InvalidStateError 異常')

loop.run_until_complete(task)
print('再看下執行情況:', task)
print('返回值:', task.result())
loop.close()

執行結果可以看到:只有task狀態執行完成時才能捕獲返回值

  • 第2種方案:通過add_done_callback()回撥
import asyncio

def my_callback(future):
    print('返回值:', future.result())

async def coroutine_example():
    await asyncio.sleep(1)
    return 'zhihu ID: Zarten'

coro = coroutine_example()

loop = asyncio.get_event_loop()

task = loop.create_task(coro)
task.add_done_callback(my_callback)

loop.run_until_complete(task)
loop.close()

控制任務

通過asyncio.wait()可以控制多工

asyncio.wait()是一個協程,不會阻塞,立即返回,返回的是協程物件。傳入的引數是future或協程構成的可迭代物件。最後將返回值傳給run_until_complete()加入事件迴圈

  • 最簡單控制多工

下面程式碼asyncio.wait()中,引數傳入的是由協程構成的可迭代物件

import asyncio

async def coroutine_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(1)
    print('執行完畢name:', name)

loop = asyncio.get_event_loop()

tasks = [coroutine_example('Zarten_' + str(i)) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)
loop.close()

輸出結果:

  • 多工中獲取返回值

方案1:需要通過loop.create_task()建立task物件,以便後面來獲取返回值

下面程式碼asyncio.wait()中,引數傳入的是由future(task)物件構成的可迭代物件

import asyncio

async def coroutine_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(1)
    print('執行完畢name:', name)
    return '返回值:' + name

loop = asyncio.get_event_loop()

tasks = [loop.create_task(coroutine_example('Zarten_' + str(i))) for i in range(3)]
wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)

for task in tasks:
    print(task.result())

loop.close()

方案2:通過回撥add_done_callback()來獲取返回值

import asyncio

def my_callback(future):
    print('返回值:', future.result())

async def coroutine_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(1)
    print('執行完畢name:', name)
    return '返回值:' + name

loop = asyncio.get_event_loop()

tasks = []
for i in range(3):
    task = loop.create_task(coroutine_example('Zarten_' + str(i)))
    task.add_done_callback(my_callback)
    tasks.append(task)

wait_coro = asyncio.wait(tasks)
loop.run_until_complete(wait_coro)

loop.close()

輸出結果:

動態新增協程

方案是建立一個執行緒,使事件迴圈線上程內永久執行

相關函式介紹:

loop.call_soon_threadsafe() :與 call_soon()類似,等待此函式返回後馬上呼叫回撥函式,返回值是一個 asyncio.Handle 物件,此物件內只有一個方法為 cancel()方法,用來取消回撥函式。

loop.call_soon() :與call_soon_threadsafe()類似,call_soon_threadsafe() 是執行緒安全的

loop.call_later():延遲多少秒後執行回撥函式

loop.call_at():在指定時間執行回撥函式,這裡的時間統一使用 loop.time() 來替代 time.sleep()

asyncio.run_coroutine_threadsafe():動態的加入協程,引數為一個回撥函式和一個loop物件,返回值為future物件,通過future.result()獲取回撥函式返回值

  • 動態新增協程同步方式

通過呼叫call_soon_threadsafe()函式,傳入一個回撥函式callback和一個位置引數

注意:同步方式,回撥函式 thread_example()為普通函式

import asyncio
from threading import Thread

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def thread_example(name):
    print('正在執行name:', name)
    return '返回結果:' + name


new_loop = asyncio.new_event_loop()
t = Thread(target= start_thread_loop, args=(new_loop,))
t.start()

handle = new_loop.call_soon_threadsafe(thread_example, 'Zarten1')
handle.cancel()

new_loop.call_soon_threadsafe(thread_example, 'Zarten2')

print('主執行緒不會阻塞')

new_loop.call_soon_threadsafe(thread_example, 'Zarten3')

print('繼續執行中...')

輸出結果:

  • 動態新增協程非同步方式

通過呼叫asyncio.run_coroutine_threadsafe()函式,傳入一個回撥函式callback和一個loop物件

注意:非同步方式,回撥函式 thread_example()為協程

import asyncio
from threading import Thread

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def thread_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(1)
    return '返回結果:' + name



new_loop = asyncio.new_event_loop()
t = Thread(target= start_thread_loop, args=(new_loop,))
t.start()

future = asyncio.run_coroutine_threadsafe(thread_example('Zarten1'), new_loop)
print(future.result())

asyncio.run_coroutine_threadsafe(thread_example('Zarten2'), new_loop)

print('主執行緒不會阻塞')

asyncio.run_coroutine_threadsafe(thread_example('Zarten3'), new_loop)

print('繼續執行中...')

輸出結果:

從上面2個例子中,當主執行緒執行完成後,由於子執行緒還沒有退出,故主執行緒還沒退出,等待子執行緒退出中。若要主執行緒退出時子執行緒也退出,可以設定子執行緒為守護執行緒 t.setDaemon(True)

協程中生產-消費模型設計

通過上面的動態新增協程的思想,我們可以設計一個生產-消費的模型,至於中介軟體(管道)是什麼無所謂,下面以內建佇列和redis佇列來舉例說明。

提示:若想主執行緒退出時,子執行緒也隨之退出,需要將子執行緒設定為守護執行緒,函式 setDaemon(True)

內建雙向佇列模型

使用內建雙向佇列deque

import asyncio
from threading import Thread
from collections import deque
import random
import time

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def consumer():
    while True:
        if dq:
            msg = dq.pop()
            if msg:
                asyncio.run_coroutine_threadsafe(thread_example('Zarten'+ msg), new_loop)


async def thread_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(2)
    return '返回結果:' + name



dq = deque()

new_loop = asyncio.new_event_loop()
loop_thread = Thread(target= start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()

consumer_thread = Thread(target= consumer)
consumer_thread.setDaemon(True)
consumer_thread.start()

while True:
    i = random.randint(1, 10)
    dq.appendleft(str(i))
    time.sleep(2)

輸出結果:

redis佇列模型

下面程式碼的主執行緒和雙向佇列的主執行緒有些不同,只是換了一種寫法而已,程式碼如下

生產者程式碼:

import redis

conn_pool = redis.ConnectionPool(host='127.0.0.1')
redis_conn = redis.Redis(connection_pool=conn_pool)

redis_conn.lpush('coro_test', '1')
redis_conn.lpush('coro_test', '2')
redis_conn.lpush('coro_test', '3')
redis_conn.lpush('coro_test', '4')

消費者程式碼:

import asyncio
from threading import Thread
import redis

def get_redis():
    conn_pool = redis.ConnectionPool(host= '127.0.0.1')
    return redis.Redis(connection_pool= conn_pool)

def start_thread_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def thread_example(name):
    print('正在執行name:', name)
    await asyncio.sleep(2)
    return '返回結果:' + name


redis_conn = get_redis()

new_loop = asyncio.new_event_loop()
loop_thread = Thread(target= start_thread_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()

#迴圈接收redis訊息並動態加入協程
while True:
    msg = redis_conn.rpop('coro_test')
    if msg:
        asyncio.run_coroutine_threadsafe(thread_example('Zarten' + bytes.decode(msg, 'utf-8')), new_loop)

輸出結果:

asyncio在aiohttp中的應用

aiohttp是一個非同步庫,分為客戶端和服務端,下面只是簡單對客戶端做個介紹以及一個經常遇到的異常情況。aiohttp客戶端為非同步網路請求庫

aiohttp客戶端最簡單的例子

import asyncio
import aiohttp

count = 0

async def get_http(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            global count
            count += 1
            print(count, res.status)

def main():
    loop = asyncio.get_event_loop()
    url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}'
    tasks = [get_http(url.format(i)) for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
if __name__ == '__main__':
    main()

aiohttp併發量太大的異常解決方案

在使用aiohttp客戶端進行大量併發請求時,程式會丟擲 ValueError: too many file descriptors in select() 的錯誤。

異常程式碼示例

說明:測試機器為windows系統

import asyncio
import aiohttp

count = 0

async def get_http(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            global count
            count += 1
            print(count, res.status)

def main():
    loop = asyncio.get_event_loop()
    url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}'
    tasks = [get_http(url.format(i)) for i in range(600)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
if __name__ == '__main__':
    main()

原因分析:使用aiohttp時,python內部會使用select(),作業系統對檔案描述符最大數量有限制,linux為1024個,windows為509個。

解決方案:

最常見的解決方案是:限制併發數量(一般500),若併發的量不大可不作限制。其他方案這裡不做介紹,如windows下使用loop = asyncio.ProactorEventLoop() 以及使用回撥方式等

限制併發數量方法

提示:此方法也可用來作為非同步爬蟲的限速方法(反反爬)

使用semaphore = asyncio.Semaphore(500) 以及在協程中使用 async with semaphore: 操作

具體程式碼如下:

import asyncio
import aiohttp


async def get_http(url):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as res:
                global count
                count += 1
                print(count, res.status)

if __name__ == '__main__':
    count = 0

    semaphore = asyncio.Semaphore(500)
    loop = asyncio.get_event_loop()
    url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&ch=&tn=baiduerr&bar=&wd={0}'
    tasks = [get_http(url.format(i)) for i in range(600)]
    loop.run_until_complete(asyncio.wait(tasks))