Java8之ScheduledThreadPoolExecutor實現原理
ScheduledThreadPoolExecutor是一個可實現定時任務的執行緒池,ScheduledThreadPoolExecutor內的任務既可以在設定的時間到達時執行一次,也可以相隔固定時間週期執行。
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,關於ThreadPoolExecutor的原理可參考:
ScheduledThreadPoolExecutor相比較於ThreadPoolExecutor,只多了四個提交任務的方法,這四個方法實現自ScheduledExecutorService,繼承自ThreadPoolExecutor的execute和submit方法都呼叫了自己的schedule方法
- 第一個schedule的任務是runnable,不支援獲取返回值,會在指定延遲時間之後執行一次
- 第二個schedule的任務是callable,可獲取返回值,會在指定延遲時間之後執行一次
- scheduleAtFixedRate的任務為runnable,不支援獲取返回值,會在指定延遲時間後執行一次,之後按設定好的間隔時間重複執行
- scheduleWithFixedDelay的任務為runnable,不支援獲取返回值,會在指定延遲時間後執行一次,並在執行完成時間的基礎上按間隔時間週期執行
後兩個方法可能理解起來不是很清楚,簡單點說,scheduleAtFixedRate的下一次任務的執行時間為:法定的任務開始時間(並不是任務真正開始執行的時間,因為執行緒池是在任務執行完成後再計算下一次的開始時間,此時計算出的開始時間可能已經小於現在的時間了,當這個任務執行的時候,真實的執行時間肯定比設定好的時間大)
我們將scheduleAtFixedRate演示一下,正常情況如下,其中0、5、15、20就是法定的任務開始時間
不正常的情況如下
實現原理
ScheduledThreadPoolExecutor之所以能夠實現上述的定時功能,本質上是通過兩點實現的:
1.將任務包裝在ScheduledFutureTask內。ScheduledFutureTask相當於原始任務的代理,在執行原始任務之後會視情況將任務的執行時間修改後再加入工作佇列
2.使用了DelayedWorkQueue作為任務佇列。DelayedWorkQueue內部是一個ScheduledFutureTask的陣列,queue的功能類似於DelayQueue,在設定的時間到達時才能取出佇列中的元素。
下面我們基於以上兩點來分析原始碼。
1.構造方法
以最後一個建構函式為例,四個構造方法都呼叫了父類(ThreadPoolExecutor)的構造方法,最大執行緒數都設定為int最大值,執行緒存活時間都為0,工作佇列都為DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//最大執行緒數為int最大值,執行緒空轉時間為0,使用DelayedWorkQueue作為工作佇列
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
2.任務提交
普通的schedule方法會將間隔時間設為0,只執行一次;scheduleAtFixedRate會將時間間隔設為正數;scheduleWithFixedDelay會將時間間隔設為負數
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
//將任務包裝成ScheduledFutureTask。
//ScheduledFutureTask的構造方法內會將間隔時間設為0
//ScheduledFutureTask的time會設為計算出的任務執行時間
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//判斷執行緒池狀態,一切正常會將任務加入工作佇列
delayedExecute(t);
return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
//將任務包裝成ScheduledFutureTask。
//ScheduledFutureTask的構造方法內會將間隔時間設為0
//ScheduledFutureTask的time會設為計算出的任務執行時間
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
//判斷執行緒池狀態,一切正常會將任務加入工作佇列
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//將任務包裝成ScheduledFutureTask。
//間隔時間設為相應的時間單位的值,值為正數
//ScheduledFutureTask的time會設為計算出的任務執行時間
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//將任務指向自己,將任務再放回佇列時會用到該值
sft.outerTask = t;
//判斷執行緒池狀態,一切正常會將任務加入工作佇列
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
//將任務包裝成ScheduledFutureTask。
//間隔時間設為相應的時間單位的值,值為負數
//ScheduledFutureTask的time會設為計算出的任務執行時間
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, set);
//將任務指向自己,將任務再放回佇列時會用到該值
sft.outerTask = t;
//判斷執行緒池狀態,一切正常會將任務加入工作佇列
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
//先判斷執行緒池是否停止
if (isShutdown())
//若已停止,會根據拒絕策略執行拒絕操作
reject(task);
else {
//將任務加入工作佇列
super.getQueue().add(task);
//再次檢驗執行緒池是否停止,若已停止,將任務取消,否則執行一些執行緒池的預熱操作
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
3.ScheduledFutureTask
1.引數
- sequenceNumber用於當兩個任務時間相同時,sequenceNumber小的排在工作佇列的前邊
- time記錄了任務的開始時間
- period記錄了任務的執行間隔時間
- outerTask是一個指向自己的引用,當任務要重複執行時,就是將這個引用再放入工作佇列
- heapIndex記錄了任務在佇列裡的座標,方便快速取消任務
2.run方法
ScheduledFutureTask是一個runnable,run是他的核心方法
public void run() {
//根據間隔時間判斷是不是需要多次執行的任務
boolean periodic = isPeriodic();
//判斷當前任務線上程池SHUTDOWN後需不需要執行
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果是一次性的任務,執行該任務
else if (!periodic)
ScheduledFutureTask.super.run();
//如果是需要多次執行的任務,執行該任務
else if (ScheduledFutureTask.super.runAndReset()) {
//設定該任務下次執行的時間
setNextRunTime();
//將修改好時間的任務再次放入工作佇列
reExecutePeriodic(outerTask);
}
}
isPeriodic方法很簡單, 根據間隔時間判斷
public boolean isPeriodic() {
//如果間隔時間為0,則是一次性任務
return period != 0;
}
setNextRunTime是設定下一次執行時間的方法
private void setNextRunTime() {
//提交任務時,如果使用的scheduleWithFixedDelay方法,會將間隔時間儲存為負數
long p = period;
//如果間隔時間>0,將下次執行時間設定為:法定執行時間+執行間隔
if (p > 0)
time += p;
else
//將下次執行時間設定為:當前時間+執行間隔
time = triggerTime(-p);
}
ScheduledThreadPoolExecutor的reExecutePeriodic方法將任務重新放回佇列
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//判斷執行緒池當前狀態能否繼續執行任務
if (canRunInCurrentRunState(true)) {
//將任務加入佇列,DelayedWorkQueue會按時間和序列號將任務插到合適的位置
super.getQueue().add(task);
//再次判斷執行緒池狀態,防止狀態已修改
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
//此方法會將工作執行緒數量補充到設定的核心執行緒數量
//如果設定的核心執行緒數為0,則會新增一個普通工作執行緒
ensurePrestart();
}
}
在將ScheduledFutureTask放入任務佇列後,佇列會根據ScheduledFutureTask的compareTo方法將任務排序
public int compareTo(Delayed other) {
if (other == this)
return 0;
if (other instanceof ScheduledFutureTask) {
//先比較時間,時間靠前的小,如果時間相同,再比較序列號,序列號小的排在前邊
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
4.DelayedWorkQueue
該阻塞佇列是使用陣列儲存任務
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
存方法,存入的任務在任務佇列內只能保證一個粗略有序
//呼叫執行緒池的提交任務方法時內部呼叫的就是佇列的add方法
public boolean add(Runnable e) {
//add方法將任務委託給offer方法
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
//事物類操作首先要獲取到鎖
lock.lock();
try {
int i = size;
//如果佇列儲存的資料達到了佇列的容量,將佇列進行擴容
//擴容後容量 = 擴容前容量 + 擴容前容量*0.5
//如果擴容後容量超過int最大值,則將容量設定為int最大值
if (i >= queue.length)
grow();
size = i + 1;
//如果i==0,說明這是第一個任務,直接放在下標0的位置
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
//否則的話,採用二分查詢法查詢一個合適的位置
//採用二分查詢插入不能保證絕對的有序
siftUp(i, e);
}
//如果是第一個任務,則喚醒消費者執行緒
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
//二分插入方法
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
//右移1位,找到k中間的下標
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
//如果要插入的任務大於等於下標位置,則跳出迴圈,任務會插在k的位置
if (key.compareTo(e) >= 0)
break;
//如果要插入的任務小於下標位置,則將該下標內的位置換到k,並將k賦值為中間下標
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
取方法分三種,poll獲取不到立刻返回null,take會阻塞獲取,poll會在超時時間內阻塞獲取,以take為例
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//事物類操作都要先獲取鎖,此處加的是可響應中斷的鎖
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
//如果佇列內沒有任務,則在lock的等待佇列上等待
if (first == null)
available.await();
else {
//獲取佇列上第一個任務還需等待的時間
long delay = first.getDelay(NANOSECONDS);
//如果第一個任務的執行時間已經到了,則取出該任務
if (delay <= 0)
return finishPoll(first);
first = null;
//第一個消費者會設為leader,其餘的消費者會阻塞
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//第一個消費者會在等待佇列上阻塞delay長的時間
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}