1. 程式人生 > >Python 強制停止多線程運行

Python 強制停止多線程運行

main orm nap return bin forms inspect start center

強制停止多線程運行

by:授客 QQ:1033553122

#!/usr/bin/env python

# -*- coding:utf-8 -*-

__author__ = ‘shouke‘

import threading

import time

import inspect

import ctypes

def _async_raise(tid, exctype):

"""raises the exception, performs cleanup if needed"""

tid = ctypes.c_long(tid)

if not

inspect.isclass(exctype):

exctype = type(exctype)

res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))

if res == 0:

raise ValueError("invalid thread id")

elif res != 1:

# """if it returns a number greater than one, you‘re in trouble,

# and you should call it again with exc=NULL to revert the effect"""

ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)

raise SystemError("PyThreadState_SetAsyncExc failed")

def stop_thread(thread):

_async_raise(thread.ident, SystemExit)

def test():

try

:

while True:

print(‘-------‘)

time.sleep(0.5)

except Exception as e:

print(‘Exception:%s‘ % e)

finally:

print(‘stop running test function‘)

if __name__ == "__main__":

t = threading.Thread(target=test)

t.start()

time.sleep(5)

stop_thread(t)

print("main thread running")

print("main thread running")

print("main thread running")

print("main thread running")

運行結果:

技術分享圖片

結論:

按上述方法是可以停止多線程的,但是需要註意的地方是,線程退出前,會執行try...finally中的代碼,如果代碼包含了多層try...finally,每一層的finally中的語句都會被執行,如下:

修改代碼如下

def test():

try:

try:

while True:

print(‘-------‘)

time.sleep(0.5)

except Exception as e:

print(‘Exception:%s‘ % e)

finally:

print(‘stop running test function‘)

print(‘outer try‘)

except Exception:

pass

finally:

print(‘outer try finally‘)

再次運行,結果

技術分享圖片

Python 強制停止多線程運行