1. 程式人生 > 其它 >攜程的那點事

攜程的那點事

攜程的那點事

當前python使用的攜程模組

  1. greenlet和基於greenlet開發的gevent模組(手動切換阻塞)
  2. yield關鍵字,只是能模擬出攜程的執行過程
  3. asyncio (自動切換阻塞) python3.4版本後加入
  4. async & await關鍵字 python3.5版本後加入

3和4的區別在於asyncio是用語法糖模式,而async是直接在函式前加async,可以看下他們的語法上的差別並不大

asyncio模組的方法解釋

  • asyncio的原理就是通過把N個任務放到一個死迴圈中,那麼放入前我們需要先獲得一個迴圈器的物件。

然後在迴圈器中,N個任務組成的任務列表,任務列表返回可執行任務和已經完成任務,

可執行任務丟到執行列表,準備執行,已完成任務從已完成任務列表中刪除。

最後任務列表為空的時候,那麼迴圈結束。

# 先獲取一個事件迴圈器的物件
loop=asyncio.get_event_loop()
# 將任務放到任務列表中
loop.run_until_complete(asyncio.wait(task))
  • async def 函式名 叫協程函式,而協程函式的返回值叫協程物件.

    執行協程物件並不會執行協程函式內部程式碼,必須要交給事件迴圈器來執行

async def func():  #協程函式
	print("start")
ret = func() #到這一步並不會立即執行協程物件

# 必須交給迴圈器來執行
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(ret))

# python3.7對迴圈器又進行了封裝,只需要呼叫run方法即可
asyncio.run(ret)
  • await + 可等待物件(協程物件,Future,Task物件) <io等待>, 一個協程函式裡可以有多個await

    協程函式體中,遇到await,函式是等待的,不會切換執行內部的其他函式.這種執行方式和普通的函式從上而下執行順序沒有區別

import asyncio

async def func(aa):
    print("%s>>start"%aa)
    await asyncio.sleep(2)
    print("%s>>end"%aa)
    return "func結束了,返回值是%s"%aa


async def main():
    print("main執行")
    ret1 = await func("ret1")
    print("ret1返回值是%s"%ret1)

    ret2 = await func("ret2")
    print("ret2返回值是%s" % ret2)

obj=asyncio.get_event_loop()
obj.run_until_complete(main())

"""
main執行
ret1>>start
ret1>>end
ret1返回值是func結束了,返回值是ret1
ret2>>start
ret2>>end
ret2返回值是func結束了,返回值是ret2
"""
  • Task物件 在事件迴圈器中新增多個任務的,因為是協程模式排程執行,和併發感覺比較像

    Tasks用於併發排程協程,通過asyncio.create_task(協程物件)的方式建立Task物件,這樣可以讓協程加入事件迴圈器中被排程執行,除了通過asyncio.create_task(協程物件)的方式建立Task物件,還可以用低層級的loop.create_task()或者ensure_future()函式

    asyncio.create_task(協程物件) 在Python 3.7才有,之前版本推薦使用ensure_future()函式

    import asyncio
    
    async def func(aa):
        print("%s>>start"%aa)
        await asyncio.sleep(2)
        print("%s>>end"%aa)
        return "func結束了,返回值是%s"%aa
    
    async def main():
        print("main執行")
        task1 = obj.create_task(func("func1"))
        print("1")
        task2 = obj.create_task(func("func2"))
        print("2")
        task3 = asyncio.ensure_future(func("func3"))
        print("3")
        # python 3.7以上版本適用
        # task4 = asyncio.create_task(func("fun4"))
        ret1 = await task1
        ret2 = await task2
        ret3 = await task3
        print(ret1)
        print(ret2)
        print(ret3)
    obj=asyncio.get_event_loop()
    obj.run_until_complete(main())
    # python 3.7以上版本適用
    # asyncio.run(main())
    
    """
    main執行
    func1>>start
    func2>>start
    func3>>start
    func1>>end
    func2>>end
    func3>>end
    func結束了,返回值是func1
    func結束了,返回值是func2
    func結束了,返回值是func3
    """
    從輸出列印的內容看task會把函式新增任務後執行,新增後會繼續往下執行,await是阻塞等待程序返回值
    
    

    上述例子只是瞭解使用語法.一般我們會這麼去遍歷使用

    下面看下紅色框框就是優化後的程式碼.done和pending是await asyncio.wait(takslist)的返回值,執行結束的會放到done變數,而未結束的會放到pending 中去,done和pending都是一個集合物件.

    await asyncio.wait(takslist,timeout =2),他們還有個timeout引數,可以設定等待時間,如果等待時間到強行結束,預設設定為None

下面例三,看下去掉上面的main協程函式怎麼執行,asyncio.wait里加的

import asyncio

async def func(aa):
    print("%s>>start"%aa)
    await asyncio.sleep(2)
    print("%s>>end"%aa)
    return "func結束了,返回值是%s"%aa


takslist = [
        func("func1"),
        func("func2"),
        func("func3"),
    ]

# 用下面這種就不行,因為這麼寫會把task立即加到迴圈器中,而此時obj還未產生迴圈器的例項物件
#tasklist=[
#     obj.create_task(func("func1")),
#     obj.create_task(func("func2")),
# ]

obj=asyncio.get_event_loop()
done,pending = obj.run_until_complete(asyncio.wait(takslist))
print(done)


但是把tasklist放到obj下面就可以運行了,但是這也破壞了程式碼的結構和呼叫方式
#obj=asyncio.get_event_loop()

#takslist=[
#    obj.create_task(func("func1")),
#    obj.create_task(func("func2")),
#]
#done,pending = obj.run_until_complete(asyncio.wait(takslist))
  • Future物件 功能也是用來等待非同步處理的結果的

    task繼承了Future,Task物件內部await的結果的處理是基於Future物件來的.

    但是Future偏向底層,而且我們一般也不會手動去建立,都是通過呼叫task物件進行處理的

    這裡主要提下另一個多(進)執行緒模組下也有個future物件

    asyncio.Future和concurrent.futures.Future

    concurrent.futures.Future 是使用程序池和執行緒池使用到的模組,作用是程序池和執行緒池非同步操作時使用的物件

    一般這2個模組不會有交集,但是如果呼叫的第三方產品 假設Mysql不支援非同步訪問呼叫,那麼用這個Future物件把他們的處理方式模擬成非同步形態進行處理

    下面是如何用協程函式呼叫普通函式,2個要點,

    1. 普通函式外面要包一層協程函式,
    2. 用迴圈器.run_in_executor()來跳開阻塞,用多執行緒執行,第一個引數是用來放程序池(執行緒池的)
    import asyncio
    
    def faa(idx):
        print("第%s個foo開始執行" % idx)
        time.sleep(2)
        return idx
    
    async def faa_faster(idx):
        obj = asyncio.get_event_loop()
        #在建立迭代器時候就用run_in_executor呼叫多(進)執行緒
        ret = obj.run_in_executor(None,faa,idx)
        # print("第%s個foo開始執行" % idx)
        a = await ret
        print(a)
    
    
    task = [faa_faster(i) for i in range(1000)]
    obj = asyncio.get_event_loop()
    obj.run_until_complete(asyncio.wait(task))
    

    示例2.多執行緒執行普通函式,並以協程模式執行

    import asyncio
    from concurrent.futures.thread import ThreadPoolExecutor
    
    def faa(idx):
        print("第%s個foo開始執行" % idx)
        time.sleep(2)
        return idx
    
    
    async def faa_faster(pool,idx):
        obj = asyncio.get_event_loop()
        #第一個引數預設是None,如果傳(進)執行緒池進去,就以多(進)執行緒形式執行普通函式
        ret = obj.run_in_executor(pool,faa,idx)
        # print("第%s個foo開始執行" % idx)
        a = await ret
        print(a)
    
    pool = ThreadPoolExecutor(max_workers=5)
    task = [faa_faster(pool,i) for i in range(1000)]
    obj = asyncio.get_event_loop()
    obj.run_until_complete(asyncio.wait(task))