1. 程式人生 > 其它 >重新談非同步程式設計-協程

重新談非同步程式設計-協程

首先來說什麼是協程?

協程又被稱之為是微執行緒,或者說是在一個執行緒內實現程式碼塊的相互切換執行。

在《計算機作業系統》中我們學過,一個程序中包含若干個執行緒,一個執行緒中可以包含若干個程序。在Python中,一個執行緒又包含若干個協程。CPU如果在程序和程序之間切換,開銷是比較大的,相對來講,同一程序下的執行緒切換開銷要小很多,因為同一程序下的執行緒是共享該程序的資源的。同理,同一執行緒下的協程切換,也要比執行緒切換開銷小。

協程不像執行緒一樣,需要建立執行緒例項化物件,協程是通過協程函式+將協程調入事件迴圈實現的。

協程函式的定義關鍵詞是asyncio

例如:

import asyncio
async 
def fun(): pass

這就是一個協程函式的定義,asyncio模組是內建的,不用額外下載安裝。但是協程函式不能像普通函式一樣,“fun()”,可以執行,而是需要將協程函式註冊到事件迴圈當中,並且協程函式中必須有“await+可等待物件(協程物件、Future物件、Task物件)”。

例如:

import asyncio
async def fun():
    print('hello')
    await asyncio.sleep(2)
    print('word')
loop = asyncio.get_event_loop()
loop.run_until_complete(fun())

其中,asyncio.get_event_loop()是建立事件迴圈物件,loop.run_until_complete(fun())是將協程函式fun()註冊到事件迴圈當中,並執行。結果是沒有問題的,輸出'hello',等待2s,在輸出'word‘。

當中的asyncio.sleep(2),是模擬IO工作2s,await是將這個操作掛起2s中,或者說這個操作被阻塞2s的時間。這裡的asyncio.sleep(2)不能寫成time.sleep(2),兩個雖然都是等待2s,但是實質是不一樣的。

再py3.7之後,我們可以將

loop = asyncio.get_event_loop()

loop.run_until_complete(fun())

合併為:

asyncio.run(fun())

============================================================================================================================

現在,我們在建立一個新的協程函式。

import asyncio
import time
async def fun():
    print('hello')
    await asyncio.sleep(2)
    print('word')
async def fun1():
    print('how are you?')
    await asyncio.sleep(2)
    print('fine')
start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(fun())
loop.run_until_complete(fun1())
end_time = time.time()
print('耗時',end_time-start_time)

輸出:

hello
word
how are you?
fine
耗時 4.002251625061035

我們發現,雖然是將兩個協程函式分別註冊到了時間迴圈當中,但是並沒有像我們想象中一樣,交替執行,而是仍然序列執行,消耗4s。我們希望是fun協程先執行print('hello'),遇到了一個2s的IO操作,馬上切換到協程fun1的print('how are you?'),又碰到IO操作,繼續將IO操作掛起。經過不到2s的時間後協程fun的IO已經結束了,繼續執行fun的print('word'),再執行fun1的print('fine')。為什麼是這樣?我個人猜測和理解,並沒有證明,loop.run_until_complete(fun())、loop.run_until_complete(fun1())是先後註冊到協程中的,fun()結束之後,才執行fun1()。如果有其他解釋歡迎留言。這裡我們還要正確理解await後邊的IO操作。IO操作是可以和CPU並行進行的,也就是說在同一時刻,既可以CPU計算,又可以IO工作。如果說在IO的時候,CPU在等待,很明顯浪費時間,所以我們希望有IO和CPU並行進行,也就是有IO的協程之間併發。

============================================================================================================================

那我們如何將兩個協程併發起來呢?依照上邊的思想,我們希望將兩個協程同時註冊到事件迴圈當中。

例:

import asyncio
import time
async def fun():
    print('hello')
    await asyncio.sleep(2)
    print('word')
async def fun1():
    print('how are you?')
    await asyncio.sleep(2)
    print('fine')
start_time = time.time()
task1 = asyncio.ensure_future(fun())
task2 = asyncio.ensure_future(fun1())
task_list = [task1,task2]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task_list))
end_time = time.time()
print('耗時',end_time-start_time)

輸出:

hello
how are you?
word
fine
耗時 2.0022101402282715

我們發現,這是我們需要的。程式碼中的asyncio.ensure_future()用來建立task物件,task_list是一個列表,列表裡邊每個元素是task物件。

我們列印一下asyncio.wait(task_list),輸出:<coroutine object wait at 0x000001E8C9B523C8>,發現這是一個協程物件(注意不是協程函式)。

這裡有同學說了,我們將協程函式轉成協程物件不就可以了?print(fun()),發現fun()確實是協程物件,但是會報警告,“Enable tracemalloc to get the object allocation traceback”。

所以在這裡,實際上是將協程物件先轉成了task物件,再將task物件組成的列表傳入到loop.run_until_complete()裡。

所以到此為止我們知道了,loop.run_until_complete()這個方法的引數必須是一個協程物件。

我們可以將asyncio.wait(task_list)改為,res = asyncio.gather(*task_list)也可以,wait方法要求傳入的是一個列表,gather方法要求傳入的引數是一個列表解包後的結果。所以也可以這樣寫:asyncio.gather(task1,task2)。

import asyncio
import time
async def fun():
    print('hello')
    await asyncio.sleep(2)
    print('word')
async def fun1():
    print('how are you?')
    await asyncio.sleep(2)
    print('fine')

start_time = time.time()
task1 = asyncio.ensure_future(fun())
task2 = asyncio.ensure_future(fun1())
task_list = [task1,task2]
loop = asyncio.get_event_loop()
res = asyncio.gather(task1,task2)
print(res)
loop.run_until_complete(res)
end_time = time.time()
print('耗時',end_time-start_time)

輸出:

<_GatheringFuture pending>
hello
how are you?
word
fine
耗時 2.0014095306396484

這裡我們發現小細節,asyncio.gather(task1,task2)的返回值並不是前邊說的task類物件,而是一個Future類物件,這是因為task是Future的子類,這下就都明白了吧?pending的意思是未開始,列印的時候還沒有開始run呢。所以說gather比wait更高一級。

那麼task物件到底有什麼用?task物件是把協程物件進行封裝,可以追蹤 coroutine協程物件的完成狀態。也就是說儲存了協程執行後的狀態,用於未來獲取協程的結果。

到此,我們知道了loop.run_until_complete()的引數可以是Future物件,也可以是協程物件。回想第一個例子,我們把fun()傳入到這裡不就可以解釋了嗎?

============================================================================================================================

現在,我們來說說ensure_future和create_task區別。

兩個方法都是來建立task物件,但是如果把上邊的ensure_future修改為create_task會報錯。為啥?

注意:如果當前執行緒中沒有正在執行的事件迴圈,asyncio.create_task將會引發RuntimeError異常。可以理解為asyncio.create_task必須寫在一個協程函式中。

將上面的程式碼略微修改就可以:

import asyncio
import time
async def fun():
    print('hello')
    await asyncio.sleep(2)
    print('word')
async def fun1():
    print('how are you?')
    await asyncio.sleep(2)
    print('fine')
async def main():
    start_time = time.time()
    task1 = asyncio.create_task(fun())
    task2 = asyncio.ensure_future(fun1())
    await asyncio.gather(task1,task2)
    end_time = time.time()
    print('耗時',end_time-start_time)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

輸出:

hello
how are you?
word
fine
耗時 2.000105619430542

在main()協程函式中,await是必須要有的,如果沒有await,這就不是一個正確的協程函式,會報錯。後邊的asyncio.gather(task1,task2)前邊介紹過,返回的是一個Future物件,是沒問題的。我是從實用角度解釋的ensure_future和create_task區別,並不全面,後續碰到實際問題再實際分析。

============================================================================================================================

剛才說的到的asyncio.wait 和 asyncio.gather,有什麼區別呢?

前者的返回值是兩個集合結果,第1個集合是已經完成的task,第2個集合是未完成的task。後者直接返回的是多個task的計算結果。

例:

import asyncio
import time
async def fun():
    print('hello')
    await asyncio.sleep(2)
    print('word')
async def fun1():
    print('how are you?')
    await asyncio.sleep(2)
    print('fine')
async def main():
    start_time = time.time()
    task1 = asyncio.create_task(fun())
    task2 = asyncio.ensure_future(fun1())
    res1,res2 = await asyncio.wait([task1,task2])
    print(res1)
    print('======')
    print(res2)
    end_time = time.time()
    print('耗時',end_time-start_time)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

輸出:

hello
how are you?
word
fine
{<Task finished coro=<fun() done, defined at C:/Users/王棟軒/PycharmProjects/pythonProject1/main.py:98> result=None>, <Task finished coro=<fun1() done, defined at C:/Users/王棟軒/PycharmProjects/pythonProject1/main.py:102> result=None>}
======
set()
耗時 2.000847101211548

程序已結束,退出程式碼0

很容看到,返回值的第1個是2個協程物件,第2個是空。我們可以使用task.result來獲取兩個task物件(兩個task物件是由前邊的兩個協程函式封裝好的)返回值。

例:

import asyncio
import time
async def fun():
    print('hello')
    await asyncio.sleep(2)
    print('word')
    return 'fun1'
async def fun1():
    print('how are you?')
    await asyncio.sleep(2)
    print('fine')
    return 'fun2'
async def main():
    start_time = time.time()
    task1 = asyncio.create_task(fun())
    task2 = asyncio.ensure_future(fun1())
    res1,res2 = await asyncio.wait([task1,task2])
    for i in res1:
        print(i.result())
    end_time = time.time()
    print('耗時',end_time-start_time)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

輸出:

hello
how are you?
word
fine
<class '_asyncio.Task'> fun2
<class '_asyncio.Task'> fun1
耗時 2.0008704662323

程序已結束,退出程式碼0

例:

import asyncio
import time
async def fun():
    print('hello')
    await asyncio.sleep(2)
    print('word')
    return 'fun1'
async def fun1():
    print('how are you?')
    await asyncio.sleep(2)
    print('fine')
    return 'fun2'
async def main():
    start_time = time.time()
    task1 = asyncio.create_task(fun())
    task2 = asyncio.ensure_future(fun1())
    res1,res2 = await asyncio.gather(*[task1,task2])
    print(res1,res2)
    end_time = time.time()
    print('耗時',end_time-start_time)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

輸出:

hello
how are you?
word
fine
fun1 fun2
耗時 2.000060558319092

可以看到,程式碼中用res1和res2分別接收task1和task2的返回結果。

ok。到此,協程的又一步學習告一段落。

耗時1個 晚上+一個1下午。

ending……