
A Beginner's Guide to Python Async/Await

https://zhuanlan.zhihu.com/p/27258289

This article covers how to use async/await, introduced in Python 3.5, and what it is for. Corrections are welcome if anything here is wrong.

Yesterday I watched a 2016 talk by David Beazley, Fear and Awaiting in Async. It gave me a lot to think about, so I wanted to sort out my own understanding, and this article is the result.

Python 3.5 introduced async and await as syntactic sugar for coroutines. For the concept of coroutines themselves, see my previous article.

Let's look at the common kinds of functions in Python:

1. Regular function

def function():
    return 1

2. Generator function

def generator():
    yield 1

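From 3.5 onward, adding the async keyword turns a regular function into an asynchronous function; from 3.6 onward, it also turns a generator function into an asynchronous generator.

3. Asynchronous function (coroutine)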


async def async_function():
    return 1

4. Asynchronous generator


async def async_generator():
    yield 1

The type of each one can be verified with a type check:

import types
print(type(function) is types.FunctionType)
print(type(generator()) is types.GeneratorType)
print(type(async_function()) is types.CoroutineType)
print(type(async_generator()) is types.AsyncGeneratorType)

Calling an asynchronous function directly does not return its result; it returns a coroutine object:

print(async_function())
# <coroutine object async_function at 0x102ff67d8>

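A coroutine has to be driven by something else, so we can use the coroutine object's send method to send it a value:

print(async_function().send(None))

Unfortunately, calling it this way raises an exception: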

StopIteration: 1

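This happens because a generator or coroutine raises StopIteration when it returns normally, and the original return value is stored in the value attribute of the StopIteration object. Catching the exception gives us the coroutine's real return value: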

try:
    async_function().send(None)
except StopIteration as e:
    print(e.value)
# 1

Using this approach, we can write a run function that drives a coroutine:

def run(coroutine):
    try:
        coroutine.send(None)
    except StopIteration as e:
        return e.value

Inside a coroutine function, the await syntax suspends the current coroutine and waits for another coroutine to finish and return its result:

async def async_function():
    return 1

async def await_coroutine():
    result = await async_function()
    print(result)
    
run(await_coroutine())
# 1

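Note that await may only appear inside a function declared with async; anywhere else it is a SyntaxError.

The object after await must also be an Awaitable, that is, it must implement the relevant protocol.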
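Both restrictions can be checked directly. The following is a small sketch, not part of the original article: it compiles a source string (the names broken and something are made up for illustration) to trigger the SyntaxError, then awaits a plain integer to show the TypeError raised at runtime for a non-Awaitable. It repeats the run helper from above so that it is self-contained.

```python
def run(coroutine):
    # Same single-step driver as the run() helper defined earlier.
    try:
        coroutine.send(None)
    except StopIteration as e:
        return e.value

# 1. await inside a plain (non-async) function is rejected at compile time.
source = "def broken():\n    await something()\n"
try:
    compile(source, "<example>", "exec")
    syntax_error = None
except SyntaxError as exc:
    syntax_error = exc

# 2. Awaiting an object that is not an Awaitable fails at runtime.
async def await_an_int():
    await 1

try:
    run(await_an_int())
    type_error = None
except TypeError as exc:
    type_error = exc

print(type(syntax_error).__name__, type(type_error).__name__)
```

The exact error messages differ between Python versions, so the sketch only looks at the exception types.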

The source of the Awaitable abstract class shows that any class implementing the __await__ method produces instances that are Awaitable:

class Awaitable(metaclass=ABCMeta):
    __slots__ = ()

    @abstractmethod
    def __await__(self):
        yield

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Awaitable:
            return _check_methods(C, "__await__")
        return NotImplemented
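To make this concrete, here is a minimal sketch of a hand-written awaitable. The class Ready and the coroutine add_one are hypothetical names invented for this illustration; the only requirement taken from the source above is that __await__ exists and returns an iterator.

```python
from collections.abc import Awaitable

class Ready:
    """Hypothetical awaitable that completes immediately with a stored value."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        # __await__ must return an iterator. Delegating to an empty tuple
        # yields nothing, so the awaiting coroutine is never suspended.
        yield from ()
        return self.value

async def add_one():
    value = await Ready(41)
    return value + 1

def run(coroutine):
    # Same single-step driver as the run() helper defined earlier.
    try:
        coroutine.send(None)
    except StopIteration as e:
        return e.value

result = run(add_one())
# The __subclasshook__ shown above makes this check succeed without inheritance.
is_awaitable = isinstance(Ready(0), Awaitable)
print(result, is_awaitable)
```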

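We can also see that the Coroutine class inherits from Awaitable and additionally implements send, throw and close. That is why it is legal to await the coroutine object returned by calling an asynchronous function.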

class Coroutine(Awaitable):
    __slots__ = ()

    @abstractmethod
    def send(self, value):
        ...

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        ...

    def close(self):
        ...
        
    @classmethod
    def __subclasshook__(cls, C):
        if cls is Coroutine:
            return _check_methods(C, '__await__', 'send', 'throw', 'close')
        return NotImplemented

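Next come asynchronous generators. Let's look at an example.

Suppose I go to a supermarket to buy potatoes, and the number of potatoes on the shelf is limited: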

class Potato:
    @classmethod
    def make(cls, num, *args, **kws):
        potatos = []
        for i in range(num):
            potatos.append(cls.__new__(cls, *args, **kws))
        return potatos

all_potatos = Potato.make(5)

Now I want to buy 50 potatoes, taking one potato off the shelf and putting it in my basket each time:

from time import sleep

def take_potatos(num):
    count = 0
    while True:
        if len(all_potatos) == 0:
            sleep(.1)
        else:
            potato = all_potatos.pop()
            yield potato
            count += 1
            if count == num:
                break

def buy_potatos():
    bucket = []
    for p in take_potatos(50):
        bucket.append(p)

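In code, this is the model of iterating over a generator. Clearly, once the shelf runs out of potatoes, all we can do is wait, and in the example above no amount of waiting will ever help, because everything is synchronous and nothing else gets a chance to restock the shelf. Multiple processes or threads might solve this, but real life looks more like the following: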

async def take_potatos(num):
    count = 0
    while True:
        if len(all_potatos) == 0:
            await ask_for_potato()
        potato = all_potatos.pop()
        yield potato
        count += 1
        if count == num:
            break

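When the shelf is empty, I can ask the supermarket for more potatoes, and then I have to wait for a while until the producer has finished producing them:

import asyncio
import random

async def ask_for_potato():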
    await asyncio.sleep(random.random())
    all_potatos.extend(Potato.make(random.randint(1, 10)))

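Once the producer has finished and returned, execution continues from the point where await suspended it, and the consumer completes its work. This whole process is exactly the flow of iterating over an asynchronous generator: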

async def buy_potatos():
    bucket = []
    async for p in take_potatos(50):
        bucket.append(p)
        print(f'Got potato {id(p)}...')

The async for syntax says that what we are iterating over is an asynchronous iterable, here an asynchronous generator.

def main():
    import asyncio
    loop = asyncio.get_event_loop()
    res = loop.run_until_complete(buy_potatos())
    loop.close()

Running this code with asyncio produces output like this:

Got potato 4338641384...
Got potato 4338641160...
Got potato 4338614736...
Got potato 4338614680...
Got potato 4338614568...
Got potato 4344861864...
Got potato 4344843456...
Got potato 4344843400...
Got potato 4338641384...
Got potato 4338641160...
...

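Since this is asynchronous, we do not have to sit idle after making the request; we can do something else in the meantime. For example, besides potatoes I also want to buy tomatoes. All that takes is adding one more coroutine to the event loop (buy_tomatos is assumed to be written the same way as buy_potatos):

def main():
    import asyncio

    async def buy_all():
        # asyncio.wait() stopped accepting bare coroutines in Python 3.11,
        # so run both buyers with asyncio.gather() instead.
        await asyncio.gather(buy_potatos(), buy_tomatos())

    asyncio.run(buy_all())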

Run the code again:

Got potato 4423119312...
Got tomato 4423119368...
Got potato 4429291024...
Got potato 4421640768...
Got tomato 4429331704...
Got tomato 4429331760...
Got tomato 4423119368...
Got potato 4429331760...
Got potato 4429331704...
Got potato 4429346688...
Got potato 4429346072...
Got tomato 4429347360...
...
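The fragments above are spread over several snippets, and buy_tomatos is never shown. As a rough, self-contained sketch of the same idea, the version below replaces the global shelf with a hypothetical Shelf class (an assumption made here so that potatoes and tomatoes can share code) and shortens the sleeps; take, buy and shop are likewise illustrative names, not the article's.

```python
import asyncio
import random

class Shelf:
    """Hypothetical shelf that is restocked on demand."""

    def __init__(self, name):
        self.name = name
        self.items = []
        self.made = 0

    async def restock(self):
        # Simulate a producer that needs a little time to deliver 1-10 items.
        await asyncio.sleep(random.random() / 100)
        for _ in range(random.randint(1, 10)):
            self.made += 1
            self.items.append(f"{self.name}-{self.made}")

async def take(shelf, num):
    # Asynchronous generator: waits for a restock whenever the shelf is empty.
    for _ in range(num):
        if not shelf.items:
            await shelf.restock()
        yield shelf.items.pop()

async def buy(shelf, num):
    return [item async for item in take(shelf, num)]

async def shop():
    # Both buyers run concurrently; each yields to the loop while restocking.
    return await asyncio.gather(
        buy(Shelf("potato"), 20),
        buy(Shelf("tomato"), 20),
    )

potatoes, tomatoes = asyncio.run(shop())
print(len(potatoes), len(tomatoes))
```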

Looking at the definition of AsyncGenerator, it has to implement the two core methods __aiter__ and __anext__, as well as asend, athrow and aclose.

class AsyncGenerator(AsyncIterator):
    __slots__ = ()

    async def __anext__(self):
        ...

    @abstractmethod
    async def asend(self, value):
        ...

    @abstractmethod
    async def athrow(self, typ, val=None, tb=None):
        ...

    async def aclose(self):
        ...

    @classmethod
    def __subclasshook__(cls, C):
        if cls is AsyncGenerator:
            return _check_methods(C, '__aiter__', '__anext__',
                                  'asend', 'athrow', 'aclose')
        return NotImplemented
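To show what __aiter__ and __anext__ are for, here is a sketch that iterates an asynchronous generator by hand instead of using async for. The names countdown and collect_manually are made up for this illustration; the loop is a simplified approximation of what async for does, not the interpreter's exact implementation.

```python
import asyncio

async def countdown(n):
    while n > 0:
        yield n
        n -= 1

async def collect_manually(agen):
    items = []
    iterator = agen.__aiter__()
    while True:
        try:
            # Each step is awaited, which is what lets the producer suspend.
            item = await iterator.__anext__()
        except StopAsyncIteration:
            # Raised when the asynchronous generator is exhausted.
            break
        items.append(item)
    return items

result = asyncio.run(collect_manually(countdown(3)))
print(result)
```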

Asynchronous generators were introduced in Python 3.6, together with asynchronous comprehensions, so the example above can also be written like this:

bucket = [p async for p in take_potatos(50)]

Similarly, await expressions can be used inside comprehensions:

result = [await fun() for fun in funcs if await condition()]
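A runnable variant of that one-liner might look like the sketch below. The coroutines double and is_even are hypothetical stand-ins for fun and condition, and the comprehension has to live inside an async def for await to be allowed.

```python
import asyncio

async def double(x):
    await asyncio.sleep(0)  # give control back to the event loop once
    return x * 2

async def is_even(x):
    await asyncio.sleep(0)
    return x % 2 == 0

async def main():
    # await is used both in the filter and in the result expression.
    return [await double(x) for x in range(6) if await is_even(x)]

result = asyncio.run(main())
print(result)
```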

Besides plain functions, ordinary instance methods of a class can also be declared with async:

class ThreeTwoOne:
    async def begin(self):
        print(3)
        await asyncio.sleep(1)
        print(2)
        await asyncio.sleep(1)
        print(1)        
        await asyncio.sleep(1)
        return

async def game():
    t = ThreeTwoOne()
    await t.begin()
    print('start')

Calling the instance method likewise returns a coroutine:

function = ThreeTwoOne.begin
method = function.__get__(ThreeTwoOne(), ThreeTwoOne)
import inspect
assert inspect.isfunction(function)
assert inspect.ismethod(method)
assert inspect.iscoroutine(method())

The same goes for class methods:

class ThreeTwoOne:
    @classmethod
    async def begin(cls):
        print(3)
        await asyncio.sleep(1)
        print(2)
        await asyncio.sleep(1)
        print(1)        
        await asyncio.sleep(1)
        return

async def game():
    await ThreeTwoOne.begin()
    print('start')

According to PEP 492, async can also be applied to context managers; __aenter__ and __aexit__ must each return an Awaitable:

class GameContext:
    async def __aenter__(self):
        print('game loading...')
        await asyncio.sleep(1)

    async def __aexit__(self, exc_type, exc, tb):
        print('game exit...')
        await asyncio.sleep(1)

async def game():
    async with GameContext():
        print('game start...')
        await asyncio.sleep(2)
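The order in which the two hooks run, including when the body raises, can be seen in the following sketch. The class Tracked and the events list are illustrative assumptions, not part of the article's example.

```python
import asyncio

class Tracked:
    """Hypothetical async context manager that records what happens to it."""

    def __init__(self, events):
        self.events = events

    async def __aenter__(self):
        self.events.append("enter")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        name = exc_type.__name__ if exc_type else None
        self.events.append(f"exit:{name}")
        return False  # do not swallow the exception

async def main():
    events = []
    try:
        async with Tracked(events):
            events.append("body")
            raise ValueError("boom")
    except ValueError:
        events.append("caught")
    return events

events = asyncio.run(main())
print(events)
```

Returning False from __aexit__ lets the exception propagate, which is why the surrounding try/except still sees it after the exit hook has run.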

Python 3.7 added an asynccontextmanager decorator to contextlib, which turns an asynchronous generator function into an asynchronous context manager:

from contextlib import asynccontextmanager

@asynccontextmanager
async def get_connection():
    conn = await acquire_db_connection()
    try:
        yield conn
    finally:
        await release_db_connection(conn)
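The snippet above leaves acquire_db_connection and release_db_connection undefined. The sketch below fills them in with hypothetical stand-ins (a dict pretending to be a connection, and a log list) purely so the flow can be run and observed; no real database API is implied.

```python
import asyncio
from contextlib import asynccontextmanager

log = []

async def acquire_db_connection():
    # Hypothetical stand-in: a dict plays the role of a connection.
    await asyncio.sleep(0)
    log.append("acquired")
    return {"open": True}

async def release_db_connection(conn):
    await asyncio.sleep(0)
    conn["open"] = False
    log.append("released")

@asynccontextmanager
async def get_connection():
    conn = await acquire_db_connection()
    try:
        yield conn  # the value bound by "async with ... as conn"
    finally:
        await release_db_connection(conn)

async def main():
    async with get_connection() as conn:
        log.append(f"using open={conn['open']}")
    return conn

conn = asyncio.run(main())
print(log, conn)
```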

async can also be applied to the __call__ method:

from time import time

class GameContext:
    async def __aenter__(self):
        self._started = time()
        print('game loading...')
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print('game exit...')
        await asyncio.sleep(1)

    async def __call__(self, *args, **kws):
        if args[0] == 'time':
            return time() - self._started

async def game():
    async with GameContext() as ctx:
        print('game start...')
        await asyncio.sleep(2)
        print('game time: ', await ctx('time'))

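await and yield from

The yield from syntax added in Python 3.3 delegates a generator's operations to another generator, so the generator's caller can communicate directly with the subgenerator: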

def sub_gen():
    yield 1
    yield 2
    yield 3

def gen():
    return (yield from sub_gen())

def main():
    for val in gen():
        print(val)
# 1
# 2
# 3
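The example above only shows values flowing out. As a sketch of the two-way communication, the code below sends values through a delegating generator into a subgenerator and picks up the subgenerator's return value as the result of the yield from expression. The names averager and delegator are made up for this illustration.

```python
def averager():
    total = 0
    count = 0
    while True:
        value = yield          # receives whatever the caller sends
        if value is None:      # None is used here as the stop signal
            break
        total += value
        count += 1
    return total / count

def delegator(results):
    # send() calls on delegator are forwarded to averager; its return
    # value becomes the value of the yield from expression.
    results.append((yield from averager()))

results = []
gen = delegator(results)
next(gen)            # advance to the first yield inside averager
gen.send(10)
gen.send(20)
try:
    gen.send(None)   # stop averager; delegator then finishes too
except StopIteration:
    pass
print(results)
```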

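With this feature, yield from can be used to write function calls that behave like coroutines. Before 3.5, asyncio created coroutines with exactly this combination of @asyncio.coroutine and yield from. (The decorator was deprecated in Python 3.8 and removed in 3.11, so the example below only runs on older versions.)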

# https://docs.python.org/3.4/library/asyncio-task.html
import asyncio

@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y

@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

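However, yield from makes it easy to confuse coroutines with generators and does not express intent well, so Python 3.5 introduced the newer async/await syntax for coroutines.

As a result, calls like the following are equivalent: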

async with lock:
    ...
    
with (yield from lock):
    ...
######################
def main():
    return (yield from coro())

async def main():
    return (await coro())

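So how do we wrap a generator as a coroutine object? This is where the coroutine decorator from the types package comes in (when asyncio was the driver, asyncio's own coroutine decorator could be used as well, on versions before 3.11). The @types.coroutine decorator wraps a generator function so that calling it produces a coroutine object that can be awaited: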

import asyncio
import types

@types.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

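Although the two functions use the old and the new syntax respectively, both produce coroutine objects, known as a generator-based coroutine (compute) and a native coroutine (print_sum), so mixing the two syntaxes is not a problem.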
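A generator-based coroutine is also the simplest way to see where suspension actually happens. In the sketch below, pause, work and run_to_completion are hypothetical names: pause contains a bare yield, and the driver is a variant of the earlier run helper that keeps calling send until the coroutine finishes, counting how often it was suspended. This is an illustration under those assumptions, not how asyncio schedules tasks.

```python
import types

@types.coroutine
def pause():
    # A bare yield suspends the whole chain of awaiting coroutines and
    # hands control back to whoever called send().
    yield

async def work(log):
    log.append("start")
    await pause()
    log.append("resumed")
    return "done"

def run_to_completion(coroutine):
    suspensions = 0
    while True:
        try:
            coroutine.send(None)
        except StopIteration as e:
            return e.value, suspensions
        # send() returned normally, so the coroutine suspended itself.
        suspensions += 1

log = []
result, suspensions = run_to_completion(work(log))
print(result, suspensions, log)
```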

Now consider an example that uses asyncio's Future:

import asyncio

async def coro1(future):
    await asyncio.sleep(1)
    future.set_result('data')

async def coro2(future):
    print(await future)

async def main():
    # Create the future on the running loop and share it with both coroutines.
    future = asyncio.get_running_loop().create_future()
    await asyncio.gather(coro1(future), coro2(future))

asyncio.run(main())

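Both coroutines run in the event loop. coro1 suspends itself after its first statement and hands control to asyncio.sleep, while coro2 waits for the future's result and gives the event loop back. When the timer expires, coro1 runs its second statement and sets the future's value; the suspended coro2 then resumes and prints the future's result, 'data'.

The fact that a future can be awaited shows that a future object is an Awaitable. In the source of the Future class (the version quoted here), a short section shows that it implements the __await__ protocol: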

class Future:
    ...
    def __iter__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        __await__ = __iter__ # make compatible with 'await' expression

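When the line await future executes, this code in the future runs. The future first checks whether it is already done; if not, it suspends itself and tells the current Task to wait for the future to complete.

When set_result is called on the future, the following code runs, storing the result and marking the future as finished: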

def set_result(self, result):
    ...
    if self._state != _PENDING:
        raise InvalidStateError('{}: {!r}'.format(self._state, self))
    self._result = result
    self._state = _FINISHED
    self._schedule_callbacks()

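Finally the future schedules its own callbacks, which triggers Task._step() and tells the Task to resume the future from the point where it was suspended. From there it is easy to see that the future executes the following code: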

class Future:
    ...
    def __iter__(self):
        ...
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

and in the end returns the result to the caller.
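The callback scheduling described above can be observed from the outside with the public Future API. In this sketch the seen list and the 0.01 second delay are arbitrary choices for illustration; add_done_callback, create_future and call_later are standard asyncio calls.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    seen = []

    # Our own callback, scheduled by the future once a result is set.
    future.add_done_callback(lambda fut: seen.append(fut.result()))

    # Have the loop set the result a little later, while we are suspended.
    loop.call_later(0.01, future.set_result, "data")

    value = await future    # suspends until set_result has been called
    await asyncio.sleep(0)  # let any remaining scheduled callbacks run
    return value, seen

value, seen = asyncio.run(main())
print(value, seen)
```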

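All the examples so far have used asyncio, so are there no other coroutine libraries? As part of Python's standard library, asyncio naturally gets a lot of attention, but it sometimes feels too heavyweight, in particular because it ships many complicated components and protocols that make it less convenient to use.

You can think of asyncio as one coroutine library built on the async/await syntax; async/await does not depend on asyncio. Besides asyncio, curio and trio are more lightweight alternatives and are also easier to use.

curio was written by David Beazley. Below is an example of building a TCP server with curio, which is said to be what dabeaz thinks an asynchronous server should ideally look like: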

from curio import run, spawn
from curio.socket import *

async def echo_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    print('Server listening at', address)
    async with sock:
        while True:
            client, addr = await sock.accept()
            await spawn(echo_client, client, addr)

async def echo_client(client, addr):
    print('Connection from', addr)
    async with client:
         while True:
             data = await client.recv(100000)
             if not data:
                 break
             await client.sendall(data)
    print('Connection closed')

if __name__ == '__main__':
    run(echo_server, ('',25000))

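Whether it is asyncio, curio or some other asynchronous coroutine library, what makes it asynchronous behind the scenes is usually an I/O event loop. The few dozen lines below show a bare-bones, event-driven echo server: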

from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from selectors import DefaultSelector, EVENT_READ

selector = DefaultSelector()
pool = {}

def request(client_socket, addr):
    # Build a per-client callback that echoes back whatever it receives.
    def handle_request(key, mask):
        data = client_socket.recv(100000)
        if not data:
            # Unregister before closing so the selector still sees a valid fd.
            selector.unregister(client_socket)
            client_socket.close()
            del pool[addr]
        else:
            client_socket.sendall(data)
    return handle_request

def recv_client(key, mask):
    sock = key.fileobj
    client_socket, addr = sock.accept()
    req = request(client_socket, addr)
    pool[addr] = req
    selector.register(client_socket, EVENT_READ, req)

def echo_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    selector.register(sock, EVENT_READ, recv_client)
    try:
        while True:
            events = selector.select()
            for key, mask in events:
                callback = key.data
                callback(key, mask)
    except KeyboardInterrupt:
        sock.close()

if __name__ == '__main__':
    echo_server(('',25000))

Let's verify it:

# terminal 1
$ nc localhost 25000
hello world
hello world

# terminal 2
$ nc localhost 25000
hello world
hello world
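The same register, select and dispatch cycle can be exercised without opening a port or a second terminal. The sketch below is a simplification under stated assumptions: it uses socket.socketpair() in place of a listening socket, and the helper names echo and dispatch_once are made up here. It runs the loop body exactly twice, once for the data and once for the end of the connection.

```python
import socket
from selectors import DefaultSelector, EVENT_READ

selector = DefaultSelector()
# A connected pair of sockets stands in for an accepted client connection.
server_side, client_side = socket.socketpair()

def echo(key, mask):
    conn = key.fileobj
    data = conn.recv(100000)
    if data:
        conn.sendall(data)
    else:
        # Empty read means the peer closed: clean up this connection.
        selector.unregister(conn)
        conn.close()

def dispatch_once(timeout=1):
    # One iteration of the event loop: wait for events, run their callbacks.
    for key, mask in selector.select(timeout):
        key.data(key, mask)

selector.register(server_side, EVENT_READ, echo)

client_side.sendall(b"hello world\n")
dispatch_once()                    # server side becomes readable and echoes
reply = client_side.recv(100000)

client_side.close()
dispatch_once()                    # server side sees EOF and closes itself
selector.close()
print(reply)
```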

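So now we know: asynchronous code does not have to use async/await, and code that uses async/await is not necessarily asynchronous. async/await is syntactic sugar for coroutines that makes calls between coroutines clearer. Calling a function declared with async returns a coroutine object. await may only be used inside a function declared with async, and it must be followed by a coroutine object or another Awaitable. The purpose of await is to wait for control to come back from a coroutine, while the operation that actually pauses and suspends a function is yield.

Personally, I think async/await and coroutines are where asynchronous programming in Python is heading, and we will see them in more and more places, for example the coroutine libraries curio and trio, the web framework sanic, and the database driver asyncpg. Now that Python 3 dominates, developers should embrace and adapt to new changes promptly, and coroutines based on async/await are steadily taking the stage thanks to their readability and ease of use. Having read this far, why not get on board?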


References:

PEP 492
PEP 525