Eureka系列(六) TimedSupervisorTask類解析
為什麼要單獨講解TimedSupervisorTask這個類呢?因為這個類在我們DiscoveryClient類的initScheduledTasks方法進行定時任務初始化時被使用得比較多,所以我們需要了解下這個類,我們先看下TimedSupervisorTask這個類在initScheduledTasks的具體使用:
private final ScheduledExecutorService scheduler; private void initScheduledTasks() { …省略其他程式碼 // 初始化定時拉取服務註冊資訊 scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); …省略其他程式碼 // 初始化定時服務續約任務 scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); …省略其他程式碼 }
由此可見,TimedSupervisorTask類被使用在了定時任務的初始化中,我們具體來看看這個類的結構:
public class TimedSupervisorTask extends TimerTask { private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class); private final Counter timeoutCounter; private final Counter rejectedCounter; private final Counter throwableCounter; private final LongGauge threadPoolLevelGauge; private final ScheduledExecutorService scheduler; private final ThreadPoolExecutor executor; private final long timeoutMillis; private final Runnable task; private final AtomicLong delay; private final long maxDelay; public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) { this.scheduler = scheduler; this.executor = executor; this.timeoutMillis = timeUnit.toMillis(timeout); this.task = task; this.delay = new AtomicLong(timeoutMillis); this.maxDelay = timeoutMillis * expBackOffBound; // Initialize the counters and register. timeoutCounter = Monitors.newCounter("timeouts"); rejectedCounter = Monitors.newCounter("rejectedExecutions"); throwableCounter = Monitors.newCounter("throwables"); threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build()); Monitors.registerObject(name, this); } @Override public void run() { Future<?> future = null; try { future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); timeoutCounter.increment(); long currentDelay = delay.get(); // 如果出現異常,則將時間*2,然後取 定時時間 和 最長定時時間中最小的為下次任務執行的延時時間 long newDelay = Math.min(maxDelay, currentDelay * 2); delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, reject the task", e); } else { logger.warn("task supervisor rejected the task", e); } rejectedCounter.increment(); } catch (Throwable e) { if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, can't accept the task"); } else { logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally { if (future != null) { future.cancel(true); } if (!scheduler.isShutdown()) { scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } } }
我們可以仔細看看run方法的具體實現,因為這裡有一個值得借鑑的設計思路!!!
我們簡單來看看這個方法具體執行流程:
1.執行submit()方法提交任務
2.執行future.get()方法,如果沒有在規定的時間得到返回值或者任務出現異常,則進入異常處理catch程式碼塊。
3.如果發生異常
a. 發生TimeoutException異常,則執行Math.min(maxDelay, currentDelay ✖️ 2);得到任務延時時間 ✖️ 2 和 最大延時時間的最小值,然後改變任務的延時時間timeoutMillis(延時任務時間預設值是30s)
b.發生RejectedExecutionException異常,則將rejectedCounter值+1
c.發生Throwable異常,則將throwableCounter值+1
4.如果沒有發生異常,則再設定一次延時任務時間timeoutMillis
5.進入finally程式碼塊
a.如果future不為null,則執行future.cancel(true),中斷執行緒停止任務
b.如果執行緒池沒有shutdown,則建立一個新的定時任務
\(\color{red}{注意}\):不知道有沒有小夥伴發現,不管我們的定時任務執行是成功還是結束(如果還沒有執行結束,也會被中斷),然後會再重新初始化一個新的任務。並且這個任務的延時時間還會因為不同的情況受到改變,在try程式碼塊中如果不發現異常,則會重新初始化延時時間,如果發生TimeoutException異常,則會更改延時時間,更改為 任務延時時間 ✖️ 2 和 最大延時時間的最小值。所以我們會發現這樣的設計會讓整個延時任務很靈活。如果不發生異常,則延時時間不會變;如果發現異常,則增長延時時間;如果程式又恢復正常了,則延時時間又恢復成了預設值。
總結:我們在設計延時/週期性任務時就可以參考TimedSupervisorTask的實現,程式一旦遇到發生超時異常,就將間隔時間調大,如果連續超時,那麼每次間隔時間都會增大一倍,一直到達外部引數設定的上限為止,一旦新任務不再發生超時異常,間隔時間又會自動恢復為初始值。