1. 程式人生 > >深度分析gevent執行流程

深度分析gevent執行流程

一直對gevent執行流程比較模糊,最近看原始碼略有所得,不敢獨享,故分享之。

gevent是一個高效能網路庫,底層是libevent,1.0版本之後是libev,核心是greenlet。gevent和eventlet是親近,唯一不同的是eventlet是自己實現的事件驅動,而gevent是使用libev。兩者都有廣泛的應用,如OpenStack底層網路通訊使用eventlet,goagent是使用gevent。


要想理解gevent首先要理解gevent的排程流程,gevent中有一個hub的概念,也就是下圖的MainThread,用於排程所有其它的greenlet例項(下圖Coroutine)。
其實hub也是一個greenlet,只不過特殊一些。 看下圖我們會發現每次從hub切換到一個greenlet後,都會回到hub,這就是gevent的關鍵。 注意:gevent中並沒有greenlet鏈的說法,所有都是向主迴圈註冊greenlet.switch方法,主迴圈在合適的時機切換回來。

也許大家會好奇,為什麼採用這種模式,為什麼每次都要切換到hub?我想理由有二:

1.hub是事件驅動的核心,每次切換到hub後將繼續迴圈事件。如果在一個greenlet中不出來,那麼其它greenlet將得不到呼叫。

2.維持兩者關係肯定比維持多個關係簡單。每次我們所關心的就是hub以及當前greenlet,不需要考慮各個greenlet之間關係。


我們看看最簡單的gevent.sleep發生了什麼?

我們先想想最簡單的sleep(0)該如何排程?根據上面很明顯

1.向事件迴圈註冊當前greenlet的switch函式

2.切換到hub,執行主事件迴圈

[python]  view plain  copy
  1. def sleep(seconds=0, ref=True):  
  2.     hub = get_hub()  
  3.     loop = hub.loop  
  4.     if seconds <= 0:  
  5.         waiter = Waiter()  
  6.         loop.run_callback(waiter.switch)  
  7.         waiter.get()  
  8.     else:  
  9.         hub.wait(loop.timer(seconds, ref=ref))  
當seconds小於等於0時,loop.run_callback(waiter.switch)即是將當前greenlet的switch註冊到loop,使用waiter.get()切換到hub。那麼很明顯,

當切換到hub後當呼叫剛註冊的回撥(waiter.switch)回到剛剛sleep所在的greenlet。

不熟悉Waiter的童鞋可能對上面說的有點模糊,下面我們好好看看Waiter是什麼。

[python]  view plain  copy
  1. >>> result = Waiter()  
  2. >>> timer = get_hub().loop.timer(0.1)  
  3. >>> timer.start(result.switch, 'hello from Waiter')  
  4. >>> result.get() # blocks for 0.1 seconds  
  5.     'hello from Waiter'  
timer.start(result.switch, 'hello from Waiter')我們向hub的主迴圈註冊一個0.1s的定時器,回撥為result.switch,然後將執行result.get(),此時過程程式碼如下:

[python]  view plain  copy
  1. def get(self):  
  2.             assert self.greenlet is None'This Waiter is already used by %r' % (self.greenlet, )  
  3.             self.greenlet = getcurrent()  
  4.             try:  
  5.                 return self.hub.switch()  
  6.             finally:  
  7.                 self.greenlet = None  

將把self.greenlet設定為當前greenlet,然後通過self.hub.switch()切換到主迴圈,很明顯在主迴圈中將回調result.switch,看程式碼:

[python]  view plain  copy
  1. def switch(self, value=None):  
  2.             """Switch to the greenlet if one's available. Otherwise store the value."""  
  3.             greenlet = self.greenlet  
  4.             assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"  
  5.             switch = greenlet.switch  
  6.             try:  
  7.                 switch(value)  
  8.             except:  
  9.                 self.hub.handle_error(switch, *sys.exc_info())  
拿到剛儲存的greenlet,然後切換到greenlet.switch(),返回到我們剛呼叫reuslt.get()方法。通過上面assert我們也可以看出這是在hub中呼叫的。

通過以上分析,小夥伴們肯定都懂了gevent的執行流程了。

這裡有個問題,如果上面先發生result.switch,那又該如何呢?就像下面這樣:

[python]  view plain  copy
  1. >>> result = Waiter()  
  2. >>> timer = get_hub().loop.timer(0.1)  
  3. >>> timer.start(result.switch, 'hi from Waiter')  
  4. >>> sleep(0.2)  
  5. >>> result.get() # returns immediatelly without blocking  
  6.     'hi from Waiter'  
我想聰明的你,開啟hub.py再看看原始碼肯定就明白了(上面Waiter程式碼是我特意簡化的)。

既然我們知道了gevent執行流程,下面我們看看gevent.spawn和join到底做了什麼?

gevent.spawn其實就是Greenlet.spawn,所以gevent.spawn就是建立一個greenlet,並將該greenlet的switch()加入hub主迴圈回撥。

[python]  view plain  copy
  1. class Greenlet(greenlet):  
  2.     """A light-weight cooperatively-scheduled execution unit."""  
  3.   
  4.     def __init__(self, run=None, *args, **kwargs):  
  5.         hub = get_hub()  
  6.         greenlet.__init__(self, parent=hub)  
  7.         if run is not None:  
  8.             self._run = run  
  9.         self._start_event = None  
  10.   
  11.     def start(self):  
  12.         """Schedule the greenlet to run in this loop iteration"""  
  13.         if self._start_event is None:  
  14.             self._start_event = self.parent.loop.run_callback(self.switch)  
  15.  
  16.     @classmethod  
  17.     def spawn(cls, *args, **kwargs):  
  18.         """Return a new :class:`Greenlet` object, scheduled to start. 
  19.  
  20.         The arguments are passed to :meth:`Greenlet.__init__`. 
  21.         """  
  22.         g = cls(*args, **kwargs)  
  23.         g.start()  
  24.         return g  

通過下面程式碼證明:

[python]  view plain  copy
  1. import gevent  
  2.   
  3. def talk(msg):  
  4.     print(msg)  
  5.   
  6. g1 = gevent.spawn(talk, 'bar')  
  7. gevent.sleep(0)  

將輸出:bar,我們通過sleep切換到hub,然後hub將執行我們新增的回撥talk,一切正常。

此時不要沾沾自喜,如果下面程式碼也覺得一切正常再高興也不遲。

[python]  view plain  copy
  1. import gevent  
  2.   
  3. def talk(msg):  
  4.     print(msg)  
  5.     gevent.sleep(0)  
  6.     print msg  
  7.   
  8. g1 = gevent.spawn(talk, 'bar')  
  9. gevent.sleep(0)  

這次還是輸出:bar,有點不對勁啊,應該輸出兩個bar才對,為什麼為導致這樣呢?

我們來好好分析流程:

1.gevent.spawn註冊回撥talk

2.然後最後一行gevent.sleep(0)註冊當前greenlet.switch(最外面的)到hub,然後切換到hub

3.hub執行回撥talk,列印"bar",此時gevent.sleep再次將g1.switch註冊到hub,同時切換到hub

4.由於第2步最外層greenlet現註冊,所以將呼叫最外層greenlet,此時很明顯,程式將結束。因為最外層greenlet並不是hub的子greenlet,

所以died後並不會回到父greenlet,即hub


你可能會說那我自己手動切換到hub不就可以了嗎?這將導致主迴圈結束不了的問題。

[python]  view plain  copy
  1. import gevent  
  2.   
  3. def talk(msg):  
  4.     print(msg)  
  5.     gevent.sleep(0)  
  6.     print msg  
  7.   
  8. g1 = gevent.spawn(talk, 'bar')  
  9. gevent.get_hub().switch()  
程式輸出:

[python]  view plain  copy
  1. bar  
  2. bar  
  3. Traceback (most recent call last):  
  4.   File "F:\py_cgi\geve.py", line 9in <module>  
  5.     gevent.get_hub().switch()  
  6.   File "C:\Python26\lib\site-packages\gevent\hub.py", line 331in switch  
  7.     return greenlet.switch(self)  
  8. gevent.hub.LoopExit: This operation would block forever  
雖然成功的輸出了兩次“bar",但也導致了更為嚴重的問題。

這也就是join存在的價值,我們看看join是如何做到的?

[python]  view plain  copy
  1. def join(self, timeout=None):  
  2.     """Wait until the greenlet finishes or *timeout* expires. 
  3.     Return ``None`` regardless. 
  4.     """  
  5.     if self.ready():  
  6.         return  
  7.     else:  
  8.         switch = getcurrent().switch  
  9.         self.rawlink(switch)  
  10.         try:  
  11.             t = Timeout.start_new(timeout)  
  12.             try:  
  13.                 result = self.parent.switch()  
  14.                 assert result is self'Invalid switch into Greenlet.join(): %r' % (result, )  
  15.             finally:  
  16.                 t.cancel()