攜程的那點事
攜程的那點事
當前python使用的攜程模組
- greenlet和基於greenlet開發的gevent模組(手動切換阻塞)
- yield關鍵字,只是能模擬出攜程的執行過程
- asyncio (自動切換阻塞) python3.4版本後加入
- async & await關鍵字 python3.5版本後加入
3和4的區別在於asyncio是用語法糖模式,而async是直接在函式前加async,可以看下他們的語法上的差別並不大
asyncio模組的方法解釋
- asyncio的原理就是通過把N個任務放到一個死迴圈中,那麼放入前我們需要先獲得一個迴圈器的物件。
然後在迴圈器中,N個任務組成的任務列表,任務列表返回可執行任務和已經完成任務,
可執行任務丟到執行列表,準備執行,已完成任務從已完成任務列表中刪除。
最後任務列表為空的時候,那麼迴圈結束。
# 先獲取一個事件迴圈器的物件
loop=asyncio.get_event_loop()
# 將任務放到任務列表中
loop.run_until_complete(asyncio.wait(task))
-
async def 函式名 叫協程函式,而協程函式的返回值叫協程物件.
執行協程物件並不會執行協程函式內部程式碼,必須要交給事件迴圈器來執行
async def func(): #協程函式 print("start") ret = func() #到這一步並不會立即執行協程物件 # 必須交給迴圈器來執行 loop=asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(ret)) # python3.7對迴圈器又進行了封裝,只需要呼叫run方法即可 asyncio.run(ret)
-
await + 可等待物件(協程物件,Future,Task物件) <io等待>, 一個協程函式裡可以有多個await
協程函式體中,遇到await,函式是等待的,不會切換執行內部的其他函式.這種執行方式和普通的函式從上而下執行順序沒有區別
import asyncio async def func(aa): print("%s>>start"%aa) await asyncio.sleep(2) print("%s>>end"%aa) return "func結束了,返回值是%s"%aa async def main(): print("main執行") ret1 = await func("ret1") print("ret1返回值是%s"%ret1) ret2 = await func("ret2") print("ret2返回值是%s" % ret2) obj=asyncio.get_event_loop() obj.run_until_complete(main()) """ main執行 ret1>>start ret1>>end ret1返回值是func結束了,返回值是ret1 ret2>>start ret2>>end ret2返回值是func結束了,返回值是ret2 """
-
Task物件 在事件迴圈器中新增多個任務的,因為是協程模式排程執行,和併發感覺比較像
Tasks用於併發排程協程,通過asyncio.create_task(協程物件)的方式建立Task物件,這樣可以讓協程加入事件迴圈器中被排程執行,除了通過asyncio.create_task(協程物件)的方式建立Task物件,還可以用低層級的loop.create_task()或者ensure_future()函式
asyncio.create_task(協程物件) 在Python 3.7才有,之前版本推薦使用ensure_future()函式
import asyncio async def func(aa): print("%s>>start"%aa) await asyncio.sleep(2) print("%s>>end"%aa) return "func結束了,返回值是%s"%aa async def main(): print("main執行") task1 = obj.create_task(func("func1")) print("1") task2 = obj.create_task(func("func2")) print("2") task3 = asyncio.ensure_future(func("func3")) print("3") # python 3.7以上版本適用 # task4 = asyncio.create_task(func("fun4")) ret1 = await task1 ret2 = await task2 ret3 = await task3 print(ret1) print(ret2) print(ret3) obj=asyncio.get_event_loop() obj.run_until_complete(main()) # python 3.7以上版本適用 # asyncio.run(main()) """ main執行 func1>>start func2>>start func3>>start func1>>end func2>>end func3>>end func結束了,返回值是func1 func結束了,返回值是func2 func結束了,返回值是func3 """ 從輸出列印的內容看task會把函式新增任務後執行,新增後會繼續往下執行,await是阻塞等待程序返回值
上述例子只是瞭解使用語法.一般我們會這麼去遍歷使用
下面看下紅色框框就是優化後的程式碼.done和pending是await asyncio.wait(takslist)的返回值,執行結束的會放到done變數,而未結束的會放到pending 中去,done和pending都是一個集合物件.
await asyncio.wait(takslist,timeout =2),他們還有個timeout引數,可以設定等待時間,如果等待時間到強行結束,預設設定為None
下面例三,看下去掉上面的main協程函式怎麼執行,asyncio.wait里加的
import asyncio
async def func(aa):
print("%s>>start"%aa)
await asyncio.sleep(2)
print("%s>>end"%aa)
return "func結束了,返回值是%s"%aa
takslist = [
func("func1"),
func("func2"),
func("func3"),
]
# 用下面這種就不行,因為這麼寫會把task立即加到迴圈器中,而此時obj還未產生迴圈器的例項物件
#tasklist=[
# obj.create_task(func("func1")),
# obj.create_task(func("func2")),
# ]
obj=asyncio.get_event_loop()
done,pending = obj.run_until_complete(asyncio.wait(takslist))
print(done)
但是把tasklist放到obj下面就可以運行了,但是這也破壞了程式碼的結構和呼叫方式
#obj=asyncio.get_event_loop()
#takslist=[
# obj.create_task(func("func1")),
# obj.create_task(func("func2")),
#]
#done,pending = obj.run_until_complete(asyncio.wait(takslist))
-
Future物件 功能也是用來等待非同步處理的結果的
task繼承了Future,Task物件內部await的結果的處理是基於Future物件來的.
但是Future偏向底層,而且我們一般也不會手動去建立,都是通過呼叫task物件進行處理的
這裡主要提下另一個多(進)執行緒模組下也有個future物件
asyncio.Future和concurrent.futures.Future
concurrent.futures.Future 是使用程序池和執行緒池使用到的模組,作用是程序池和執行緒池非同步操作時使用的物件
一般這2個模組不會有交集,但是如果呼叫的第三方產品 假設Mysql不支援非同步訪問呼叫,那麼用這個Future物件把他們的處理方式模擬成非同步形態進行處理
下面是如何用協程函式呼叫普通函式,2個要點,
- 普通函式外面要包一層協程函式,
- 用迴圈器.run_in_executor()來跳開阻塞,用多執行緒執行,第一個引數是用來放程序池(執行緒池的)
import asyncio def faa(idx): print("第%s個foo開始執行" % idx) time.sleep(2) return idx async def faa_faster(idx): obj = asyncio.get_event_loop() #在建立迭代器時候就用run_in_executor呼叫多(進)執行緒 ret = obj.run_in_executor(None,faa,idx) # print("第%s個foo開始執行" % idx) a = await ret print(a) task = [faa_faster(i) for i in range(1000)] obj = asyncio.get_event_loop() obj.run_until_complete(asyncio.wait(task))
示例2.多執行緒執行普通函式,並以協程模式執行
import asyncio from concurrent.futures.thread import ThreadPoolExecutor def faa(idx): print("第%s個foo開始執行" % idx) time.sleep(2) return idx async def faa_faster(pool,idx): obj = asyncio.get_event_loop() #第一個引數預設是None,如果傳(進)執行緒池進去,就以多(進)執行緒形式執行普通函式 ret = obj.run_in_executor(pool,faa,idx) # print("第%s個foo開始執行" % idx) a = await ret print(a) pool = ThreadPoolExecutor(max_workers=5) task = [faa_faster(pool,i) for i in range(1000)] obj = asyncio.get_event_loop() obj.run_until_complete(asyncio.wait(task))