Python協程與asyncio
阿新 • • 發佈:2018-11-07
asyncio(解決非同步io程式設計的一整套解決方案,它主要用於非同步網路操作、併發和協程)
協程(Coroutine一種使用者態的輕量級微執行緒,它是程式級別的,在執行過程中可以中斷去執行其它的子程式,別的子程式也可以中斷回來繼續執行之前的子程式,無需執行緒上下文切換的開銷)
get_event_loop:建立事件迴圈
run_until_complete(future):把協程註冊到事件迴圈上,直到它執行完
# coding:utf-8
import asyncio
import time
# 使用async定義一個協程
async def get_corouting():
print("start get a ...")
await asyncio.sleep(2)
print("end get a ...")
start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(get_corouting())
# end_time會在run_until_complete裡面的協程(get_corouting)執行完後再執行
end_time = time.time()
print(end_time - start_time)
執行結果:
start get a ...
end get a ...
2.004028797149658
create_task(coro):建立一個task,將協程註冊到時間迴圈中
add_done_callback(callback):task在返回結果前執行回撥函式,它的引數是接受一個方法callback,如果這個方法需要傳引數可使用partial
# coding:utf-8
import asyncio
from functools import partial
# 使用async定義一個協程
async def get_corouting():
print("start get a ...")
await asyncio.sleep( 2)
return "cor result"
def callback(file, future):
"""
回撥執行解析一個檔案
:param file:
:param future:
:return:
"""
print("resolve a file {}".format(file))
loop = asyncio.get_event_loop()
task = loop.create_task(get_corouting())
task.add_done_callback(partial(callback, "a.txt"))
loop.run_until_complete(task)
print(task.result())
result:
start get a ...
resolve a file a.txt
cor result
在task返回結果前先執行一個回撥函式執行解析一個檔案,使用partial返回函式
asyncio.wait:與執行緒池的wait方法一樣,可接受FIRST_COMPLETED,ALL_COMPLETED等引數,用於等待協程的完成
asyncio.gather:比wait更加高階,可以對task進行分組,並且可以批量取消task
# coding:utf-8
import time
import asyncio
# 使用async定義一個協程
async def get_corouting(i):
print("start get a{} ...".format(i))
await asyncio.sleep(2)
return "cor result"
start = time.time()
loop = asyncio.get_event_loop()
tasks1 = [get_corouting(i) for i in range(5) if i % 2 == 0]
tasks2 = [get_corouting(i) for i in range(5) if i % 2 == 1]
group_tasks1 = asyncio.gather(*tasks1)
group_tasks2 = asyncio.gather(*tasks2)
loop.run_until_complete(asyncio.gather(group_tasks1, group_tasks2))
end = time.time()
print(end - start)
result:
start get a4 ...
start get a0 ...
start get a2 ...
start get a3 ...
start get a1 ...
2.006136178970337
在協程裡面巢狀協程:
# coding:utf-8
import asyncio
async def compute(x, y):
print("compute {0} + {1}".format(x, y))
await asyncio.sleep(1)
return x + y
async def print_sum(x, y):
"""
await時呼叫compute協程
:param x:
:param y:
:return:
"""
result = await compute(x, y)
print("{0} + {1} = {2}".format(x, y, result))
# 建立task
loop = asyncio.get_event_loop()
# 將協程print_sum註冊到loop中
loop.run_until_complete(print_sum(1, 2))
loop.close()
result:
compute 1 + 2
1 + 2 = 3
在asyncio事件迴圈中呼叫非協程回撥函式:
call_soon:佇列中等待到下一個事件迴圈時會立即執行
call_later:根據延時呼叫的時間確定執行的順序
call_at:在指定的時間執行回撥函式 這個時間是loop裡面的單調時間(loop.time())
1 # coding:utf-8 2 3 import asyncio 4 5 6 def callback(sleep_times, func_name, loop): 7 print( 8 "{0} time {1} loop_time {2}".format( 9 func_name, sleep_times, loop.time() 10 ) 11 ) 12 13 14 loop = asyncio.get_event_loop() 15 loop.call_later(3, callback, 3, "call_later", loop) 16 loop.call_later(2, callback, 2, "call_later", loop) 17 loop.call_at(loop.time(), callback, 4, "call_at", loop) 18 loop.call_soon(callback, 5, "call_soon", loop) 19 loop.run_forever() 20 21 22 result: 23 call_soon time 5 loop_time 7580.552303919 24 call_at time 4 loop_time 7580.552377718 25 call_later time 2 loop_time 7582.554425915 26 call_later time 3 loop_time 7583.555097398
在這個事件迴圈中,call_soon最先執行,接著call_at指定的時間是loop當前時間,call_at執行,隨後是call_later根據延時的時間大小執行。
使用多執行緒在協程中整合阻塞IO:
# coding:utf-8
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def get_something(i):
"""
用sleep模擬阻塞
:param i:
:return:
"""
time.sleep(i)
print("get {} success".format(i))
start_time = time.time()
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(10)
# run_in_executor:將阻塞函式放到executor(執行緒池)中執行
tasks = [loop.run_in_executor(executor, get_something, i) for i in range(1, 6)]
# 等待task執行完成
loop.run_until_complete(asyncio.wait(tasks))
print("run time:{}".format(time.time() - start_time))
result:
get 1 success
get 2 success
get 3 success
get 4 success
get 5 success
run time:5.009312391281128