1. 程式人生 > 其它 >tornado中的協程是如何工作的

tornado中的協程是如何工作的

協程定義

    Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.。 ―― [ 維基百科 ]

我們在平常程式設計中,更習慣使用的是子例程(subroutine),通俗的叫法是函式,或者過程。子例程,往往只有一個入口(函式呼叫,實參通過傳給形參開始執行),一個出口(函式return,執行完畢,或者引發異常,將控制權轉移給呼叫者)。但協程是子例程基礎上,一種更加寬泛定義的計算機程式模組(子例程可以看做協程的特例),它可以有多個入口點,允許從一個入口點,執行到下一個入口點之前暫停,儲存執行狀態,等到合適的時機恢復執行狀態,從下一個入口點重新開始執行,這也是協程應該具有的能力。

定義

**協程程式碼塊
**

一個入口點和下一個入口點(或者退出點)中的程式碼。

**協程模組
**

由n個入口點程式碼,和n個協程程式碼塊組成。第一個入口點通常是一個函
數入口點。其組織形式如:函式入口點->協程程式碼塊->入口點->協程程式碼塊…,入口點和程式碼塊相間。

**線性模組
**

一個同步函式的函式體是線性執行的。也就是說一個模組中的每一行程式碼,相繼執行,一個模組在執行中,如果還沒有執行完畢,不會去執行其他模組的程式碼。稱這樣的程式碼模組為線性模組。

一個協程模組,如果只含有單一入口點和單一協程程式碼塊(假設這個協程程式碼塊全是同步程式碼),當然這個協程模組是一個線性執行模組,但是如果含有多個入口點和多個協程程式碼塊,那麼就不是一個線性模組。那麼執行一個協程模組過程實際是分散的(不同的時間段,執行不同的協程程式碼塊,協程程式碼塊的執行時間段,彼此不相交),但也是順序的(後一個協程程式碼塊在前一個協程程式碼塊執行結束後才執行)。兩個屬於同一協程模組的相繼協程程式碼塊執行的中間時間間隙,可能有很多其他協程模組的協程程式碼片段在執行。

生成器和yield語義

談到協程,必須要說說python語義中的生成器(generator)。

在pep255中提到了”simple generator”和”yield語句”(此時還不是”yield表示式”)的實現。一個basic
idea,提供一種函式,能夠返回中間結果給呼叫者,然後維護函式的區域性狀態,以便函式當離開後,也能恢復執行。

prep255中舉了一個簡單的例子,生成斐波那契數列:

     def fib():
       a, b = 0, 1
       while 1:
        yield b
        a, b = b, a+b

a,b初始化為0,1。當yield
b被執行,1被返回給呼叫者。當fib恢復執行,a,變成了1,b也是1,然後將1返回給呼叫者,如此迴圈。generator是一種非常自然的程式設計方式,因為對於fib來說,它的功能不變,都還是不斷生成下一個斐波那契數。而對於fib的呼叫者來說,fib像一個列表的迭代器,不斷迭代,可以獲取下一個斐波那契數。

    def caller():
      for num in fib():
        print num

生成器是一個含有yield表示式的函式,此時該函式叫生成器。一個生成器永遠是非同步的,即使生成器模組中含有阻塞程式碼。因為呼叫一個生成器,生成器的引數會繫結到生成器,結果返回是一個生成器物件,它的型別是types.GeneratorType,不會去執行生成器主模組中的程式碼。

每次呼叫一個GeneratorType物件的next方法,生成器函式執行到下一個yield語句或者,或者碰到一個return語句,或者執行到生成器函式結束。

在pep342中,對Generator進一步加強,增加了GeneratorType的send方法,和yield表示式語義。yield表示式,可以作為等號右邊的表示式。如果對Generator呼叫send(None)方法,生成器函式會從開始一直執行到yield表示式。那麼下一次對Generator呼叫send(argument),Generator恢復執行。那麼可以在生成器函式體內獲得這個argument,這個argument將會作為yield表示式的返回值。

從上面可以看到,Generator已經具備協程的一些能力。如:能夠暫停執行,儲存狀態;能夠恢復執行;能夠非同步執行。

但是此時Generator還不是一個協程。一個真正的協程能夠控制程式碼什麼時候繼續執行。而一個Generator執行遇到一個yield表示式
或者語句,會將執行控制權轉移給呼叫者。

    However, it is still possible to implement coroutines on top of a generator facility, with the aid of a top-level dispatcher routine (a trampoline, essentially) that passes control explicitly to child generators identified by tokens passed back from the generators。 ―― [ 維基百科 ]

在維基百科中提到,可以實現一個頂級的排程子例程,將執行控制權轉移回Generator,從而讓它繼續執行。在tornado中,ioLoop就是這樣的頂級排程子例程,每個協程模組通過,函式裝飾器coroutine和ioLoop進行通訊,從而ioLoop可以在協程模組執行暫停後,在合適的時機重新排程協程模組執行。

不過,接下來還不能介紹coroutine和ioLoop,在介紹這兩者之前,先得明白tornado中在協程環境中一個非常重要的類Future.

Future類

Future類位於tornado原始碼的concurrent模組中。Future類的完整程式碼,請檢視tornado的原始碼。在這裡擷取一部分程式碼作為分析之用

    class Future(object):
      def done(self):
        return self._done
    
      def result(self, timeout=None):
        self._clear_tb_log()
        if self._result is not None:
          return self._result
        if self._exc_info is not None:
          raise_exc_info(self._exc_info)
        self._check_done()
        return self._result
    
      def add_done_callback(self, fn):
        if self._done:
          fn(self)
        else:
          self._callbacks.append(fn)
    
      def set_result(self, result):
        self._result = result
        self._set_done()
    
      def _set_done(self):
        self._done = True
        for cb in self._callbacks:
          try:
            cb(self)
          except Exception:
            app_log.exception('exception calling callback %r for %r',
                     cb, self)
        self._callbacks = None

Future類重要成員函式:

def done(self):

Future的_result成員是否被設定

def result(self, timeout=None):

獲取Future物件的結果

def add_done_callback(self, fn):

新增一個回撥函式fn給Future物件。如果這個Future物件已經done,則直接執行fn,否則將fn加入到Future類的一個成員列表中儲存。

def _set_done(self):

一個內部函式,主要是遍歷列表,逐個呼叫列表中的callback函式,也就是前面add_done_calback加如來的。

def set_result(self, result):

給Future物件設定result,並且呼叫_set_done。也就是說,當Future物件獲得result後,所有add_done_callback加入的回撥函式就會執行。

Future封裝了非同步操作的結果。實際是它類似於在網頁html前端中,圖片非同步載入的佔位符,但載入後最終也是一個完整的圖片。Future也是同樣用處,tornado使用它,最終希望它被set_result,並且呼叫一些回撥函式。Future物件實際是coroutine函式裝飾器和IOLoop的溝通使者,有著非常重要的作用。

IOLoop類

tornado框架的底層核心類,位於tornado的ioloop模組。功能方面類似win32視窗的訊息迴圈。每個視窗可以繫結一個視窗過程。視窗過程主要是一個訊息迴圈在執行。訊息迴圈主要任務是利用PeekMessage系統呼叫,從訊息佇列中取出各種型別的訊息,判斷訊息的型別,然後交給特定的訊息handler進行執行。

tornado中的IOLoop與此相比具有很大的相似性,在協程執行環境中擔任著協程排程器的角色,
和win32的訊息迴圈本質上都是一種事件迴圈,等待事件,然後執行對應的事件處理器(handler)。不過IOLoop主要排程處理的是IO事件(如讀,寫,錯誤)。除此之外,還能排程callback和timeout事件。

在本博文中,我們暫時只關注callback事件,因為這個與協程排程的相關性最大。

    def add_future(self, future, callback):
      assert is_future(future)
      callback = stack_context.wrap(callback)
      future.add_done_callback(
        lambda future: self.add_callback(callback, future))

add_future函式在基類IOLoop中實現,函式引數是一個Future物件和一個callback函式。當Future物件被set_result,執行一個回撥函式,是個lambda函式,在lambda函式中呼叫IOLoop的add_callback函式。將add_future的引數callback加入到IOLoop的統一排程中,讓callback在IOLoop下一次迭代中執行。

    def add_callback(self, callback, *args, **kwargs):
      with self._callback_lock:
        if self._closing:
          raise RuntimeError("IOLoop is closing")
        list_empty = not self._callbacks
        self._callbacks.append(functools.partial(
          stack_context.wrap(callback), *args, **kwargs))
        if list_empty and thread.get_ident() != self._thread_ident:
          self._waker.wake()

add_callback函式主要在IOLoop的子類PollIOLoop中實現。也很容易理解。

將傳入的callback函式,利用偏函式進行包裝,將所有callback真正執行時需要的引數,都繫結到生成的偏函式中,實際上就是找個地方把callback執行時需要的引數儲存起來。將包裝好的偏函式加入到回撥函式列表。當IOLoop下一次迭代執行的時候,遍歷callback函式列表,執行偏函式的時候,就不再需要傳入引數執行,效果等同於用實參執行callback。

IOLoop物件呼叫start函式,會執行event loop。在event
loop中,首先遍歷callback列表,執行回撥函式,然後遍歷timeout列表,執行timeoutCallback。最後才執行ioHandler。

coroutine函式裝飾器

函式裝飾器本質是一個函式,我們稱這個函式為裝飾器函式。裝飾器函式簽名含有一個
函式物件(可呼叫物件callable)引數,返回的結果是一個裝飾器內部定義的一個新函式物件。如果返回的函式物件被呼叫,裝飾器函式的引數(函式物件)也會被呼叫。不過,會在這個引數(裝飾器函式引數)呼叫前做一些事情,或者在這個引數呼叫後做一些事情。實際上做的這些事情,就是利用內部自定義的函式物件對引數(原函式)的一些裝飾(額外操作)

當一個函式被裝飾器裝飾。那麼以後呼叫這個函式(此函式已經非彼函式)的時候,實際上呼叫的是裝飾器函式返回的內部函式物件。理解tornado中coroutine修飾的函式如何執行,主要是
理解coroutine這個裝飾器函式內部定義的新函式物件所做的那些事兒。

    def coroutine(func, replace_callback=True):
      return _make_coroutine_wrapper(func, replace_callback=True)
    class Runner(object):
      def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.yield_point = None
        self.pending_callbacks = None
        self.results = None
        self.running = False
        self.finished = False
        self.had_exception = False
        self.io_loop = IOLoop.current()
        self.stack_context_deactivate = None
        if self.handle_yield(first_yielded):
          self.run()
    
      def run(self):
        if self.running or self.finished:
          return
        try:
          self.running = True
          while True:
            future = self.future
            if not future.done():
              return
            self.future = None
            try:
              try:
                value = future.result()
              except Exception:
                self.had_exception = True
                yielded = self.gen.throw(*sys.exc_info())
              else:
                yielded = self.gen.send(value)
            except (StopIteration, Return) as e:
              self.finished = True
              self.future = _null_future
              self.result_future.set_result(getattr(e, 'value', None))
              self.result_future = None
              return
            except Exception:
              self.finished = True
              self.future = _null_future
              self.result_future.set_exc_info(sys.exc_info())
              self.result_future = None
              return
            if not self.handle_yield(yielded):
              return
        finally:
          self.running = False
    
      def handle_yield(self, yielded):
    
        try:
          self.future = convert_yielded(yielded)
        except BadYieldError:
          self.future = TracebackFuture()
          self.future.set_exc_info(sys.exc_info())
    
        if not self.future.done() or self.future is moment:
          self.io_loop.add_future(
            self.future, lambda f: self.run())
          return False
        return True

以上的程式碼其實都對原始碼進行了一些調整。但函式呼叫進入到Runner的建構函式的時候,也就是說Generator的第一次執行已經完畢。那麼接下來,呼叫的是,handle_yield,對第一次Generator執行的返回結果進行處理。當然返回的結果可能是多種型別。可能是一個Future物件,list,dict,或者其他型別物件,或者普通型別。通過convert_yield,self.future儲存的是一個Future物件的引用(第一次Generator執行返回的結果)。此時如果self.future還沒被set_result。對為self.future繫結一個done_callback(lambda
f: self.run()),加入到self.io_loop中。

在前文說到。ioloop的add_future函式中,實際上是隻有當引數future,在某個地方呼叫了set_result,
才在執行done_callback時,將引數callback加入到IOLoop中排程。換句話說。Runner類中,self.run要等到self.future在某個程式碼塊被set_result,IOLoop才有可能在下一次迭代的時候執行它,從而排程協程繼續恢復執行。而在self.run函式中,我們可以看到將會通過Generator的send函式,恢復執行下一個協程程式碼塊。所以關鍵的問題是我們需要明白Runner類中self.future,在什麼時候被set_result?

從這裡我們可以看到Future類的重要作用。future.set_result起到的作用是:

傳送一個訊號,告訴IOLoop去排程暫停的協程繼續執行。

我們結合下面的程式碼例子就可以明白協程排程的整個流程是如何進行的了。

    import tornado.ioloop
    from tornado.gen import coroutine
    from tornado.concurrent import Future
    
    @coroutine
    def asyn_sum(a, b):
      print("begin calculate:sum %d+%d"%(a,b))
      future = Future()
    
      def callback(a, b):
        print("calculating the sum of %d+%d:"%(a,b))
        future.set_result(a+b)
      tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)
    
      result = yield future
    
      print("after yielded")
      print("the %d+%d=%d"%(a, b, result))
    
    def main():
      asyn_sum(2,3)
      tornado.ioloop.IOLoop.instance().start()
    
    if __name__ == "__main__":
      main()

實際的執行場景是:一個協程(asyn_sum)遇到yield表示式被暫停執行後,IOLoop呼叫另外一個程式碼段(asyn_sum中的回撥函式callback)執行,而在callback中,剛好可以訪問到屬於被暫停協程(asyn_sum)中的future物件(也就是Runner物件中的self.future的引用),callback中將future呼叫set_result,那麼這個暫停的協程(asyn_sum)在IOLoop下一次迭代排程回撥函式時中,被恢復執行。

總結

tornado中的協程實現基於python語言的Generator並且結合一個全域性的排程器IOLoop,Generator通過函式裝飾器coroutine和IOLoop進行通訊。IOLoop並沒有直接控制能力,排程恢復被暫停的協程繼續執行。future物件在協程中被yield。協程暫停,IOLoop排程另外一個程式碼模組執行,而在這個執行的程式碼模組中剛好,可以訪問這個future物件,將其set_result,結果通過IOLoop間接恢復暫停協程執行。不同執行程式碼模組中,共享future物件,彼此合作,協程排程得順利執行。

從這種意義上來說,future物件,像window中的Event核心物件的作用。window中的event用於執行緒中同步。而協程中的yield
future相當於WaitForSingleObject(event_object),
而future.set_result(result)。相當於SetEvent(event_object)。而future和Event的不同點在於,協程借future來恢復執行,而執行緒借Event來進行執行緒間同步。

以上就是本文關於詳細解讀tornado協程(coroutine)原理的全部內容,希望對大家有所幫助。感興趣的朋友可以繼續參閱本站其他相關專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支援!