1. 程式人生 > 其它 >gevent.hub.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback

gevent.hub.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback

最近一個 python 專案中同時用到了 gevent 和 multiprocessing。在優雅退出的實現上,出現了一些預料之外的問題。

一個簡化版的程式碼,啟動了4 個程序,每個程序裡啟動了兩個協程,並註冊了 SIGINT 等訊號的回撥函式來實現優雅退出:

import signal
import time
import multiprocessing
import gevent
from gevent import monkey
monkey.patch_all()  # NOQA


class WorkerManager():
    def __init__(self):
        self.is_running = multiprocessing.Value('b', True)

    def job(self):
        while self.is_running.value:
            print("job")
            time.sleep(3)

    def run(self):
        for sig in [signal.SIGINT, signal.SIGUSR1, signal.SIGTERM]:
            signal.signal(sig, signal.SIG_IGN)

        jobs = [gevent.spawn(self.job) for _ in range(2)]
        gevent.joinall(jobs)

    def start(self):
        self.workers = [multiprocessing.Process(
            target=self.run) for _ in range(4)]
        for worker in self.workers:
            worker.start()
        signal.signal(signal.SIGINT, self.graceful_exit)

    def graceful_exit(self, sig, frame):
        self.shutdown()
 
    def shutdown(self):
        if not self.is_running.value:
            return
        self.is_running.value = False
        for worker in self.workers:
            worker.join()
        

worker_manager = WorkerManager()
worker_manager.start()

上面的程式碼執行後,按ctrl+c會報下面的錯誤:

gevent.hub.BlockingSwitchOutError: Impossible to call blocking function in the event loop callback

相關的呼叫棧

  File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 28, in poll
    pid, sts = os.waitpid(self.pid, flag)
  File ".../venv/lib/python3.7/site-packages/gevent/os.py", line 380, in waitpid
    get_hub().wait(new_watcher)
  File "src/gevent/_hub_primitives.py", line 46, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_hub_primitives.py", line 55, in gevent._gevent_c_hub_primitives.WaitOperationsGreenlet.wait
  File "src/gevent/_waiter.py", line 154, in gevent._gevent_c_waiter.Waiter.get
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 64, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
  File "src/gevent/_greenlet_primitives.py", line 67, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out
  File "src/gevent/_greenlet_primitives.py", line 68, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch_out

背景知識

  • 訊號處理的原理是作業系統會把訊號發給程式的每個程序,每個程序原來的邏輯就中斷了,然後呼叫我們註冊的訊號回撥函式來處理。

  • gevent 是一個流行的 python 網路庫,主要的功能就是在 python 中提供了一些事件迴圈的介面。它是基於 greenlet 實現的。greenlet 也可以理解為協程,就像 golang 裡的 goroutine。

  • greenlet 的功能就是提供了在不同調用棧之間切換(switch)的能力。比如一會執行這個協程,然後它要阻塞等待一些 IO 操作,那就主動切換到另一個協程的呼叫棧去執行另一個協程。而 gevent 就對 greenlet 進行了一層封裝,我們只用呼叫 gevent.spawn() 就可以建立並執行協程,gevent 會幫我們排程。gevent 還封裝了一些作業系統自帶的函式,比如 sleep。

  • 每個 greenlet 都會在一個執行緒上,一個執行緒上可以有多個 greenlet,但一次只有一個 greenlet 在執行。

  • 對於每個協程,都需要在一個 hub 裡執行,hub 被翻譯為集線器,hub 也是一個 greenlet,為什麼又要搞個 greenlet 呢,因為它是幫我們做切換呼叫棧的傢伙。

  • hub 裡執行著事件迴圈(loop),什麼是事件迴圈呢?就是說作業系統會發出事件通知你的程式,比如一個 socket 可以讀了,你的程式就可以做相應處理。這種註冊事件、等待著並在事件發生時做處理的流程就是事件迴圈。

  • 當我們呼叫 spawn 時,會建立一個新的 greenlet,並在 hub 裡註冊事件,事件迴圈收到事件通知時,就會呼叫我們的回撥函式。而如果回撥函式裡有一個 sleep 之類的阻塞事件,gevent 的實現中就會進行 switch 操作,也就是切到 hub,等阻塞操作完成,就又會從 hub 裡切換回來。

  • 呼叫 join 或 joinall 時,就會切換到 hub 裡,會啟動事件輪詢來等待協程結束。

原因

回到我們的程式碼裡,我們用了gevent 的 monkey.patch_all(),並且用到了 multiprocessing,而出錯的呼叫棧中可以看到問題出在對子程序 join 時,這個 join 函式在 multiprocess 庫裡,呼叫了 os.waitpid,這裡就會呼叫 gevent 實現的 os,由於是個阻塞操作,就會在 switch_out 時出錯。為什麼出錯呢?這是 gevent 裡相關的程式碼:

class SwitchOutGreenletWithLoop(TrackedRawGreenlet):
    # Subclasses must define:
    # - self.loop

    # This class defines loop in its .pxd for Cython. This lets us avoid
    # circular dependencies with the hub.

    def switch(self):
        switch_out = getattr(getcurrent(), 'switch_out', None) # pylint:disable=undefined-variable
        if switch_out is not None:
            switch_out()
        return _greenlet_switch(self) # pylint:disable=undefined-variable

    def switch_out(self):
        raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')

因為我們的程式收到訊號中斷時,主程序裡沒有其他的 greenlet,所以執行著的是 hub 本身這個 greenlet,所以 switch_out 時會找之前在跑的 greenlet,結果就是 hub 本身。

一般 switch_out 是用來從一個普通的 greenlet 切換到 hub 裡的,現在從 hub 裡無法再切換到其它地方了。所以就是‘BlockingSwitchOutError’ 錯誤了。

參考:

解決方法

  1. 既然是 hub 裡無法切出去,那我們可以把 shutdown 放到一個 greenlet 裡:
    def graceful_exit(self, sig, frame):
        gevent.spawn(self.shutdown)

ps:一不小心寫成了 self.shutdown(),後面加上了(),就和沒改一樣,所以報了一樣的錯了。

  1. 也可以不讓 gevent 影響 multiprocess 裡的 os 函式


┆涼┆暖┆降┆等┆幸┆我┆我┆裡┆將┆ ┆可┆有┆謙┆戮┆那┆ ┆大┆始┆ ┆然┆
┆薄┆一┆臨┆你┆的┆還┆沒┆ ┆來┆ ┆是┆來┆遜┆沒┆些┆ ┆雁┆終┆ ┆而┆
┆ ┆暖┆ ┆如┆地┆站┆有┆ ┆也┆ ┆我┆ ┆的┆有┆精┆ ┆也┆沒┆ ┆你┆
┆ ┆這┆ ┆試┆方┆在┆逃┆ ┆會┆ ┆在┆ ┆清┆來┆準┆ ┆沒┆有┆ ┆沒┆
┆ ┆生┆ ┆探┆ ┆最┆避┆ ┆在┆ ┆這┆ ┆晨┆ ┆的┆ ┆有┆來┆ ┆有┆
┆ ┆之┆ ┆般┆ ┆不┆ ┆ ┆這┆ ┆裡┆ ┆沒┆ ┆殺┆ ┆來┆ ┆ ┆來┆