1. 程式人生 > 實用技巧 >Eureka系列(六) TimedSupervisorTask類解析

Eureka系列(六) TimedSupervisorTask類解析

  為什麼要單獨講解TimedSupervisorTask這個類呢?因為這個類在我們DiscoveryClient類的initScheduledTasks方法進行定時任務初始化時被使用得比較多,所以我們需要了解下這個類,我們先看下TimedSupervisorTask這個類在initScheduledTasks的具體使用:

private final ScheduledExecutorService scheduler;
private void initScheduledTasks() {
    …省略其他程式碼
    // 初始化定時拉取服務註冊資訊
    scheduler.schedule(
        new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        ),
        registryFetchIntervalSeconds, TimeUnit.SECONDS);

    …省略其他程式碼
    // 初始化定時服務續約任務
    scheduler.schedule(
        new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        ),
       renewalIntervalInSecs, TimeUnit.SECONDS);
       …省略其他程式碼
}

  由此可見,TimedSupervisorTask類被使用在了定時任務的初始化中,我們具體來看看這個類的結構:

public class TimedSupervisorTask extends TimerTask {
    private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class);

    private final Counter timeoutCounter;
    private final Counter rejectedCounter;
    private final Counter throwableCounter;
    private final LongGauge threadPoolLevelGauge;

    private final ScheduledExecutorService scheduler;
    private final ThreadPoolExecutor executor;
    private final long timeoutMillis;
    private final Runnable task;

    private final AtomicLong delay;
    private final long maxDelay;

    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;

        // Initialize the counters and register.
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
    }
    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();
            long currentDelay = delay.get();
            // 如果出現異常,則將時間*2,然後取 定時時間 和 最長定時時間中最小的為下次任務執行的延時時間
            long newDelay = Math.min(maxDelay, currentDelay * 2);  
            delay.compareAndSet(currentDelay, newDelay);
        } catch (RejectedExecutionException e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }
            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }
            throwableCounter.increment();
        } finally {
            if (future != null) {
                future.cancel(true);
            }
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
}

  我們可以仔細看看run方法的具體實現,因為這裡有一個值得借鑑的設計思路!!!

  我們簡單來看看這個方法具體執行流程:
    1.執行submit()方法提交任務
    2.執行future.get()方法,如果沒有在規定的時間得到返回值或者任務出現異常,則進入異常處理catch程式碼塊。
    3.如果發生異常
      a. 發生TimeoutException異常,則執行Math.min(maxDelay, currentDelay ✖️ 2);得到任務延時時間 ✖️ 2 和 最大延時時間的最小值,然後改變任務的延時時間timeoutMillis(延時任務時間預設值是30s)
      b.發生RejectedExecutionException異常,則將rejectedCounter值+1
      c.發生Throwable異常,則將throwableCounter值+1
    4.如果沒有發生異常,則再設定一次延時任務時間timeoutMillis
    5.進入finally程式碼塊
      a.如果future不為null,則執行future.cancel(true),中斷執行緒停止任務
      b.如果執行緒池沒有shutdown,則建立一個新的定時任務

\(\color{red}{注意}\):不知道有沒有小夥伴發現,不管我們的定時任務執行是成功還是結束(如果還沒有執行結束,也會被中斷),然後會再重新初始化一個新的任務。並且這個任務的延時時間還會因為不同的情況受到改變,在try程式碼塊中如果不發現異常,則會重新初始化延時時間,如果發生TimeoutException異常,則會更改延時時間,更改為 任務延時時間 ✖️ 2 和 最大延時時間的最小值。所以我們會發現這樣的設計會讓整個延時任務很靈活。如果不發生異常,則延時時間不會變;如果發現異常,則增長延時時間;如果程式又恢復正常了,則延時時間又恢復成了預設值。

總結:我們在設計延時/週期性任務時就可以參考TimedSupervisorTask的實現,程式一旦遇到發生超時異常,就將間隔時間調大,如果連續超時,那麼每次間隔時間都會增大一倍,一直到達外部引數設定的上限為止,一旦新任務不再發生超時異常,間隔時間又會自動恢復為初始值。