週期性執行緒池與主要原始碼解析
之前學習ThreadPool的使用以及原始碼剖析,並且從面試的角度去介紹知識點的解答。今天小強帶來週期性執行緒池的使用和重點原始碼剖析。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor:用來處理延時任務或定時任務
定時執行緒池類的類結構圖
ScheduledThreadPoolExecutor接收ScheduleFutureTask型別的任務,是執行緒池排程任務的最小單位。
它採用DelayQueue儲存等待的任務:
1、DelayQueue內部封裝成一個PriorityQueue,它會根據time的先後時間順序,如果time相同則根絕sequenceNumber排序;
2、DelayQueue是無界佇列;
ScheduleFutureTask
接收的引數:
private final long sequenceNumber;//任務的序號
private long time;//任務開始的時間
private final long period;//任務執行的時間間隔
工作執行緒的的執行過程:
工作執行緒會從DelayQueue取出已經到期的任務去執行;
執行結束後重新設定任務的到期時間,再次放回DelayQueue;
ScheduledThreadPoolExecutor會把待執行的任務放到工作佇列DelayQueue中,DelayQueue封裝了一個PriorityQueue,PriorityQueue會對佇列中的ScheduledFutureTask進行排序,具體的排序演算法實現如下:
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; //首先按照time排序,time小的排到前面,time大的排到後面 long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; //time相同,按照sequenceNumber排序; //sequenceNumber小的排在前面,sequenceNumber大的排在後面 else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
接下來看看ScheduledFutureTask的run方法實現, run方法是排程task的核心,task的執行實際上是run方法的執行。
public void run() {
//是否是週期性的
boolean periodic = isPeriodic();
//執行緒池是shundown狀態不支援處理新任務,直接取消任務
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果不需要執行執行週期性任務,直接執行run方法結束
else if (!periodic)
ScheduledFutureTask.super.run();
//如果需要週期性執行,則在執行任務完成後,設定下一次執行時間
else if (ScheduledFutureTask.super.runAndReset()) {
//設定下一次執行該任務的時間
setNextRunTime();
//重複執行該任務
reExecutePeriodic(outerTask);
}
}
run方法的執行步驟:
- 1、如果執行緒池是shundown狀態不支援處理新任務,直接取消任務,否則步驟2;
- 2、如果不是週期性任務,直接呼叫ScheduledFutureTask的run方法執行,會設定執行結果,然後直接返回,否則步驟3;
- 3、如果是週期性任務,呼叫ScheduledFutureTask的runAndset方法執行,不會設定執行結果,然後直接返回,否則執行步驟4和步驟5;
- 4、計算下一次執行該任務的時間;
- 5、重複執行該任務;
接下來看下reExecutePeriodic方法的執行步驟:
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
由於已經執行過一次週期性任務,所以不會reject當前任務,同時傳入的任務一定是週期性任務。
週期性執行緒池任務的提交方式
週期性有三種提交的方式:schedule、sceduleAtFixedRate、schedlueWithFixedDelay。下面從使用和原始碼兩個方面進行說明,首先是如果提交任務:
pool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("延遲執行");
}
},1, TimeUnit.SECONDS);
/**
* 這個執行週期是固定,不管任務執行多長時間,每過3秒中就會產生一個新的任務
*/
pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//這個業務邏輯需要很長的時間,超過了3秒
System.out.println("重複執行");
}
},1,3,TimeUnit.SECONDS);
pool.shutdown();
/**
* 假如run方法30min後執行完成,然後間隔3秒,再週期性執行下一個任務
*/
pool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
//30min
System.out.println("重複執行");
}
},1,3,TimeUnit.SECONDS);
知道了如何提交週期性任務,接下來原始碼是如何執行的,首先是schedule方法,該方法是指任務在指定延遲時間到達後觸發,只會執行一次。
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
//把任務封裝成ScheduledFutureTask,之後呼叫decorateTask進行包裝;
//decorateTask方法是空方法,留給使用者去實現的;
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//包裝好任務之後,進行任務的提交
delayedExecute(t);
return t;
}
任務提交方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果執行緒池不是RUNNING狀態,則使用拒絕策略把提交任務拒絕掉
if (isShutdown())
reject(task);
else {
//與ThreadPoolExecutor不同,這裡直接把任務加入延遲佇列
super.getQueue().add(task);
//如果當前狀態無法執行任務,則取消
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//和ThreadPoolExecutor不一樣,corePoolSize沒有達到會增加Worker;
//增加Worker,確保提交的任務能夠被執行
ensurePrestart();
}
}
還沒關注我的公眾號?
- 掃文末二維碼關注公眾號【小強的進階之路】可領取如下:
- 學習資料: 1T視訊教程:涵蓋Javaweb前後端教學視訊、機器學習/人工智慧教學視訊、Linux系統教程視訊、雅思考試視訊教程;
- 100多本書:包含C/C++、Java、Python三門程式語言的經典必看圖書、LeetCode題解大全;
- 軟體工具:幾乎包括你在程式設計道路上的可能會用到的大部分軟體;
- 專案原始碼:20個JavaWeb專案原始碼。