以Python為例的Async / Await的程式設計基礎
來源:Redislabs
作者:Loris Cro
翻譯:Kevin (公眾號:中介軟體小哥)
近年來,許多程式語言都在努力改進它們的併發原語。Go 語言有 goroutines,Ruby 有 fibers,當然,還有 Node.js 幫助普及的 async/await,這是當今使用最為廣泛的併發操作型別。在本文中,我將以 python 為例討論 async/await 的基礎知識。我選擇python語言,是因為這個功能在python 3中比較新,很多使用者可能對它還不是很熟悉。使用 async/await 的主要原因是通過減少 I/O 執行時的空閒時間來提高程式的吞吐量。使用這個操作符的程式通過隱式地使用一個稱為事件迴圈的抽象來同時處理多個執行路徑。在某些方面,這些事件迴圈類似於多執行緒程式設計,但是事件迴圈通常存在於單個執行緒中,因此,它不能同時執行多個計算。正因為如此,單獨的事件迴圈不能提高計算密集型應用程式的效能。但是,對於進行大量網路通訊的程式,比如連線到Redis資料庫的應用程式,它可以極大地提高效能。每次程式向 Redis 傳送一個命令時,它都會等待 Redis 的響應,如果 Redis 部署在另一臺機器上,就會出現網路延遲。而一個不使用事件迴圈的單執行緒應用程式在等待響應時處於空閒狀態,會佔用大量的CPU週期。需要注意的是,網路延遲是以毫秒為單位的,而 CPU 指令需要納秒來執行,這兩者相差六個數量級。這裡舉個例子,下面的程式碼樣例是用來跟蹤一個遊戲的獲勝排行榜。每個流條目都包含獲勝者的名字,我們的程式會更新一個 Redis 的有序集合(Sorted Set),這個有序集合用來作為排行榜。這裡我們主要關注的是阻塞程式碼和非阻塞程式碼的效能。
1 import redis 2 3 # The operation to perform for each event 4 def add_new_win(conn, winner): 5 conn.zincrby('wins_counter', 1, winner) 6 conn.incr('total_games_played') 7 8 def main(): 9 # Connect to Redis 10 conn = redis.Redis() 11 # Tail the event stream 12 last_id = '$' 13 while True: 14 events = conn.xread({'wins_stream': last_id}, block=0, count=10) 15 # Process each event by calling `add_new_win` 16 for _, e in events: 17 winner = e['winner'] 18 add_new_win(conn, winner) 19 last_id = e['id'] 20 21 if __name__ == '__main__': 22 main()
我們使用aio-libs/aioredis實現與上面程式碼有相同效果的非同步版本。aio-libs 社群正在重寫許多 Python 網路庫,以包括對 asyncio 的支援,asyncio 是 Python 事件迴圈的標準庫實現。下面是上面程式碼的非阻塞版本:
1 import asyncio 2 import aioredis 3 4 async def add_new_win(pool, winner): 5 await pool.zincrby('wins_counter', 1, winner) 6 await pool.incr('total_games_played') 7 8 async def main(): 9 # Connect to Redis 10 pool = await aioredis.create_redis_pool('redis://localhost', encoding='utf8') 11 # Tail the event stream 12 last_id = '$' 13 while True: 14 events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10) 15 # Process each event by calling `add_new_win` 16 for _, e_id, e in events: 17 winner = e['winner'] 18 await add_new_win(pool, winner) 19 last_id = e_id 20 21 if __name__ == '__main__': 22 loop = asyncio.get_event_loop() 23 loop.run_until_complete(main())
這段程式碼與上面那段程式碼相比,除了多了一些 await 關鍵字之外,其他的幾乎是相同的。最大的不同之處在最後兩行。在 Node.js 中,環境會預設載入事件迴圈,而在 Python 中,必須顯示地開啟。
重寫之後,我們可能會認為這麼做就可以提高效能了。不幸的是,我們程式碼的非阻塞版本還沒有提高效能。這裡的問題在於我們編寫程式碼的細節,而不僅僅是使用 async / await 的一般思想。
Await 使用的限制
我們重寫程式碼後的主要問題是我們過度使用了 await。當我們在非同步呼叫前面加上 await 時,我們做了以下兩件事:
1. 為執行做相應的排程
2. 等待完成
有時候,這樣做是對的。例如,在完成對第 15 行流的讀取之前,我們不能對每個事件進行迭代。在這種情況下,await 關鍵字是有意義的,但是看看 add_new_win 方法:
1 async def add_new_win(pool, winner): 2 await pool.zincrby('wins_counter', 1, winner) 3 await pool.incr('total_games_played')
在這個函式中,第二個操作並不依賴於第一個操作。我們可以將第二個命令與第一個命令一起傳送,但是當我們傳送第一個命令時,await 將阻塞執行流。我們其實更想要一種能立即執行這兩個操作的方法。為此,我們需要一個不同的同步原語。
1 async def add_new_win(pool, winner): 2 task1 = pool.zincrby('wins_counter', 1, winner) 3 task2 = pool.incr('total_games_played') 4 await asyncio.gather(task1, task2)
首先,呼叫一個非同步函式不會執行其中的任何程式碼,而是會先例項化一個“任務”。根據選擇的語言,這可能被稱為 coroutine, promise 或 future 等等。對我們來說,任務是一個物件,它表示一個值,該值只有在使用了 await 或其他同步原語(如 asyncio.gather)之後才可用。 在 Python 的官方文件中,你可以找到更多關於 asyncio.gather 的資訊。簡而言之,它允許我們在同一時間執行多個任務。我們需要等待它的結果,因為一旦所有的輸入任務完成,它就會建立一個新的任務。Python 的 asyncio.gather 相當於 JavaScript 的 Promise.all,C# 的 Task.WhenAll, Kotlin 的 awaitAll 等等。
改進我們的主迴圈程式碼
我們對 add_new_win 所做的事情也可以用於主流事件處理迴圈。這是我所指的程式碼:
1 last_id = '$' 2 while True: 3 events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10) 4 for _, e_id, e in events: 5 winner = e['winner'] 6 await add_new_win(pool, winner) 7 last_id = e_id
到目前為止,你會注意到我們是順序地處理每個事件。因為在第 6 行中,使用 await 既可以執行又可以等待 add_new_win 的完成。有時這正是你希望發生的情況,因為如果你不按順序執行,程式邏輯就會中斷。在我們的例子中,我們並不真正關心排序,因為我們只是更新計數器。
1 last_id = '$' 2 while True: 3 events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10) 4 tasks = [] 5 for _, e_id, e in events: 6 winner = e['winner'] 7 tasks.append(add_new_win(pool, winner)) 8 last_id = e_id 9 await asyncio.gather(*tasks)
我們現在也在併發地處理每一批事件,並且對程式碼的改動是最小的。最後要記住,有時即使不使用 asyncio.gather,程式也可以是高效能的。特別是,當你為 web 伺服器編寫程式碼並使用像 Sanic 這樣的非同步框架時,該框架將以併發的方式呼叫你的請求處理程式,即使你在等待每個非同步函式呼叫,也能確保巨大的吞吐量。
總結
下面是我們進行上面兩個更改之後的完整程式碼示例:
1 import asyncio 2 import aioredis 3 4 async def add_new_win(pool, winner): 5 # Creating tasks doesn't schedule them 6 # so you can create multiple and then 7 # schedule them all in one go using `gather` 8 task1 = pool.zincrby('wins_counter', 1, winner) 9 task2 = pool.incr('total_games_played') 10 await asyncio.gather(task1, task2) 11 12 async def main(): 13 # Connect to Redis 14 pool = await aioredis.create_redis_pool('redis://localhost', encoding='utf8') 15 # Tail the event stream 16 last_id = '$' 17 while True: 18 events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10) 19 tasks = [] 20 for _, e_id, e in events: 21 winner = e['winner'] 22 # Again we don't actually schedule any task, 23 # and instead just prepare them 24 tasks.append(add_new_win(pool, winner)) 25 last_id = e_id 26 # Notice the spread operator (`*tasks`), it 27 # allows using a single list as multiple arguments 28 # to a function call. 29 await asyncio.gather(*tasks) 30 31 if __name__ == '__main__': 32 loop = asyncio.get_event_loop() 33 loop.run_until_complete(main())
為了利用非阻塞 I/O,你需要重新考慮如何處理網路操作。值得高興的是這並不是很困難,你只需要知道順序性什麼時候重要,什麼時候不重要。嘗試使用 aioredis 或等效的非同步 redis 客戶端,看看可以在多大程度上提高應用程式的吞吐量。
更多優質中介軟體技術資訊/原創/翻譯文章/資料/乾貨,請關注“中介軟體小哥”公眾號!<