我說帶你五分鐘學會Python協程就五分鐘!不會超過六分鐘!
協程由於由程式主動控制切換,沒有執行緒切換的開銷,所以執行效率極高。對於IO密集型任務非常適用,如果是cpu密集型,推薦多程序+協程的方式。
在Python3.4之前,官方沒有對協程的支援,存在一些三方庫的實現,比如gevent和Tornado。3.4之後就內建了asyncio標準庫,官方真正實現了協程這一特性。
而Python對協程的支援,是通過Generator實現的,協程是遵循某些規則的生成器。因此,我們在瞭解協程之前,我們先要學習生成器。
def test():
print("generator start")
n = 1
while True:
yield_expression_value = yield n
print("yield_expression_value = %d" % yield_expression_value)
n += 1
# ①建立generator物件
generator = test()
print(type(generator))
print("---------------")
# ②啟動generator
next_result = generator.__next__()
print("next_result = %d" % next_result)
print("---------------")
# ③傳送值給yield表示式
send_result = generator.send(666)
print("send_result = %d" % send_result)
執行結果:
---------------
generator start
next_result = 1
---------------
yield_expression_value = 666
send_result = 2
理解這個demo的關鍵是:生成器啟動或恢復執行一次,將會在yield處暫停。上面的第②步僅僅執行到了yield n,並沒有執行到賦值語句,到了第③步,生成器恢復執行才給yield_expression_value賦值。
生產者和消費者模型
進群:943752371 即可獲取數十套PDF!
上面的例子中,程式碼中斷-->切換執行,體現出了協程的部分特點。
我們再舉一個生產者、消費者的例子,
傳統的生產者-消費者模型是一個執行緒寫訊息,一個執行緒取訊息,通過鎖機制控制佇列和等待,但一不小心就可能死鎖。
現在改用協程,生產者生產訊息後,直接通過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高。
def consumer():
print("[CONSUMER] start")
r = 'start'
while True:
n = yield r
if not n:
print("n is empty")
continue
print("[CONSUMER] Consumer is consuming %s" % n)
r = "200 ok"
def producer(c):
# 啟動generator
start_value = c.send(None)
print(start_value)
n = 0
while n < 3:
n += 1
print("[PRODUCER] Producer is producing %d" % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
# 關閉generator
c.close()
# 建立生成器
c = consumer()
# 傳入generator
producer(c)
執行結果:
[CONSUMER] start
start
[PRODUCER] producer is producing 1
[CONSUMER] consumer is consuming 1
[PRODUCER] Consumer return: 200 ok
[PRODUCER] producer is producing 2
[CONSUMER] consumer is consuming 2
[PRODUCER] Consumer return: 200 ok
[PRODUCER] producer is producing 3
[CONSUMER] consumer is consuming 3
[PRODUCER] Consumer return: 200 ok
注意到consumer函式是一個generator,把一個consumer傳入produce後:
1.首先呼叫c.send(None)啟動生成器;
2.然後,一旦生產了東西,通過c.send(n)切換到consumer執行;
3.consumer通過yield拿到訊息,處理,又通過yield把結果傳回;
4.produce拿到consumer處理的結果,繼續生產下一條訊息;
5.produce決定不生產了,通過c.close()關閉consumer,整個過程結束。
整個流程無鎖,由一個執行緒執行,produce和consumer協作完成任務,所以稱為“協程”,而非執行緒的搶佔式多工。
yield from表示式
Python3.3版本新增yield
from語法,新語法用於將一個生成器部分操作委託給另一個生成器。此外,允許子生成器(即yield
from後的“引數”)返回一個值,該值可供委派生成器(即包含yield from的生成器)使用。並且在委派生成器中,可對子生成器進行優化。
我們先來看最簡單的應用,例如:
# 子生成器
def test(n):
i = 0
while i < n:
yield i
i += 1
# 委派生成器
def test_yield_from(n):
print("test_yield_from start")
yield from test(n)
print("test_yield_from end")
for i in test_yield_from(3):
print(i)
輸出:
test_yield_from start
0
1
2
test_yield_from end
這裡我們僅僅給這個生成器添加了一些列印,如果是正式的程式碼中,你可以新增正常的執行邏輯。
如果上面的test_yield_from函式中有兩個yield from語句,將序列執行。比如將上面的test_yield_from函式改寫成這樣:
def test_yield_from(n):
print("test_yield_from start")
yield from test(n)
print("test_yield_from doing")
yield from test(n)
print("test_yield_from end")
將輸出:
test_yield_from start
0
1
2
test_yield_from doing
0
1
2
test_yield_from end
在這裡,yield from起到的作用相當於下面寫法的簡寫形式
for item in test(n):
yield item
看起來這個yield
from也沒做什麼大不了的事,其實它還幫我們處理了異常之類的。具體可以看stackoverflow上的這個問題:In practice,
what are the main uses for the new “yield from” syntax in Python 3.3?
協程(Coroutine)
Python3.4開始,新增了asyncio相關的API,語法使用@asyncio.coroutine和yield from實現協程
Python3.5中引入async/await語法,參見PEP492
我們先來看Python3.4的實現。
@asyncio.coroutine
Python3.4中,使用@asyncio.coroutine裝飾的函式稱為協程。不過沒有從語法層面進行嚴格約束。
對於Python原生支援的協程來說,Python對協程和生成器做了一些區分,便於消除這兩個不同但相關的概念的歧義:
標記了@asyncio.coroutine裝飾器的函式稱為協程函式,iscoroutinefunction()方法返回True
呼叫協程函式返回的物件稱為協程物件,iscoroutine()函式返回True
舉個栗子,我們給上面yield from的demo中新增@asyncio.coroutine:
import asyncio
...
@asyncio.coroutine
def test_yield_from(n):
...
# 是否是協程函式
print(asyncio.iscoroutinefunction(test_yield_from))
# 是否是協程物件
print(asyncio.iscoroutine(test_yield_from(3)))
毫無疑問輸出結果是True。
可以看下@asyncio.coroutine的原始碼中檢視其做了什麼,我將其原始碼簡化下,大致如下:
import functools
import types
import inspect
def coroutine(func):
# 判斷是否是生成器
if inspect.isgeneratorfunction(func):
coro = func
else:
# 將普通函式變成generator
@functools.wraps(func)
def coro(*args, **kw):
res = func(*args, **kw)
res = yield from res
return res
# 將generator轉換成coroutine
wrapper = types.coroutine(coro)
# For iscoroutinefunction().
wrapper._is_coroutine = True
return wrapper
將這個裝飾器標記在一個生成器上,就會將其轉換成coroutine。
然後,我們來實際使用下@asyncio.coroutine和yield from:
import asyncio
@asyncio.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y
@asyncio.coroutine
def print_sum(x, y):
result = yield from compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
print("start")
# 中斷呼叫,直到協程執行結束
loop.run_until_complete(print_sum(1, 2))
print("end")
loop.close()
執行結果:
start
Compute 1 + 2 ...
1 + 2 = 3
end
print_sum這個協程中呼叫了子協程compute,它將等待compute執行結束才返回結果。
這個demo點呼叫流程如下圖:
EventLoop將會把print_sum封裝成Task物件
流程圖展示了這個demo的控制流程,不過沒有展示其全部細節。比如其中“暫停”的1s,實際上建立了一個future物件, 然後通過BaseEventLoop.call_later()在1s後喚醒這個任務。
值得注意的是,@asyncio.coroutine將在Python3.10版本中移除。
async/await
Python3.5開始引入async/await語法(PEP 492),用來簡化協程的使用並且便於理解。
async/await實際上只是@asyncio.coroutine和yield from的語法糖:
把@asyncio.coroutine替換為async
把yield from替換為await
即可。
比如上面的例子:
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
print("start")
loop.run_until_complete(print_sum(1, 2))
print("end")
loop.close()
我們再來看一個asyncio中Future的例子:
import asyncio
future = asyncio.Future()
async def coro1():
print("wait 1 second")
await asyncio.sleep(1)
print("set_result")
future.set_result('data')
async def coro2():
result = await future
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
coro1()
coro2()
]))
loop.close()
輸出結果:
wait 1 second
(大約等待1秒)
set_result
data
這裡await後面跟隨的future物件,協程中yield from或者await後面可以呼叫future物件,其作用是:暫停協程,直到future執行結束或者返回result或丟擲異常。
而在我們的例子中,await future必須要等待future.set_result('data')後才能夠結束。將coro2()作為第二個協程可能體現得不夠明顯,可以將協程的呼叫改成這樣:
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
# coro1(),
coro2(),
coro1()
]))
loop.close()
輸出的結果仍舊與上面相同。
其實,async這個關鍵字的用法不止能用在函式上,還有async with非同步上下文管理器,async for非同步迭代器. 對這些感興趣且覺得有用的可以網上找找資料,這裡限於篇幅就不過多展開了。