利用Thread.stop方法完成方法執行超時中斷
阿新 • • 發佈:2018-12-11
一、業務場景:
系統中存在多種場景併發操作事務執行時互鎖的情況,導致任務積壓,系統崩潰。先做了各場景業務的效能調整,但是併發互鎖依然無法避免。於是開始考慮選取呼叫頻繁的同步功能作為死鎖的犧牲品,取消執行,釋放鎖。
二、處理方案:
由於FutureTask.cancel方案無法解決資料庫死鎖問題。因此接下來切換方案,改為Thread.stop實現,強行殺掉子執行緒。核心思想是 把需要監控的操作 包裝成一個子執行緒,啟動以後主執行緒開啟一個執行時間監控,當判斷到子執行緒已經超出時限,則呼叫stop方法,強行中斷(需要說明一點,stop操作可能會帶來一些未知的異常狀態,jdk已經將其標記為過時,不建議使用,這裡需要個人自行評估方案的影響)。
那麼只做一個stop子執行緒的操作,能同時解決掉其對資料庫的鎖定嗎?經過驗證這是不可能的,對資料庫的鎖定需要在事務提交或回滾時 釋放。事務是由spring框架自行接管處理的,在發生死鎖,子執行緒未結束的情況下,事務即使到了超時時限,但是由於沒有新的sql執行語句觸發其超時校驗,spring事務也無法自行結束。它的結束點為 資料庫事務鎖等待超時。因此,現在我們想要讓事務提前回滾還需要一個操作,設定 @Transactional的timeout時間,指定為一個較短的時間,子執行緒被stop後,spring會自動對該事務進行回滾,從而達到資料庫解鎖的目的。
三、程式碼實現:
3.1、建立一個FTaskEndFlag的執行緒同步標誌。父執行緒等待子執行緒反饋執行結果後,再執行後續的邏輯;
package simm.framework.threadutils.interrupt; import java.util.concurrent.TimeoutException; /** * futuretask執行終止事件通知 * 2018.09.22 by simm */ public class FTaskEndFlag { private volatile boolean isNormaled = false; private volatile booleanfired = false; private Exception exception =null; public boolean isNormaled() { return isNormaled; } /** * 獲取子執行緒異常資訊 * @return */ public Exception getException() { return exception; } /** * 通知結束 * @param result * @param result*/ public synchronized void notifyEnd(boolean result){ isNormaled = result; fired = true; notifyAll(); } /** * 通知結束 * @param result * @param result */ public synchronized void notifyEnd(boolean result,Exception ex){ isNormaled = result; exception = ex; fired = true; notifyAll(); } /** * 執行結束通知 */ public synchronized void waitForEnd() throws InterruptedException { while (!fired) { //子執行緒掛起,釋放synchronized同步塊 wait(); } } /** * 執行結束通知 */ public void waitForEnd(Thread thread,Long timeout) throws TimeoutException { long begin = System.currentTimeMillis(); while(System.currentTimeMillis()-begin <= timeout){ waitFunc(10); //子執行緒已經執行完畢,則返回 if(fired) return; } //超時未返回,則終止執行緒 try{ thread.stop(); }catch(Exception e){ e.printStackTrace(); throw e; } throw new TimeoutException("方法執行超出時限:"+timeout); } /** * 等待 */ private void waitFunc(long millis){ try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
3.2、建立一個BaseThread的抽象類,內建FTaskEndFlag執行緒同步標誌;
package com.zhaogang.ii.biz.threads.future; /** * 基礎執行緒 */ public abstract class BaseThread extends Thread { /** * futuretask 等待標誌 */ private FTaskEndFlag flag = new FTaskEndFlag(); public FTaskEndFlag getFlag() { return flag; } }
3.3、建立一個超時重試的工具類,對FutureTask的結果獲取設定超時時間;
package com.zhaogang.ii.biz.threads.future; import java.lang.reflect.Constructor; import java.util.List; import java.util.concurrent.*; /** * 方法超時重試工具 * 2018.09.20 by simm */ public class RetryUtil {/** * 預設方法(3秒超時,重試3次) * @param syncThread * @param params * @param <T> * @return * @throws Exception */ public static <T extends BaseThread> Boolean execute(Class<T> syncThread, List<Object> params) throws Exception { return execute(syncThread,params,3000,1000,3); } /** * 方法超時控制 * @param syncThread 執行緒類 * @param params 執行緒構造引數 * @param timeout * @param interval * @param retryTimes * @param <T> * @return * @throws Exception */ public static <T extends BaseThread> Boolean execute(Class<T> syncThread, List<Object> params, long timeout, long interval, int retryTimes) throws Exception { Boolean result = false; try{ //引數型別陣列 Class[] parameterTypeArrs = new Class[params.size()]; for(int i=0;i<params.size();i++){ Class c = params.get(i).getClass(); if(c.getName().indexOf("$$")>0){ String clsName = c.getName().substring(0,c.getName().indexOf("$$")); parameterTypeArrs[i] = Class.forName(clsName); }else{ parameterTypeArrs[i] = c; } } //根據引數型別獲取相應的建構函式 Constructor constructor= syncThread.getConstructor(parameterTypeArrs); //引數陣列 Object[] parameters= params.toArray(); //根據獲取的建構函式和引數,建立例項 BaseThread processor = (BaseThread) constructor.newInstance(parameters); processor.start(); //等待執行緒結束 processor.getFlag().waitForEnd(processor,timeout); boolean r = processor.getFlag().isNormaled(); if(!r){ throw processor.getFlag().getException(); } return processor.getFlag().isNormaled(); }catch (TimeoutException e) { //超時重試 retryTimes--; if(retryTimes > 0){ System.out.println("方法開始重試:"+retryTimes); Thread.sleep(interval); execute(syncThread,params,timeout,interval,retryTimes); }else{ throw e; } } return result; } }
3.4、設定子執行緒事務超時時間;
@Transactional(timeout = 3) public void syncProduct(ProductDetailinfo productInfo) {
四、給出一個呼叫程式碼。實現一個繼承自BaseFutureTask的 FutureTask任務。依舊需要注意子執行緒涉及到spring的元件,最好是引數從主執行緒注入到子執行緒。
package interrupt; import simm.framework.threadutils.interrupt.BaseThread; public class SyncProductThread extends BaseThread { private ProductBiz productBiz; private ProductDetailinfo productInfo; /** * 建構函式 * @param productBiz * @param productInfo */ public SyncProductThread(ProductBiz productBiz, ProductDetailinfo productInfo){ this.productBiz = productBiz; this.productInfo = productInfo; } @Override public void run() { boolean isNormaled = false; Exception exception = null; try{ productBiz.syncProduct(productInfo); isNormaled = true; }catch(Exception e){ e.printStackTrace(); exception = e; }finally { //通知子執行緒執行完畢了 super.getFlag().notifyEnd(isNormaled,exception); } } }
/** * 超時中斷的同步方法 * @param productInfo * @throws Exception */ public void syncProductTimeout(final ProductDetailinfo productInfo) throws Exception { Long timeout = 3000L; Long interval = 1000L; RetryUtil.execute(SyncProductThread.class,Arrays.asList(productBiz,productInfo),timeout,interval,3); //RetryUtil.execute(new SyncProductTask(productBiz,productInfo),timeout,interval,3); }
參考文章
https://www.jianshu.com/p/55221d045f39