On the Right Way to Control Function Timeouts with Python Decorators
1. Background
In an earlier post, "Python RPC Remote Invocation with RPyC in Practice", I built a small demo: with RPyC you can put together a simple distributed program. However, anyone with development experience will spot a fatal flaw at a glance: if a user runs a very slow or resource-hungry command, the client will never get a result, and the server may even crash. We therefore need to cap how long a command may run, introducing a timeout mechanism to make the program more robust and improve the user experience.
2. Easy, Right? Decorators!
If you happened to read my earlier post "Python Decorators Made Easy: Master Python Decorators in 16 Steps", the natural thought is that a Python decorator fits this scenario perfectly: it wraps a function with extra functionality without intruding on the core business logic.
A timeout decorator looks like this:
# coding=utf-8
# force utf-8 encoding
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import signal, functools

class TimeoutError(Exception):
    pass

def timeout(seconds, error_message="Timeout Error: the cmd has not finished within the time limit."):
    def decorated(func):
        def _handle_timeout(signum, frame):
            # SIGALRM handler: abort the call with a TimeoutError
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)  # deliver SIGALRM after `seconds` seconds
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)  # cancel the pending alarm
            return result

        return functools.wraps(func)(wrapper)
    return decorated

@timeout(5)  # force a TimeoutError if slowfunc does not return within 5s
def slowfunc(sleep_time):
    a = 1
    import time
    time.sleep(sleep_time)
    return a

# slowfunc(3)      # sleeps 3s, returns normally, no exception
print slowfunc(11)  # terminated by TimeoutError
The test cases passed, but when I applied this decorator in the RPC code mentioned at the start, it raised an exception:
Traceback (most recent call last):
  File "exec_cmd.py", line 79, in <module>
    exec_cmd(cmd_str)
  File "exec_cmd.py", line 53, in exec_cmd
    results = pool.map(rpc_client, host_port_list)
  File "/opt/soft/python-2.7.10/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/opt/soft/python-2.7.10/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
ValueError: signal only works in main thread

========= Remote Traceback (1) =========
Traceback (most recent call last):
  File "/opt/soft/python-2.7.10/lib/python2.7/site-packages/rpyc/core/protocol.py", line 305, in _dispatch_request
    res = self._HANDLERS[handler](self, *args)
  File "/opt/soft/python-2.7.10/lib/python2.7/site-packages/rpyc/core/protocol.py", line 535, in _handle_call
    return self._local_objects[oid](*args, **dict(kwargs))
  File "flumeFileMonitor_RPC_Server.py", line 39, in wrapper
    signal.signal(signal.SIGALRM, _handle_timeout)
ValueError: signal only works in main thread
To illustrate the problem more simply, let's reduce the test code further:
# coding=utf-8
# force utf-8 encoding
from time import sleep, time
import sys, threading
reload(sys)
sys.setdefaultencoding('utf-8')
from multiprocessing.dummy import Pool as ThreadPool

@timeout(1)
def processNum(num):
    num_add = num + 1
    # results.append(str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add))
    sleep(2)
    return str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add)

def main():
    ts = time()
    pool = ThreadPool(4)
    results = pool.map(processNum, range(4))
    pool.close()
    pool.join()
    for _ in results:
        print _
    print("cost time is: {:.2f}s".format(time() - ts))

if __name__ == "__main__":
    main()
As you can see, the error occurs because signal only works in the main thread: it cannot be used from worker threads in a multithreaded program. On top of that, signal is only available on *nix, so the approach is not cross-platform. The problem is not so easy to solve after all; we will have to find another way.
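The "signal only works in main thread" error can be reproduced without any RPC machinery. A minimal standalone sketch (Python 3 syntax, *nix only, since SIGALRM does not exist on Windows; names here are mine, not from the RPC code):

```python
import signal
import threading

def install_handler(errors):
    # installing a signal handler from a worker thread is exactly what fails
    try:
        signal.signal(signal.SIGALRM, lambda signum, frame: None)
    except ValueError as exc:
        errors.append(str(exc))

errors = []
t = threading.Thread(target=install_handler, args=(errors,))
t.start()
t.join()
print(errors[0])  # "signal only works in main thread ..."
```

The same signal.signal call succeeds when made from the main thread, which is why the standalone decorator test passed while the threaded RPC server blew up.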
3. Another Way: Controlling Timeouts with Threads
The general idea: start a new child thread to run the target function while the main thread waits for its result. If the child thread has not finished within the given time, we treat it as a timeout, raise a timeout exception, and kill the child thread; otherwise we return the target function's return value. But Python's standard library offers no way to kill a thread. So what can we do? It turns out someone has already implemented a KThread class that subclasses threading.Thread and adds a kill() method, letting us kill the child thread.
Code first; then I will briefly describe the design of the KThread class:
from time import sleep, time
import sys, threading

class KThread(threading.Thread):
    """A subclass of threading.Thread, with a kill() method.

    Come from:
    Kill a thread in Python:
    http://mail.python.org/pipermail/python-list/2004-May/260937.html
    """
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        self.killed = False

    def start(self):
        """Start the thread."""
        self.__run_backup = self.run
        self.run = self.__run  # Force the Thread to install our trace.
        threading.Thread.start(self)

    def __run(self):
        """Hacked run function, which installs the trace."""
        sys.settrace(self.globaltrace)
        self.__run_backup()
        self.run = self.__run_backup

    def globaltrace(self, frame, why, arg):
        if why == 'call':
            return self.localtrace
        else:
            return None

    def localtrace(self, frame, why, arg):
        if self.killed:
            if why == 'line':
                raise SystemExit()
        return self.localtrace

    def kill(self):
        self.killed = True

class Timeout(Exception):
    """function run timeout"""

def timeout(seconds):
    """Timeout decorator: specify the timeout in seconds.

    If the decorated function does not return within the given time,
    a Timeout exception is raised."""
    def timeout_decorator(func):
        """The actual decorator"""
        def _new_func(oldfunc, result, oldfunc_args, oldfunc_kwargs):
            result.append(oldfunc(*oldfunc_args, **oldfunc_kwargs))

        def _(*args, **kwargs):
            result = []
            # pack new kwargs for _new_func, because we want to collect
            # the wrapped function's return value in the result list
            new_kwargs = {
                'oldfunc': func,
                'result': result,
                'oldfunc_args': args,
                'oldfunc_kwargs': kwargs
            }
            thd = KThread(target=_new_func, args=(), kwargs=new_kwargs)
            thd.start()
            thd.join(seconds)
            alive = thd.isAlive()
            thd.kill()  # kill the child thread
            if alive:
                # raise Timeout(u'function run too long, timeout %d seconds.' % seconds)
                try:
                    raise Timeout(u'function run too long, timeout %d seconds.' % seconds)
                finally:
                    # note: this finally-return swallows the exception and
                    # hands back the message string instead
                    return u'function run too long, timeout %d seconds.' % seconds
            else:
                return result[0]

        _.__name__ = func.__name__
        _.__doc__ = func.__doc__
        return _
    return timeout_decorator
Running the simplified test from above with this decorator now gives:
@timeout(1)
def processNum(num):
    num_add = num + 1
    sleep(2)
    return str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add)

Output:

function run too long, timeout 1 seconds.
function run too long, timeout 1 seconds.
function run too long, timeout 1 seconds.
function run too long, timeout 1 seconds.
cost time is: 1.17s
Having seen the code, let's discuss the design ideas behind KThread:
The key is the settrace(self.globaltrace) call, which installs a trace (debug) hook for the thread. KThread calls sys.settrace directly inside __run; threading.settrace does the same thing one step earlier: per its documentation, it must be set before the thread's run() is called, and it is only a relay that passes self.globaltrace to sys.settrace before each thread starts running.
threading.settrace(func) Set a trace function for all threads started from the threading module. The func will be passed to sys.settrace() for each thread, before its run() method is called. New in version 2.3.
Now look at the sys.settrace documentation as well. The English description is fairly long, but read against the code above it should pose no problem.
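To make the tracing mechanism concrete before the walkthrough, here is a tiny standalone sketch (Python 3; the function and variable names are mine) showing that the global trace function receives 'call' events, and the local trace function it returns then receives 'line' events for each line executed:

```python
import sys

events = []

def tracer(frame, why, arg):
    # 'why' is the event type: 'call', 'line', 'return', 'exception', ...
    events.append(why)
    return tracer  # returning a trace function keeps line-level tracing on

def target():
    x = 1
    return x + 1

sys.settrace(tracer)
target()
sys.settrace(None)  # always uninstall the trace when done

print('call' in events, 'line' in events)  # True True
```

KThread's localtrace exploits exactly this hook: since it is invoked before every 'line' event, it gets a chance to raise SystemExit between any two lines of the thread's code.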
An annotated walk-through of the mechanism (this variant installs the trace via threading.settrace in start(), which is equivalent to what KThread.__run does with sys.settrace):
def start(self):
    threading.settrace(self.globaltrace)  # install the trace hook before the thread runs
    threading.Thread.start(self)          # start the thread

def globaltrace(self, frame, why, arg):
    if why == 'call':           # a function scope is about to be entered
        return self.localtrace  # trace that scope line by line with self.localtrace
    else:
        return None

def localtrace(self, frame, why, arg):
    # self.killed is the kill flag; why is the traced event, where
    # 'line' means a new line of Python code is about to execute
    if self.killed and why == 'line':
        raise SystemExit()  # flag set and another line about to run: abort the thread
    return self.localtrace
That is the whole thread-interruption mechanism: before executing each line of code, the thread checks the kill flag; if it is set, the thread exits, otherwise it keeps running.
4. Drawbacks
- Overall execution is somewhat slower, because a check happens before every line of Python code executes.
- Because the approach works by running the function in an instrumented thread, the timeout cannot reach any threads or subprocesses the decorated function spawns internally, so for such functions the outer timeout control may silently fail.
5. Two Common Misconceptions About Function Timeouts in Multithreaded Code
- sleep, wait, and join cannot directly implement or substitute for a timeout.
The timeout argument of join(timeout) is especially easy for beginners to misread, as if calling join(n) meant the thread is forcibly ended after n seconds.
Let's look at the documentation first:
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call isAlive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.
As you can see, the timeout only bounds how long the calling thread blocks: it tells join() how long to wait for the child thread. Once it expires, the main thread and the child thread each simply continue on their way. That is why you must call isAlive() afterwards to decide whether a timeout happened: if the child thread is still alive, that join() call timed out.
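A minimal sketch of that pattern (Python 3, where isAlive() is spelled is_alive(); timings are illustrative):

```python
import threading
import time

t = threading.Thread(target=time.sleep, args=(2,))
t.start()
t.join(0.5)               # block the caller for at most 0.5s
timed_out = t.is_alive()  # still alive, so this join() timed out
print(timed_out)          # True: the thread keeps sleeping regardless
t.join()                  # now wait for it to actually finish
```

Note that nothing happens to the child thread when the join times out; detecting the timeout and doing something about it is entirely up to the caller.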
- An example:
Suppose there are 10 threads, each of which sleeps 3s, and we want the whole batch to finish within 2s overall. Many beginners would write code like this:
for i in range(10):
    t = ThreadTest(i)
    thread_arr.append(t)
for i in range(10):
    thread_arr[i].start()
for i in range(10):
    thread_arr[i].join(2)
You will find that this code actually takes about 20s when the threads never exit on their own (as with the queue workers in the full example below: each join(2) then blocks for its full 2 seconds in turn), and in any case it performs no real timeout termination whatsoever.
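The 20s worst case assumes threads that never finish. With threads that do finish, the sequential join(timeout) calls overlap with the threads' concurrent execution; a quick sketch (names and timings are illustrative):

```python
import threading
import time

threads = [threading.Thread(target=time.sleep, args=(1.0,)) for _ in range(5)]
start = time.time()
for t in threads:
    t.start()
for t in threads:
    t.join(0.4)  # the joins run one after another...
elapsed = time.time() - start
# ...but the threads sleep concurrently, so the total here is ~1s,
# not 5 * 0.4s; the worst case of 5 * 0.4s only occurs when every
# join(0.4) times out, i.e. when the threads never exit
print(elapsed < 2.0)  # True
```

Either way, the joins impose no deadline on the threads themselves, which is the real misconception.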
Here is a complete version of the code for you to test and study:
# coding=utf-8
# force utf-8 encoding
from time import sleep, time
import sys, threading
from Queue import Queue
from threading import Thread
reload(sys)
sys.setdefaultencoding('utf-8')

def processNum(num):
    num_add = num + 1
    sleep(3)
    print str(threading.current_thread()) + ": " + str(num) + " → " + str(num_add)

class ProcessWorker(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            num = self.queue.get()
            processNum(num)
            self.queue.task_done()

thread_arr = []

def main():
    ts = time()
    queue = Queue()
    for x in range(10):
        worker = ProcessWorker(queue)
        worker.daemon = True
        worker.start()
        thread_arr.append(worker)
    for num in range(10):
        queue.put(num)
    # queue.join()
    for _ in thread_arr:
        _.join(2)
    print("cost time is: {:.2f}s".format(time() - ts))

if __name__ == "__main__":
    main()
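If the goal really is "wait at most 2s overall", one workable pattern (a sketch of my own, not from the code above; Python 3 syntax) is to shrink each join's timeout against a shared deadline instead of giving every thread its own 2 seconds:

```python
import threading
import time

def worker():
    time.sleep(3)  # stands in for the slow business logic

threads = [threading.Thread(target=worker, daemon=True) for _ in range(10)]
start = time.time()
for t in threads:
    t.start()

deadline = start + 2.0
for t in threads:
    remaining = deadline - time.time()
    if remaining <= 0:
        break             # budget exhausted, stop waiting
    t.join(remaining)     # each join only gets what's left of the budget

elapsed = time.time() - start
still_running = sum(t.is_alive() for t in threads)
print(elapsed < 2.5, still_running)  # True 10: we stopped waiting; the threads live on
```

This caps the total wait near 2s, but as before it only stops the waiting: the worker threads themselves keep running (here they are daemon threads, so the process can still exit).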
That's all for today. Multithreading is an evergreen topic; the road ahead is long indeed.
Refer:
[1] Python Decorators Made Easy: Master Python Decorators in 16 Steps
http://my.oschina.net/leejun2005/blog/477614?fromerr=rNBm9BiN#OSC_h2_23
[2] Python RPC Remote Invocation with RPyC in Practice
http://my.oschina.net/leejun2005/blog/471624
[3] Python tips: timeout decorator (@timeout decorator)
http://www.cnblogs.com/fengmk2/archive/2008/08/30/python_tips_timeout_decorator.html
[4] A Killable Thread
https://sites.google.com/site/mypynotes/skill/cankillthread
[5] Python Module Study: threading, Multithreading Control and Handling
http://python.jobbole.com/81546/
[6] Learn Python Multithreading in One Article
[7] Learn Python Multiprocessing in One Article