Python協程
Python協程:從yield/send到async/await
Python由於眾所周知的GIL的原因,導致其線程無法發揮多核的並行計算能力(當然,後來有了multiprocessing,可以實現多進程並行),顯得比較雞肋。既然在GIL之下,同一時刻只能有一個線程在運行,那麽對於CPU密集的程序來說,線程之間的切換開銷就成了拖累,而以I/O為瓶頸的程序正是協程所擅長的:
多任務並發(非並行),每個任務在合適的時候掛起(發起I/O)和恢復(I/O結束)
Python中的協程經歷了很長的一段發展歷程。其大概經歷了如下三個階段:
- 最初的生成器變形yield/send
- 引入@asyncio.coroutine和yield from
- 在最近的Python3.5版本中引入async/await關鍵字
從yield說起
先看一段普通的計算斐波那契續列的代碼:
def old_fib(n): res = [0] * n index = 0 a = 0 b = 1 while index < n: res[index] = b a, b = b, a + b index += 1 return res print(‘-‘*10 + ‘test old fib‘ + ‘-‘*10) for fib_res in old_fib(20): print(fib_res)
如果我們僅僅是需要拿到斐波那契序列的第n位,或者僅僅是希望依此產生斐波那契序列,那麽上面這種傳統方式就會比較耗費內存。
這時,yield就派上用場了。
def fib(n): index = 0 a = 0 b = 1 while index < n: yield b a, b = b, a + b index += 1 print(‘-‘*10 + ‘test yield fib‘ + ‘-‘*10) for fib_res in fib(20): print(fib_res)
當一個函數中包含yield語句時,python會自動將其識別為一個生成器。這時fib(20)並不會真正調用函數體,而是以函數體生成了一個生成器對象實例。
yield在這裏可以保留fib函數的計算現場,暫停fib的計算並將b返回。而將fib放入for…in循環中時,每次循環都會調用next(fib(20)),喚醒生成器,執行到下一個yield語句處,直到拋出StopIteration異常。此異常會被for循環捕獲,導致跳出循環。
Send來了
從上面的程序中可以看到,目前只有數據從fib(20)中通過yield流向外面的for循環;如果可以向fib(20)發送數據,那不是就可以在Python中實現協程了嘛。
於是,Python中的生成器有了send函數,yield表達式也擁有了返回值。
我們用這個特性,模擬一個額慢速斐波那契數列的計算:
def stupid_fib(n): index = 0 a = 0 b = 1 while index < n: sleep_cnt = yield b print(‘let me think {0} secs‘.format(sleep_cnt)) time.sleep(sleep_cnt) a, b = b, a + b index += 1 print(‘-‘*10 + ‘test yield send‘ + ‘-‘*10) N = 20 sfib = stupid_fib(N) fib_res = next(sfib) while True: print(fib_res) try: fib_res = sfib.send(random.uniform(0, 0.5)) except StopIteration: break
其中next(sfib)相當於sfib.send(None),可以使得sfib運行至第一個yield處返回。後續的sfib.send(random.uniform(0, 0.5))則將一個隨機的秒數發送給sfib,作為當前中斷的yield表達式的返回值。這樣,我們可以從“主”程序中控制協程計算斐波那契數列時的思考時間,協程可以返回給“主”程序計算結果,Perfect!
yield from是個什麽鬼?
yield from用於重構生成器,簡單的,可以這麽使用:
def copy_fib(n): print(‘I am copy from fib‘) yield from fib(n) print(‘Copy end‘) print(‘-‘*10 + ‘test yield from‘ + ‘-‘*10) for fib_res in copy_fib(20): print(fib_res)
這種使用方式很簡單,但遠遠不是yield from的全部。yield from的作用還體現可以像一個管道一樣將send信息傳遞給內層協程,並且處理好了各種異常情況,因此,對於stupid_fib也可以這樣包裝和使用:
def copy_stupid_fib(n): print(‘I am copy from stupid fib‘) yield from stupid_fib(n) print(‘Copy end‘) print(‘-‘*10 + ‘test yield from and send‘ + ‘-‘*10) N = 20 csfib = copy_stupid_fib(N) fib_res = next(csfib) while True: print(fib_res) try: fib_res = csfib.send(random.uniform(0, 0.5)) except StopIteration: break
如果沒有yield from,這裏的copy_yield_from將會特別復雜(因為要自己處理各種異常)。
asyncio.coroutine和yield from
yield from在asyncio模塊中得以發揚光大。先看示例代碼:
@asyncio.coroutine def smart_fib(n): index = 0 a = 0 b = 1 while index < n: sleep_secs = random.uniform(0, 0.2) yield from asyncio.sleep(sleep_secs) print(‘Smart one think {} secs to get {}‘.format(sleep_secs, b)) a, b = b, a + b index += 1 @asyncio.coroutine def stupid_fib(n): index = 0 a = 0 b = 1 while index < n: sleep_secs = random.uniform(0, 0.4) yield from asyncio.sleep(sleep_secs) print(‘Stupid one think {} secs to get {}‘.format(sleep_secs, b)) a, b = b, a + b index += 1 if __name__ == ‘__main__‘: loop = asyncio.get_event_loop() tasks = [ asyncio.async(smart_fib(10)), asyncio.async(stupid_fib(10)), ] loop.run_until_complete(asyncio.wait(tasks)) print(‘All fib finished.‘) loop.close()
asyncio是一個基於事件循環的實現異步I/O的模塊。通過yield from,我們可以將協程asyncio.sleep的控制權交給事件循環,然後掛起當前協程;之後,由事件循環決定何時喚醒asyncio.sleep,接著向後執行代碼。
這樣說可能比較抽象,好在asyncio是一個由python實現的模塊,那麽我們來看看asyncio.sleep中都做了些什麽:
@coroutine def sleep(delay, result=None, *, loop=None): """Coroutine that completes after a given time (in seconds).""" future = futures.Future(loop=loop) h = future._loop.call_later(delay, future._set_result_unless_cancelled, result) try: return (yield from future) finally: h.cancel()
首先,sleep創建了一個Future對象,作為更內層的協程對象,通過yield from交給了事件循環;其次,它通過調用事件循環的call_later函數,註冊了一個回調函數。
通過查看Future類的源碼,可以看到,Future是一個實現了__iter__對象的生成器:
class Future: #blabla... def __iter__(self): if not self.done(): self._blocking = True yield self # This tells Task to wait for completion. assert self.done(), "yield from wasn‘t used with future" return self.result() # May raise too.
那麽當我們的協程yield from asyncio.sleep時,事件循環其實是與Future對象建立了練習。每次事件循環調用send(None)時,其實都會傳遞到Future對象的__iter__函數調用;而當Future尚未執行完畢的時候,就會yield self,也就意味著暫時掛起,等待下一次send(None)的喚醒。
當我們包裝一個Future對象產生一個Task對象時,在Task對象初始化中,就會調用Future的send(None),並且為Future設置好回調函數。
class Task(futures.Future): #blabla... def _step(self, value=None, exc=None): #blabla... try: if exc is not None: result = coro.throw(exc) elif value is not None: result = coro.send(value) else: result = next(coro) #exception handle else: if isinstance(result, futures.Future): # Yielded Future must come from Future.__iter__(). if result._blocking: result._blocking = False result.add_done_callback(self._wakeup) #blabla... def _wakeup(self, future): try: value = future.result() except Exception as exc: # This may also be a cancellation. self._step(None, exc) else: self._step(value, None) self = None # Needed to break cycles when an exception occurs
預設的時間過後,事件循環將調用Future._set_result_unless_cancelled:
class Future: #blabla... def _set_result_unless_cancelled(self, result): """Helper setting the result only if the future was not cancelled.""" if self.cancelled(): return self.set_result(result) def set_result(self, result): """Mark the future done and set its result. If the future is already done when this method is called, raises InvalidStateError. """ if self._state != _PENDING: raise InvalidStateError(‘{}: {!r}‘.format(self._state, self)) self._result = result self._state = _FINISHED self._schedule_callbacks()
這將改變Future的狀態,同時回調之前設定好的Tasks._wakeup;在_wakeup中,將會再次調用Tasks._step,這時,Future的狀態已經標記為完成,因此,將不再yield self,而return語句將會觸發一個StopIteration異常,此異常將會被Task._step捕獲用於設置Task的結果。同時,整個yield from鏈條也將被喚醒,協程將繼續往下執行。
async和await
弄清楚了asyncio.coroutine和yield from之後,在Python3.5中引入的async和await就不難理解了:可以將他們理解成asyncio.coroutine/yield from的完美替身。當然,從Python設計的角度來說,async/await讓協程表面上獨立於生成器而存在,將細節都隱藏於asyncio模塊之下,語法更清晰明了。
async def smart_fib(n): index = 0 a = 0 b = 1 while index < n: sleep_secs = random.uniform(0, 0.2) await asyncio.sleep(sleep_secs) print(‘Smart one think {} secs to get {}‘.format(sleep_secs, b)) a, b = b, a + b index += 1 async def stupid_fib(n): index = 0 a = 0 b = 1 while index < n: sleep_secs = random.uniform(0, 0.4) await asyncio.sleep(sleep_secs) print(‘Stupid one think {} secs to get {}‘.format(sleep_secs, b)) a, b = b, a + b index += 1 if __name__ == ‘__main__‘: loop = asyncio.get_event_loop() tasks = [ asyncio.ensure_future(smart_fib(10)), asyncio.ensure_future(stupid_fib(10)), ] loop.run_until_complete(asyncio.wait(tasks)) print(‘All fib finished.‘) loop.close()
總結
至此,Python中的協程就介紹完畢了。示例程序中都是以sleep為異步I/O的代表,在實際項目中,可以使用協程異步的讀寫網絡、讀寫文件、渲染界面等,而在等待協程完成的同時,CPU還可以進行其他的計算。協程的作用正在於此。
Python協程