1. 程式人生 > >asyncio------使用協程處理併發問題

asyncio------使用協程處理併發問題

如果有 100 個 socket,那麼給每個 socket 分別建立一個 thread 來處理,現在的計算機應該都能 hold 住。但是當 socket 數量更高,併發量更大的時候,那麼就應該選擇使用 asyncio 了。

轉載自: http://python.jobbole.com/87310/

額外參考:https://segmentfault.com/a/1190000008814676

     run_until_complete 是一個阻塞(blocking)呼叫,直到協程執行結束,它才返回

        gather 起聚合的作用,把多個 futures 包裝成單個 future,因為 loop.run_until_complete

只接受單個 future

        建議呼叫 loop.close,以徹底清理 loop 物件防止誤用,

   loop.run_until_complete(do_some_work(loop, 1))
   loop.close()

Python黑魔法 --- 非同步IO( asyncio) 協程

python asyncio

網路模型有很多中,為了實現高併發也有很多方案,多執行緒,多程序。無論多執行緒和多程序,IO的排程更多取決於系統,而協程的方式,排程來自使用者,使用者可以在函式中yield一個狀態。使用協程可以實現高效的併發任務。Python的在3.4中引入了協程的概念,可是這個還是以生成器物件為基礎,3.5則確定了協程的語法。下面將簡單介紹asyncio的使用。實現協程的不僅僅是asyncio,tornado和gevent都實現了類似的功能。

  • event_loop 事件迴圈:程式開啟一個無限的迴圈,程式設計師會把一些函式註冊到事件迴圈上。當滿足事件發生的時候,呼叫相應的協程函式。
  • coroutine 協程:協程物件,指一個使用async關鍵字定義的函式,它的呼叫不會立即執行函式,而是會返回一個協程物件。協程物件需要註冊到事件迴圈,由事件迴圈呼叫。
  • task 任務:一個協程物件就是一個原生可以掛起的函式,任務則是對協程進一步封裝,其中包含任務的各種狀態。
  • future: 代表將來執行或沒有執行的任務的結果。它和task上沒有本質的區別
  • async/await 關鍵字:python3.5 用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的非同步呼叫介面。

上述的概念單獨拎出來都不好懂,比較他們之間是相互聯絡,一起工作。下面看例子,再回溯上述概念,更利於理解。

定義一個協程

定義一個協程很簡單,使用async關鍵字,就像定義普通函式一樣:

import timeimport asyncionow = lambda : time.time()async def do_some_work(x): print('Waiting: ', x)start = now()coroutine = do_some_work(2)loop = asyncio.get_event_loop()loop.run_until_complete(coroutine)print('TIME: ', now() - start)
12345678910111213141516import timeimport asyncionow=lambda:time.time()async def do_some_work(x):print('Waiting: ',x)start=now()coroutine=do_some_work(2)loop=asyncio.get_event_loop()loop.run_until_complete(coroutine)print('TIME: ',now()-start)

通過async關鍵字定義一個協程(coroutine),協程也是一種物件。協程不能直接執行,需要把協程加入到事件迴圈(loop),由後者在適當的時候呼叫協程。asyncio.get_event_loop方法可以建立一個事件迴圈,然後使用run_until_complete將協程註冊到事件迴圈,並啟動事件迴圈。因為本例只有一個協程,於是可以看見如下輸出:

Waiting: 2TIME: 0.0004658699035644531
12Waiting:2TIME:0.0004658699035644531

建立一個task

協程物件不能直接執行,在註冊事件迴圈的時候,其實是run_until_complete方法將協程包裝成為了一個任務(task)物件。所謂task物件是Future類的子類。儲存了協程執行後的狀態,用於未來獲取協程的結果。

import asyncioimport timenow = lambda : time.time()async def do_some_work(x): print('Waiting: ', x)start = now()coroutine = do_some_work(2)loop = asyncio.get_event_loop()# task = asyncio.ensure_future(coroutine)task = loop.create_task(coroutine)print(task)loop.run_until_complete(task)print(task)print('TIME: ', now() - start)
123456789101112131415161718import asyncioimport timenow=lambda:time.time()async def do_some_work(x):print('Waiting: ',x)start=now()coroutine=do_some_work(2)loop=asyncio.get_event_loop()# task = asyncio.ensure_future(coroutine)task=loop.create_task(coroutine)print(task)loop.run_until_complete(task)print(task)print('TIME: ',now()-start)

可以看到輸出結果為:

Python<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:17>>Waiting: 2<Task finished coro=<do_some_work() done, defined at /Users/ghost/Rsj217/python3.6/async/async-main.py:17> result=None>TIME: 0.0003490447998046875
1234<Task pending coro=<do_some_work()running at/Users/ghost/Rsj217/python3.6/async/async-main.py:17>>Waiting:2<Task finished coro=<do_some_work()done,defined at/Users/ghost/Rsj217/python3.6/async/async-main.py:17>result=None>TIME:0.0003490447998046875

建立task後,task在加入事件迴圈之前是pending狀態,因為do_some_work中沒有耗時的阻塞操作,task很快就執行完畢了。後面列印的finished狀態。

asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以建立一個task,run_until_complete的引數是一個futrue物件。當傳入一個協程,其內部會自動封裝成task,task是Future的子類。isinstance(task, asyncio.Future)將會輸出True。

繫結回撥

繫結回撥,在task執行完畢的時候可以獲取執行的結果,回撥的最後一個引數是future物件,通過該物件可以獲取協程返回值。如果回撥需要多個引數,可以通過偏函式匯入。

import timeimport asyncionow = lambda : time.time()async def do_some_work(x): print('Waiting: ', x) return 'Done after {}s'.format(x)def callback(future): print('Callback: ', future.result())start = now()coroutine = do_some_work(2)loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)task.add_done_callback(callback)loop.run_until_complete(task)print('TIME: ', now() - start)
123456789101112131415161718192021import timeimport asyncionow=lambda:time.time()async def do_some_work(x):print('Waiting: ',x)return'Done after {}s'.format(x)def callback(future):print('Callback: ',future.result())start=now()coroutine=do_some_work(2)loop=asyncio.get_event_loop()task=asyncio.ensure_future(coroutine)task.add_done_callback(callback)loop.run_until_complete(task)print('TIME: ',now()-start)
def callback(t, future): print('Callback:', t, future.result())task.add_done_callback(functools.partial(callback, 2))
1234def callback(t,future):print('Callback:',t,future.result())task.add_done_callback(functools.partial(callback,2))

可以看到,coroutine執行結束時候會呼叫回撥函式。並通過引數future獲取協程執行的結果。我們建立的task和回撥裡的future物件,實際上是同一個物件。

future 與 result

回撥一直是很多非同步程式設計的惡夢,程式設計師更喜歡使用同步的編寫方式寫非同步程式碼,以避免回撥的惡夢。回撥中我們使用了future物件的result方法。前面不繫結回撥的例子中,我們可以看到task有fiinished狀態。在那個時候,可以直接讀取task的result方法。

async def do_some_work(x): print('Waiting {}'.format(x)) return 'Done after {}s'.format(x)start = now()coroutine = do_some_work(2)loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)loop.run_until_complete(task)print('Task ret: {}'.format(task.result()))print('TIME: {}'.format(now() - start))
12345678910111213async def do_some_work(x):print('Waiting {}'.format(x))return'Done after {}s'.format(x)start=now()coroutine=do_some_work(2)loop=asyncio.get_event_loop()task=asyncio.ensure_future(coroutine)loop.run_until_complete(task)print('Task ret: {}'.format(task.result()))print('TIME: {}'.format(now()-start))

可以看到輸出的結果:

Waiting: 2Task ret: Done after 2sTIME: 0.0003650188446044922
123Waiting:2Task ret:Done after2sTIME:0.0003650188446044922

阻塞和await

使用async可以定義協程物件,使用await可以針對耗時的操作進行掛起,就像生成器裡的yield一樣,函式讓出控制權。協程遇到await,事件迴圈將會掛起該協程,執行別的協程,直到其他的協程也掛起或者執行完畢,再進行下一個協程的執行。

耗時的操作一般是一些IO操作,例如網路請求,檔案讀取等。我們使用asyncio.sleep函式來模擬IO操作。協程的目的也是讓這些IO操作非同步化。

import asyncioimport timenow = lambda: time.time()async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x)start = now()coroutine = do_some_work(2)loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)loop.run_until_complete(task)print('Task ret: ', task.result())print('TIME: ', now() - start)
12345678910111213141516171819import asyncioimport timenow=lambda:time.time()async def do_some_work(x):print('Waiting: ',x)await asyncio.sleep(x)return'Done after {}s'.format(x)start=now()coroutine=do_some_work(2)loop=asyncio.get_event_loop()task=asyncio.ensure_future(coroutine)loop.run_until_complete(task)print('Task ret: ',task.result())print('TIME: ',now()-start)

在 sleep的時候,使用await讓出控制權。即當遇到阻塞呼叫的函式的時候,使用await方法將協程的控制權讓出,以便loop呼叫其他的協程。現在我們的例子就用耗時的阻塞操作了。

併發和並行

併發和並行一直是容易混淆的概念。併發通常指有多個任務需要同時進行,並行則是同一時刻有多個任務執行。用上課來舉例就是,併發情況下是一個老師在同一時間段輔助不同的人功課。並行則是好幾個老師分別同時輔助多個學生功課。簡而言之就是一個人同時吃三個饅頭還是三個人同時分別吃一個的情況,吃一個饅頭算一個任務。

asyncio實現併發,就需要多個協程來完成任務,每當有任務阻塞的時候就await,然後其他協程繼續工作。建立多個協程的列表,然後將這些協程註冊到事件迴圈中。

import asyncioimport timenow = lambda: time.time()async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x)start = now()coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3)]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))for task in tasks: print('Task ret: ', task.result())print('TIME: ', now() - start)
12345678910111213141516171819202122232425262728293031import asyncioimport timenow=lambda:time.time()async def do_some_work(x):print('Waiting: ',x)await asyncio.sleep(x)return'Done after {}s'.format(x)start=now()coroutine1=do_some_work(1)coroutine2=do_some_work(2)coroutine3=do_some_work(4)tasks=[asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))fortask intasks:print('Task ret: ',task.result())print('TIME: ',now()-start)

結果如下

Waiting: 1Waiting: 2Waiting: 4Task ret: Done after 1sTask ret: Done after 2sTask ret: Done after 4sTIME: 4.003541946411133
1234567Waiting:1Waiting:2Waiting:4Task ret:Done after1sTask ret:Done after2sTask ret:Done after4sTIME:4.003541946411133

總時間為4s左右。4s的阻塞時間,足夠前面兩個協程執行完畢。如果是同步順序的任務,那麼至少需要7s。此時我們使用了aysncio實現了併發。asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一個task列表,後者接收一堆task。

協程巢狀

使用async可以定義協程,協程用於耗時的io操作,我們也可以封裝更多的io操作過程,這樣就實現了巢狀的協程,即一個協程中await了另外一個協程,如此連線起來。

import asyncioimport timenow = lambda: time.time()async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x)async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] dones, pendings = await asyncio.wait(tasks) for task in dones: print('Task ret: ', task.result())start = now()loop = asyncio.get_event_loop()loop.run_until_complete(main())print('TIME: ', now() - start)
12345678910111213141516171819202122232425262728293031323334import asyncioimport timenow=lambda:time.time()async def do_some_work(x):print('Waiting: ',x)await asyncio.sleep(x)return'Done after {}s'.format(x)async def main():coroutine1=do_some_work(1)coroutine2=do_some_work(2)coroutine3=do_some_work(4)tasks=[asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]dones,pendings=await asyncio.wait(tasks)fortask indones:print('Task ret: ',task.result())start=now()loop=asyncio.get_event_loop()loop.run_until_complete(main())print('TIME: ',now()-start)

如果使用的是 asyncio.gather建立協程物件,那麼await的返回值就是協程執行的結果。

results = await asyncio.gather(*tasks) for result in results: print('Task ret: ', result)
1234results=await asyncio.gather(*tasks)forresult inresults:print('Task ret: ',result)

不在main協程函式裡處理結果,直接返回await的內容,那麼最外層的run_until_complete將會返回main協程的結果。

async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(2) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] return await asyncio.gather(*tasks)start = now()loop = asyncio.get_event_loop()results = loop.run_until_complete(main())for result in results: print('Task ret: ', result)
1234567891011121314151617181920async def main():coroutine1=do_some_work(1)coroutine2=do_some_work(2)coroutine3=do_some_work(2)tasks=[asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]returnawait asyncio.gather(*tasks)start=now()loop=asyncio.get_event_loop()results=loop.run_until_complete(main())forresult inresults:print('Task ret: ',result)

或者返回使用asyncio.wait方式掛起協程。

相關推薦

asyncio------使用處理併發問題

如果有 100 個 socket,那麼給每個 socket 分別建立一個 thread 來處理,現在的計算機應該都能 hold 住。但是當 socket 數量更高,併發量更大的時候,那麼就應該選擇使用 asyncio 了。轉載自: http://python.jobbole.

asyncio並發

orm python \n eat 必須 done await work tor # # Generator with yield # astr = ‘ABC‘ alist = [1, 2, 3] adict = dict(name=‘kct‘, age=18) a

python 64式: 第4式、eventlet實現併發

#!/usr/bin/env python # -*- coding: utf-8 -*- from datetime import datetime import eventlet eventlet.monkey_patch(all=True) from eventlet.green

多程序+ 處理IO問題

from multiprocessing import Pool import gevent,os import time def recursion(n): if n == 1 or n ==2: return 1 else: re

多進+ 處理IO問題

運算 class tip async pen int multi () lse from multiprocessing import Pool import gevent,os import time def recursion(n): if

PHP併發 shell_exec

在PHP程式中經常需要用shell_exec執行一些命令,而普通的shell_exec是阻塞的,如果命令執行時間過長,那可能會導致程序完全卡住。在Swoole4協程環境下可以用Co::exec併發地執行很多命令。 本文基於 Swoole-4.2.9和 PHP-7.2.9版本 協程示例 &

update 裡面的處理

// Update is called once per framevoid Update () {         if(Input.GetKeyDown(KeyCode.Space))             StartCoroutine(test());}     I

實現併發下載

    在單執行緒的程式中,採取的是順序執行方式。對於下載程式來說,單執行緒的效率是極其低的,原因是它只能在下載完一個檔案後才可以讀取該檔案。當接收一個遠端檔案時,程式將大部分時間花費在等待資料接收上。更明確地說,將時間用在了對receive阻塞呼叫上。因此,如果一個程式可以

理解Go併發

協程 Go語言裡建立一個協程很簡單,使用go關鍵字就可以讓一個普通方法協程化: package main import ( "fmt" "time" ) func main(){ fmt.Println("run in main c

python教程:使用 async 和 await 進行併發程式設計

python 一直在進行併發程式設計的優化, 比較熟知的是使用 thread 模組多執行緒和 multiprocessing 多程序,後來慢慢引入基於 yield 關鍵字的協程。 而近幾個版本,python 對於協程的寫法進行了大幅的優化,很多之前的協程寫法不被官方推薦了。如果你之前瞭解過 python 協程

python--asyncio模組(基礎併發測試)

在高併發的場景下,python提供了一個多執行緒的模組threading,但似乎這個模組並不近人如意,原因在於cpython本身的全域性解析鎖(GIL)問題,在一段時間片內實際上的執行是單執行緒的。同時還存在著資源爭奪的問題。python3.4之後引入了基於生成器物件的協程概念。也就是asyncio模組。除了

Python實戰之(greenlet模組,gevent模組,socket+ gevent實現高併發處理

協程 協程,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒。(cpu不知道,是使用者自己控制的) 協程擁有自己的暫存器上下文和棧。協程排程切換時,將暫存器上下文和棧儲存到其他地方,在切回來的時候,恢復先前儲存的暫存器上下文和棧(執行緒的

Goroutine()為何能處理併發

簡單來說:協程十分輕量,可以在一個程序中執行有數以十萬計的協程,依舊保持高效能。 程序、執行緒、協程的關係和區別: 程序擁有自己獨立的堆和棧,既不共享堆,亦不共享棧,程序由作業系統排程。 執行緒擁有自己獨立的棧和共享的堆,共享堆,不共享棧,執行緒亦由作業系統排程(標準執行緒是的)。 協程和執行緒一

處理多任務線對比

print har src super __main__ turn python 線程 eve 線程版處理多任務: #!/usr/bin/env python # -*- coding:utf-8 -*- import threading import iter

Pythonasyncio

asyncio(解決非同步io程式設計的一整套解決方案,它主要用於非同步網路操作、併發和協程)協程(Coroutine一種使用者態的輕量級微執行緒,它是程式級別的,在執行過程中可以中斷去執行其它的子程式,別的子程式也可以中斷回來繼續執行之前的子程式,無需執行緒上下文切換的開銷) get_event_loop

500 Lines or Less | A Web Crawler With asyncio Coroutines:用寫web爬蟲

1 def fetch(url): 2 sock = socket.socket() 3 sock.connect(('xkcd.com', 80)) 4 request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.form

python3併發

# 程序是資源分配的單位 # 執行緒是作業系統排程的單位 # 程序切換需要的資源最大,效率低 # 執行緒切換需要的資源一般,效率一般 # 協程切換任務資源很小,效率高 # 多程序、多執行緒根據cpu核數不一樣可能是並行的,但是協成在一個執行緒中 #協程,自動切換 import gevent,time

asyncio 異步

pla ret mage print .get imp pen lee com 並發執行任務示例: 1 import asyncio, time 2 3 #異步協程 4 async def hello(): 5 """ 6 這邊程序運行時,

15.python併發程式設計(執行緒--程序--)

一.程序:1.定義:程序最小的資源單位,本質就是一個程式在一個數據集上的一次動態執行(執行)的過程2.組成:程序一般由程式,資料集,程序控制三部分組成:(1)程式:用來描述程序要完成哪些功能以及如何完成(2)資料集:是程式在執行過程中所需要使用的一切資源(3)程序控制塊:用來記錄程序外部特徵,描述程序的執行變

python3多程序 程序池 併發

一、程序            我們電腦的應用程式,都是程序,程序是資源分配的單位。程序切換需要的資源最大,效率低。         程序之間相互獨立