1. 程式人生 > >利用Thread.stop方法完成方法執行超時中斷

利用Thread.stop方法完成方法執行超時中斷

一、業務場景:   系統中存在多種場景併發操作事務執行時互鎖的情況,導致任務積壓,系統崩潰。先做了各場景業務的效能調整,但是併發互鎖依然無法避免。於是開始考慮選取呼叫頻繁的同步功能作為死鎖的犧牲品,取消執行,釋放鎖。 二、處理方案:   由於FutureTask.cancel方案無法解決資料庫死鎖問題。因此接下來切換方案,改為Thread.stop實現,強行殺掉子執行緒。核心思想是 把需要監控的操作 包裝成一個子執行緒,啟動以後主執行緒開啟一個執行時間監控,當判斷到子執行緒已經超出時限,則呼叫stop方法,強行中斷(需要說明一點,stop操作可能會帶來一些未知的異常狀態,jdk已經將其標記為過時,不建議使用,這裡需要個人自行評估方案的影響)。    那麼只做一個stop子執行緒的操作,能同時解決掉其對資料庫的鎖定嗎?經過驗證這是不可能的,對資料庫的鎖定需要在事務提交或回滾時 釋放。事務是由spring框架自行接管處理的,在發生死鎖,子執行緒未結束的情況下,事務即使到了超時時限,但是由於沒有新的sql執行語句觸發其超時校驗,spring事務也無法自行結束。它的結束點為 資料庫事務鎖等待超時。因此,現在我們想要讓事務提前回滾還需要一個操作,設定 @Transactional的timeout時間,指定為一個較短的時間,子執行緒被stop後,spring會自動對該事務進行回滾,從而達到資料庫解鎖的目的。

三、程式碼實現:

  3.1、建立一個FTaskEndFlag的執行緒同步標誌。父執行緒等待子執行緒反饋執行結果後,再執行後續的邏輯;

package simm.framework.threadutils.interrupt;

import java.util.concurrent.TimeoutException;
/**
 * futuretask執行終止事件通知
 * 2018.09.22 by simm
 */
public class FTaskEndFlag {
    private volatile boolean isNormaled = false;
    private volatile boolean
fired = false; private Exception exception =null; public boolean isNormaled() { return isNormaled; } /** * 獲取子執行緒異常資訊 * @return */ public Exception getException() { return exception; } /** * 通知結束 * @param result * @param result
*/ public synchronized void notifyEnd(boolean result){ isNormaled = result; fired = true; notifyAll(); } /** * 通知結束 * @param result * @param result */ public synchronized void notifyEnd(boolean result,Exception ex){ isNormaled = result; exception = ex; fired = true; notifyAll(); } /** * 執行結束通知 */ public synchronized void waitForEnd() throws InterruptedException { while (!fired) { //子執行緒掛起,釋放synchronized同步塊 wait(); } } /** * 執行結束通知 */ public void waitForEnd(Thread thread,Long timeout) throws TimeoutException { long begin = System.currentTimeMillis(); while(System.currentTimeMillis()-begin <= timeout){ waitFunc(10); //子執行緒已經執行完畢,則返回 if(fired) return; } //超時未返回,則終止執行緒 try{ thread.stop(); }catch(Exception e){ e.printStackTrace(); throw e; } throw new TimeoutException("方法執行超出時限:"+timeout); } /** * 等待 */ private void waitFunc(long millis){ try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }

  3.2、建立一個BaseThread的抽象類,內建FTaskEndFlag執行緒同步標誌;

package com.zhaogang.ii.biz.threads.future;

/**
 * 基礎執行緒
 */
public abstract class BaseThread extends Thread {
    /**
     * futuretask 等待標誌
     */
    private FTaskEndFlag flag = new FTaskEndFlag();

    public FTaskEndFlag getFlag() {
        return flag;
    }
}

  3.3、建立一個超時重試的工具類,對FutureTask的結果獲取設定超時時間;

package com.zhaogang.ii.biz.threads.future;

import java.lang.reflect.Constructor;
import java.util.List;
import java.util.concurrent.*;
/**
 * 方法超時重試工具
 * 2018.09.20  by simm
 */
public class RetryUtil {/**
     * 預設方法(3秒超時,重試3次)
     * @param syncThread
     * @param params
     * @param <T>
     * @return
     * @throws Exception
     */
    public static <T extends BaseThread> Boolean execute(Class<T> syncThread, List<Object> params) throws Exception {
        return execute(syncThread,params,3000,1000,3);
    }

    /**
     * 方法超時控制
     * @param syncThread 執行緒類
     * @param params 執行緒構造引數
     * @param timeout
     * @param interval
     * @param retryTimes
     * @param <T>
     * @return
     * @throws Exception
     */
    public static <T extends BaseThread> Boolean execute(Class<T> syncThread, List<Object> params, long timeout, long interval, int retryTimes) throws Exception {
        Boolean result = false;
        try{
            //引數型別陣列
            Class[] parameterTypeArrs = new Class[params.size()];
            for(int i=0;i<params.size();i++){
                Class c =  params.get(i).getClass();
                if(c.getName().indexOf("$$")>0){
                    String clsName = c.getName().substring(0,c.getName().indexOf("$$"));
                    parameterTypeArrs[i] = Class.forName(clsName);
                }else{
                    parameterTypeArrs[i] = c;
                }
            }
            //根據引數型別獲取相應的建構函式
            Constructor constructor= syncThread.getConstructor(parameterTypeArrs);
            //引數陣列
            Object[] parameters= params.toArray();
            //根據獲取的建構函式和引數,建立例項
            BaseThread processor = (BaseThread) constructor.newInstance(parameters);
            processor.start();
            //等待執行緒結束
            processor.getFlag().waitForEnd(processor,timeout);
            boolean r = processor.getFlag().isNormaled();
            if(!r){
                throw processor.getFlag().getException();
            }
            return processor.getFlag().isNormaled();
        }catch (TimeoutException e) {
            //超時重試
            retryTimes--;
            if(retryTimes > 0){
                System.out.println("方法開始重試:"+retryTimes);
                Thread.sleep(interval);
                execute(syncThread,params,timeout,interval,retryTimes);
            }else{
                throw e;
            }
        }
        return result;
    }
}

  3.4、設定子執行緒事務超時時間;

    @Transactional(timeout = 3)
    public void syncProduct(ProductDetailinfo productInfo) {

四、給出一個呼叫程式碼。實現一個繼承自BaseFutureTask的 FutureTask任務。依舊需要注意子執行緒涉及到spring的元件,最好是引數從主執行緒注入到子執行緒。

package interrupt;

import simm.framework.threadutils.interrupt.BaseThread;

public class SyncProductThread extends BaseThread {
    private ProductBiz productBiz;
    private ProductDetailinfo productInfo;
    /**
     * 建構函式
     * @param productBiz
     * @param productInfo
     */
    public SyncProductThread(ProductBiz productBiz, ProductDetailinfo productInfo){
        this.productBiz = productBiz;
        this.productInfo = productInfo;
    }
    @Override
    public void run() {
        boolean isNormaled = false;
        Exception exception = null;
        try{
            productBiz.syncProduct(productInfo);
            isNormaled = true;
        }catch(Exception e){
            e.printStackTrace();
            exception = e;
        }finally {
            //通知子執行緒執行完畢了
            super.getFlag().notifyEnd(isNormaled,exception);
        }
    }
}
/**
     * 超時中斷的同步方法
     * @param productInfo
     * @throws Exception
     */
    public void syncProductTimeout(final ProductDetailinfo productInfo) throws Exception {
        Long timeout = 3000L;
        Long interval = 1000L;
        RetryUtil.execute(SyncProductThread.class,Arrays.asList(productBiz,productInfo),timeout,interval,3);
        //RetryUtil.execute(new SyncProductTask(productBiz,productInfo),timeout,interval,3);
    }

參考文章

https://www.jianshu.com/p/55221d045f39