深入tornado中的協程
tornado使用了單進程(當然也可以多進程) + 協程 + I/O多路復用的機制,解決了C10K中因為過多的線程(進程)的上下文切換 而導致的cpu資源的浪費。
tornado中的I/O多路復用前面已經講過了。本文不做詳細解釋。
來看一下tornado中的協程模塊:tornado.gen:
tornado.gen是根據生成器(generator)實現的,用來更加簡單的實現異步。
先來說一下tornado.gen.coroutine的實現思路:
我們知道generator中的yield語句可以使函數暫停執行,而send()方法則可以恢復函數的執行。
tornado將那些異步操作放置到yield語句後,當這些異步操作完成後,tornado會將結果send()至generator中恢復函數執行。
在tornado的官方文檔中有這麽一句話:
Most asynchronous functions in Tornado return a Future; yielding this object returns its result.
就是說:在tornado中大多數的異步操作返回一個Future對象,yield Future對象 會返回該異步操作的結果。
那麽,Future對象到底是什麽?
一 Future對象
先來說說Future對象:
Future對象可以概括為: 一個異步操作的占位符,當然這個占位符有些特殊,它特殊在:
1 這個占位符是一個對象
2 這個對象包含了很多屬性,包括_result 以及 _callbacks,分別用來存儲異步操作的結果以及回調函數
3 這個對象包含了很多方法,比如添加回調函數,設置異步操作結果等。
4 當這個對象對應的異步操作完成後,該對象會被set_done,然後遍歷並運行_callbacks中的回調函數
來看一下Future的簡化版:
class Future(object): ‘‘‘ Future對象主要保存一個回調函數列表_callbacks與一個執行結果_result,當我們set_result時,就會執行_callbacks中的函數 如果set_result或者set_done,就會遍歷_callbacks列表並執行callback(self)函數 ‘‘‘ def __init__(self): self._result = None # 執行的結果 self._callbacks = [] # 用來保存該future對象的回調函數 def result(self, timeout=None): # 如果操作成功,返回結果。如果失敗則拋出異常 self._clear_tb_log() if self._result is not None: return self._result if self._exc_info is not None: raise_exc_info(self._exc_info) self._check_done() return self._result def add_done_callback(self, fn): if self._done: fn(self) else: self._callbacks.append(fn) def set_result(self, result): self._result = result self._set_done() def _set_done(self): # 執行結束(成功)後的操作。 self._done = True for cb in self._callbacks: try: cb(self) except Exception: app_log.exception(‘Exception in callback %r for %r‘, cb, self) self._callbacks = None
完整源碼:
Future源碼
二 gen.coroutine裝飾器
tornado中的協程是通過tornado.gen中的coroutine裝飾器實現的:
def coroutine(func, replace_callback=True): return _make_coroutine_wrapper(func, replace_callback=True)
_make_coroutine_wrapper :
def _make_coroutine_wrapper(func, replace_callback): @functools.wraps(func) def wrapper(*args, **kwargs): ‘‘‘ 大體過程: future = TracebackFuture() result = func(*args, **kwargs) if isinstance(result, GeneratorType): yielded = next(result) Runner(result, future, yielded) return future ‘‘‘ future = TracebackFuture() # TracebackFuture = Future if replace_callback and ‘callback‘ in kwargs: callback = kwargs.pop(‘callback‘) IOLoop.current().add_future(future, lambda future: callback(future.result())) try: result = func(*args, **kwargs) # 執行func,若func中包含yield,則返回一個generator對象 except (Return, StopIteration) as e: result = _value_from_stopiteration(e) except Exception: future.set_exc_info(sys.exc_info()) return future else: if isinstance(result, GeneratorType): # 判斷其是否為generator對象 try: orig_stack_contexts = stack_context._state.contexts yielded = next(result) # 第一次執行 if stack_context._state.contexts is not orig_stack_contexts: yielded = TracebackFuture() yielded.set_exception( stack_context.StackContextInconsistentError( ‘stack_context inconsistency (probably caused ‘ ‘by yield within a "with StackContext" block)‘)) except (StopIteration, Return) as e: future.set_result(_value_from_stopiteration(e)) except Exception: future.set_exc_info(sys.exc_info()) else: Runner(result, future, yielded) # Runner(result, future, yield) try: return future finally: future = None future.set_result(result) return future return wrapper
先來看一下大體過程:
1 首先生成一個Future對象
2 運行該被裝飾函數並將結果賦值給result。 在這裏因為tornado的‘異步‘實現是基於generator的,所以一般情況下 result是一個generator對象
3 yielded = next(result) 執行到被裝飾函數的第一次yield,將結果賦值給yielded。一般情況下,yielded很大情況下是一個Future對象。
4 Runner(result, future, yielded)
5 return future
除了第4步以外其他都很好理解,所以來了解一下第四步Runner()幹了些啥:
三 Runner()類
1 為什麽要有Runner()?或者說Runner()的作用是什麽?
Runner()可以自動的將異步操作的結果send()至生成器中止的地方
tornado的協程或者說異步是基於generator實現的,generator較為常用的有兩個方法:send() next() ,關於這兩個方法的流程分析在這。
很多情況下會有generator的嵌套。比如說經常會yield 一個generator。當A生成器yield B生成器時,分兩步:
1 我們首先中止A的執行轉而執行B
2 當B執行完成後,我們需要將B的結果send()至A中止的地方,繼續執行A
Runner()主要就是來做這些的,也就是控制生成器的執行與中止,並在合適的情況下使用send()方法同時傳入B生成器的結果喚醒A生成器。
來看一個簡單例子:
View Code上例中的Runner()僅僅完成了第一步,我們還需要手動的執行第二步,而tornado的gen的Runner()則做了全套奧!
2 剖析Runner()
在Runner()中主要有三個方法__init__ handle_yield run:
Runner()2.1 __init__方法
__init__ 裏面執行了一些初始化的操作,最主要是最後兩句:
if self.handle_yield(first_yielded): # 運行 self.run()
2.2 handle_yield方法
handle_yield(self, yielded) 函數,這個函數顧名思義,就是用來處理yield返回的對象的。
首先我們假設yielded是一個Future對象(因為這是最常用的情況),這樣的話代碼就縮減了很多
def handle_yield(self, yielded): self.future = convert_yielded(yielded) # 如果yielded是Future對象則原樣返回 if not self.future.done() or self.future is moment: # moment是tornado初始化時就建立的一個Future對象,且被set_result(None) self.io_loop.add_future(self.future, lambda f: self.run()) # 為該future添加callback return False return True
也就是幹了三步:
首先解析出self.future
然後判斷self.future對象是否已經被done(完成),如果沒有的話為其添加回調函數,這個回調函數會執行self.run()
返回self.future對象是否被done
總體來說,handle_yield返回yielded對象是否被set_done,如果沒有則為yielded對象添加回調函數,這個回調函數執行self.run()
還有一個有趣的地方,就是上面代碼的第四行: self.io_loop.add_future(self.future, lambda f: self.run())
def add_future(self, future, callback): # 為future添加一個回調函數,這個回調函數的作用是:將參數callback添加至self._callbacks中 # 大家思考一個問題: 如果某個Future對象被set_done,那麽他的回調函數應該在什麽時候執行? # 是立即執行亦或者是將回調函數添加到IOLoop實例的_callbacks中進行統一執行? # 雖然前者更簡單,但導致回調函數的執行過於混亂,我們應該讓所有滿足執行條件的回調函數統一執行。顯然後者更合理 # 而add_future()的作用就是這樣 future.add_done_callback(lambda future: self.add_callback(callback, future)) def add_callback(self, callback, *args, **kwargs): # 將callback添加至_callbacks列表中 self._callbacks.append(functools.partial(callback, *args, **kwargs))
2.3 run方法
再來看self.run()方法。這個方法實際上就是一個循環,不停的執行generator的send()方法,發送的值就是yielded的result。
我們可以將run()方法簡化一下:
def run(self): """Starts or resumes the generator, running until it reaches a yield point that is not ready. 循環向generator中傳遞值,直到某個yield返回的yielded還沒有被done """ try: self.running = True while True: future = self.future if not future.done(): return self.future = None # 清空self.future value = future.result() # 獲取future對象的結果 try: yielded = self.gen.send(value) # send該結果,並將self.gen返回的值賦值給yielded(一般情況下這也是個future對象) except (StopIteration, Return) as e: self.finished = True self.future = _null_future self.result_future.set_result(_value_from_stopiteration(e)) self.result_future = None self._deactivate_stack_context() return if not self.handle_yield(yielded): # 運行self.handler_yield(yielded),如果yielded對象沒有被done,則直接返回;否則繼續循環 return finally: self.running = False
總結:
1 每一個Future對應一個異步操作
2 該Future對象可以添加回調函數,當該異步操作完成後,需要對該Future對象設置set_done或者set_result,然後執行其所有的回調函數
3 凡是使用了coroutine裝飾器的generator函數都會返回一個Future對象,同時會不斷為該generator,該generator每一次運行send()或者next()的返回結果yielded以及future對象運行Runner()
4 Runner()會對generator不斷進行send()或者next()操作。具體步驟是:上一個next()或者send()操作返回的yielded(一般是一個Future對象)被set_done後,將該yielded對象的結果send()至generator中,不斷循環該操作,直到產生StopIteration或者Return異常(這表示該generator執行結束),這時會為該generator對應的Future對象set_result。
我們可以看到tornado的協程是基於generator的,generator可以通過yield關鍵字暫停執行,也可以通過next()或者send()恢復執行,同時send()可以向generator中傳遞值。
而將協程連接起來的紐帶則是Future對象,每一個Future對象都對應著一個異步操作,我們可以為該對象添加許多回調函數,當異步操作完成後通過對Future對象進行set_done或者set_result就可以執行相關的回調函數。
提供動力的則是Runner(),他不停的將generator所yield的每一個future對象的結果send()至generator,當generator運行結束,他會進行最後的包裝工作,對該generator所對應的Future對象執行set_result操作。
參考:
http://blog.csdn.net/wyx819/article/details/45420017
http://www.cnblogs.com/apexchu/p/4226784.html
深入tornado中的協程