[Python] 使用multiprocessing實作watchdog « Terrence的宅宅幻想
今天有一個需求,當一個function執行太長的時候我希望可以把它停止
比如說我今天使用facebokk presto做資料倉儲的時候
當資料量過大導致執行時間太長,更甚的是OutOfMemoryError導致presto的worker node shutdown
因此我希望有個watchdog的功能當我functon call執行太長的時候自動timeout,類似如下的感覺
最開始的想法是希望使用python的threading去實作
但是似乎只要thread的function沒有執行完成,thread就不會停止.
這並非我想要的效果,我希望當timeout發生的時候立即停止function的執行,也因此後來選了multiprocessing
multiprocessing直接支援terminate的呼叫,因此我只要搭配定期檢查timeout就能完成一個簡單的watchdog
程市碼如下
測試
姑且是達到了我所想要的效果,但是這段程式有個嚴重的問題,就是他無法捕捉exception而只能處理timeout exception
這樣一來function call失敗外部是無法處理的
可以用multiprocess的Pool去改寫解決這個問題
使用apply_async函式得到一個future物件,當使用該物件的get function時候,除了結果之外還能正確捕捉到exception
但這邊還有一個小問題,就是會跑出PicklingError
這時候可以用copy_reg來解決這問題,新增一段如下的程市碼
這時候噴出另一個錯誤: typeerror
會出現這問題是因為我自制的TimeoutError 建構子需要兩個引數self及value
而multiprocessing.pool.AsyncResult.get()沒辦法對超過一個以上引數exception進行初始化
因此這邊必須在做個小修正
如此一來才真正完成一個可用的watchdog,完整程式碼如下: