Java併發程式設計筆記——J.U.C之executors框架:ScheduledThreadPoolExecutor
一、ScheduledThreadPoolExecutor簡介
在executors框架設計理念 一節中,我們曾經提到過一種可對任務進行延遲/週期性排程的執行器(Executor),這類Executor一般實現了ScheduledExecutorService這個介面。ScheduledExecutorService在普通執行器介面(ExecutorService)的基礎上引入了Future模式,使得可以限時或週期性地排程任務。
ScheduledThreadPoolExecutor
的類繼承關係如下圖,該圖中除了本節要講解的ScheduledThreadPoolExecutor外,其它部分已經在前2節詳細介紹過了:
從上圖中可以看到,ScheduledThreadPoolExecutor其實是繼承了ThreadPoolExecutor這個普通執行緒池,我們知道ThreadPoolExecutor中提交的任務都是實現了Runnable介面,但是ScheduledThreadPoolExecutor比較特殊,由於要滿足任務的延遲/週期排程功能,它會對所有的Runnable任務都進行包裝,包裝成一個RunnableScheduledFuture
任務。
RunnableScheduledFuture是Future模式中的一個介面,關於Future模式,我們後續會專門章節講解,這裡只要知道RunnableScheduledFuture的作用就是可以非同步地執行【延時/週期任務】。
另外,我們知道在ThreadPoolExecutor中,需要指定一個阻塞佇列作為任務佇列。ScheduledThreadPoolExecutor中也一樣,不過特殊的是,ScheduledThreadPoolExecutor中的任務佇列是一種特殊的延時佇列(DelayQueue)。
我們曾經在juc-collections框架中,分析過該種阻塞佇列,DelayQueue底層基於優先佇列(PriorityQueue)實現,是一種“堆”結構,通過該種阻塞佇列可以實現任務的延遲到期執行(即每次從佇列獲取的任務都是最先到期的任務)。
ScheduledThreadPoolExecutor在內部定義了DelayQueue的變種——DelayedWorkQueue
二、ScheduledThreadPoolExecutor基本原理
構造執行緒池
我們先來看下ScheduledThreadPoolExecutor的構造,其實在executors框架概述中講Executors時已經接觸過了,Executors使用newScheduledThreadPool
工廠方法建立ScheduledThreadPoolExecutor:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
我們來看下ScheduledThreadPoolExecutor的構造器,內部其實都是呼叫了父類ThreadPoolExecutor的構造器,這裡最需要注意的就是任務佇列的選擇——DelayedWorkQueue,我們後面會詳細介紹它的實現原理。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
}
執行緒池的排程
ScheduledThreadPoolExecutor的核心排程方法是schedule
、scheduleAtFixedRate
、scheduleWithFixedDelay
,我們通過schedule方法來看下整個排程流程:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
上述的decorateTask方法把Runnable任務包裝成ScheduledFutureTask,使用者可以根據自己的需要覆寫該方法:
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
注意:
ScheduledFutureTask是RunnableScheduledFuture介面的實現類,任務通過
period
欄位來表示任務型別
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
/**
* 任務序號, 自增唯一
*/
private final long sequenceNumber;
/**
* 首次執行的時間點
*/
private long time;
/**
* 0: 非週期任務
* >0: fixed-rate任務
* <0: fixed-delay任務
*/
private final long period;
/**
* 在堆中的索引
*/
int heapIndex;
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// ...
}
ScheduledThreadPoolExecutor中的任務佇列—— DelayedWorkQueue,儲存的元素就是ScheduledFutureTask。DelayedWorkQueue是一種 堆結構,time最小的任務會排在堆頂(表示最早過期),每次出隊都是取堆頂元素,這樣最快到期的任務就會被先執行。如果兩個ScheduledFutureTask的time相同,就比較它們的序號——sequenceNumber,序號小的代表先被提交,所以就會先執行。
schedule的核心是其中的delayedExecute方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown()) // 執行緒池已關閉
reject(task); // 任務拒絕策略
else {
super.getQueue().add(task); // 將任務入隊
// 如果執行緒池已關閉且該任務是非週期任務, 則將其從佇列移除
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false); // 取消任務
else
ensurePrestart(); // 新增一個工作執行緒
}
}
通過delayedExecute可以看出,ScheduledThreadPoolExecutor的整個任務排程流程大致如下圖:
我們來分析這個過程:
- 首先,任務被提交到執行緒池後,會判斷執行緒池的狀態,如果不是RUNNING狀態會執行拒絕策略。
- 然後,將任務新增到阻塞佇列中。(注意,由於DelayedWorkQueue是無界佇列,所以一定會add成功)
-
然後,會建立一個工作執行緒,加入到核心執行緒池或者非核心執行緒池:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
通過ensurePrestart可以看到,如果核心執行緒池未滿,則新建的工作執行緒會被放到核心執行緒池中。如果核心執行緒池已經滿了,ScheduledThreadPoolExecutor不會像ThreadPoolExecutor那樣再去建立歸屬於非核心執行緒池的工作執行緒,而是直接返回。也就是說,在ScheduledThreadPoolExecutor中,一旦核心執行緒池滿了,就不會再去建立工作執行緒。
這裡思考一點,什麼時候會執行else if (wc == 0)建立一個歸屬於非核心執行緒池的工作執行緒?
答案是,當通過setCorePoolSize方法設定核心執行緒池大小為0時,這裡必須要保證任務能夠被執行,所以會建立一個工作執行緒,放到非核心執行緒池中。
最後,執行緒池中的工作執行緒會去任務佇列獲取任務並執行,當任務被執行完成後,如果該任務是週期任務,則會重置time欄位,並重新插入佇列中,等待下次執行。這裡注意從佇列中獲取元素的方法:
- 對於核心執行緒池中的工作執行緒來說,如果沒有超時設定(
allowCoreThreadTimeOut == false
),則會使用阻塞方法take獲取任務(因為沒有超時限制,所以會一直等待直到佇列中有任務);如果設定了超時,則會使用poll方法(方法入參需要超時時間),超時還沒拿到任務的話,該工作執行緒就會被回收。 - 對於非工作執行緒來說,都是呼叫poll獲取佇列元素,超時取不到任務就會被回收。
上述就是ScheduledThreadPoolExecutor的核心排程流程,通過我們的分析可以看出,相比ThreadPoolExecutor,ScheduledThreadPoolExecutor主要有以下幾點不同:
- 總體的排程控制流程略有區別;
- 任務的執行方式有所區別;
- 任務佇列的選擇不同。
最後,我們來看下ScheduledThreadPoolExecutor中的延時佇列——DelayedWorkQueue。
延時佇列
DelayedWorkQueue,該佇列和已經介紹過的DelayQueue區別不大,只不過佇列元素是RunnableScheduledFuture:
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private int size = 0;
private final ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
private Thread leader = null;
// ...
}
DelayedWorkQueue是一個無界佇列,在佇列元素滿了以後會自動擴容,它並沒有像DelayQueue那樣,將佇列操作委託給PriorityQueue,而是自己重新實現了一遍堆的核心操作——上浮、下沉。我這裡不再贅述這些堆操作,讀者可以參考PriorityBlockingQueue自行閱讀原始碼。
我們關鍵來看下add
、take
、poll
這三個佇列方法,因為ScheduledThreadPoolExecutor的核心排程流程中使用到了這三個方法:
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
add、offer內部都呼叫了下面這個方法:
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size; // 佇列已滿, 擴容
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e); // 堆上浮操作
}
if (queue[0] == e) { // 當前元素是首個元素
leader = null;
available.signal(); // 喚醒一個等待執行緒
}
} finally {
lock.unlock();
}
return true;
}
take方法:
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (; ; ) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) // 佇列為空
available.await(); // 等待元素入隊
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) // 元素已到期
return finishPoll(first);
// 執行到此處, 說明隊首元素還未到期
first = null;
if (leader != null)
available.await();
else {
// 當前執行緒成功leader執行緒
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
注意:上述leader表示一個等待獲取隊首元素的出隊執行緒,這是一種稱為“Leader-Follower pattern”的多執行緒設計模式(讀者可以參考DelayQueue中的講解)。
每次出隊元素時,如果佇列為空或者隊首元素還未到期,執行緒就會在condition條件佇列等待。一般的思路是無限等待,直到出現一個入隊執行緒,入隊元素後將一個出隊執行緒喚醒。
為了提升效能,當佇列非空時,用leader
儲存第一個到來並嘗試出隊的執行緒,並設定它的等待時間為隊首元素的剩餘期限,這樣當元素過期後,執行緒也就自己喚醒了,不需要入隊執行緒喚醒。這樣做的好處就是提升一些效能。
三、總結
本節介紹了ScheduledThreadPoolExecutor,它是對普通執行緒池ThreadPoolExecutor的擴充套件,增加了延時排程、週期排程任務的功能。概括下ScheduledThreadPoolExecutor的主要特點:
- 對Runnable任務進行包裝,封裝成
ScheduledFutureTask
,該類任務支援任務的週期執行、延遲執行; - 採用
DelayedWorkQueue
作為任務佇列。該佇列是無界佇列,所以任務一定能新增成功,但是當工作執行緒嘗試從佇列取任務執行時,只有最先到期的任務會出隊,如果沒有任務或者隊首任務未到期,則工作執行緒會阻塞; ScheduledThreadPoolExecutor
的任務排程流程與ThreadPoolExecutor略有區別,最大的區別就是,先往佇列新增任務,然後建立工作執行緒執行任務。
另外,maximumPoolSize
這個引數對ScheduledThreadPoolExecutor其實並沒有作用,因為除非把corePoolSize設定為0,這種情況下ScheduledThreadPoolExecutor只會建立一個屬於非核心執行緒池的工作執行緒;否則,ScheduledThreadPoolExecutor只會新建歸屬於核心執行緒池的工作執行緒,一旦核心執行緒池滿了,就不再新