Java任務排程執行緒池ScheduledThreadPoolExecutor原理解析
ScheduledThreadPoolExecutor是JDK在ThreadPoolExecutor的基礎上實現的任務排程執行緒池。
ScheduledThreadPoolExecutor的建構函式全部是呼叫父類(也就是ThreadPoolExecutor)的建構函式。其中,核心執行緒數是必須設定的,最大執行緒數是Integer.MAX_VALUE,空閒工作執行緒生存時間是0,阻塞佇列是DelayedWorkQueue。
DelayedWorkQueue內部使用一個初始容量為16的陣列來儲存任務,容量不夠時會擴容,所以可以任務DelayedWorkQueue是一個無界佇列,那麼最大執行緒數的設定也是沒有意義的。
//建構函式
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
既然ScheduledThreadPoolExecutor的建構函式全部使用父類的,那麼又是如何實現定時排程的呢?對比ScheduledThreadPoolExecutor與ThreadPoolExecutor,不同之處主要是下面兩點:
- 任務不同。ScheduledThreadPoolExecutor的任務統一被封裝成了ScheduledFutureTask
- 阻塞佇列不同。ScheduledThreadPoolExecutor使用的是DelayedWorkQueue,顧名思義,這是一個延時佇列。
我們以scheduleAtFixedRate()方法為例來看看具體是如何實現的。
scheduleAtFixedRate的大致邏輯如下:
- 將任務封裝成一個ScheduledFutureTask物件
- 將ScheduledFutureTask物件放到延時佇列中
/**
* 主要任務:
* 1.封裝一個ScheduledFutureTask物件
* 2.執行delayedExecute()方法
* /
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
/**
* 主要任務:
* 1.將task新增到佇列中
* /
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
所以,下面最重要的應該是延時佇列DelayedWorkQueue的offer和take方法了,來看看是怎麼實現的。
DelayedWorkQueue內部使用陣列去維護任務佇列的,那麼陣列是怎麼保證任務有序呢?
其實仔細看程式碼,我們能發現,這裡的實現是用一個二叉堆去對陣列元素進行排序。確切的說是小頂堆。那麼小頂堆是依據什麼來排序的呢?
因為ScheduledFutureTask實現了Comparable介面,是按照任務執行的時間來倒敘排序的。
//首先判斷容量,如果容量不夠就擴容,接著判斷是不是第一個元素,如果是,
//那麼直接放在index為0的位置,不是的話進行上濾操作。接下來判斷新增的元素是不是
//在堆頂,如果是那麼需要進行優先排程,那麼進行signal
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
//擴容
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
//根據任務的下一次執行時間比較,將最近需要執行的任務放到前面
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
//毫無疑問,take中直接獲取queue[0],它是距離目前最近的要被執行的任務,
//先檢測一下還有多長時間,任務會被執行,如果小於0,那麼立刻彈出,
//並且做一個下濾操作,重新找出堆頂元素。如果不小於0,那麼證明時間還沒到,
//那麼available.awaitNanos(delay);等到delay時間後自動喚醒,
//或者因為添加了一個更加緊急的任務即offer中的signal被呼叫了,那麼喚醒,
//重新迴圈獲取最優先執行的任務,如果delay小於0,那麼直接彈出任務。
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
//時間已到,取出
return finishPoll(first);
else if (leader != null)
//等待
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
弄清楚了延時的實現原理,下面最關鍵的就是週期排程的原理了。這個是在ScheduledFutureTask的run方法裡面實現的。
判斷是否是週期執行的,如果不是,直接執行,如果是,先執行,然後計算下一次執行時間,將任務重新新增到延時佇列中。
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}