ScheduledThreadPoolExecutor詳解
本文主要分為兩個部分,第一部分首先會對ScheduledThreadPoolExecutor進行簡單的介紹,並且會介紹其主要API的使用方式,然後介紹了其使用時的註意點,第二部分則主要對ScheduledThreadPoolExecutor的實現細節進行介紹。
1. 使用簡介
ScheduledThreadPoolExecutor是一個使用線程池執行定時任務的類,相較於Java中提供的另一個執行定時任務的類Timer,其主要有如下兩個優點:
- 使用多線程執行任務,不用擔心任務執行時間過長而導致任務相互阻塞的情況,Timer是單線程執行的,因而會出現這個問題;
- 不用擔心任務執行過程中,如果線程失活,其會新建線程執行任務,Timer類的單線程掛掉之後是不會重新創建線程執行後續任務的。
除去上述兩個優點外,ScheduledThreadPoolExecutor還提供了非常靈活的API,用於執行任務。其任務的執行策略主要分為兩大類:①在一定延遲之後只執行一次某個任務;②在一定延遲之後周期性的執行某個任務。如下是其主要API:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay, long period, TimeUnit unit);
上述四個方法中,第一個和第二個方法屬於第一類,即在delay指定的延遲之後執行第一個參數所指定的任務,區別在於,第二個方法執行之後會有返回值,而第一個方法執行之後是沒有返回值的。第三個和第四個方法則屬於第二類,即在第二個參數(initialDelay)指定的時間之後開始周期性的執行任務,執行周期間隔為第三個參數指定的時間,但是這兩個方法的區別在於第三個方法執行任務的間隔是固定的,無論上一個任務是否執行完成,而第四個方法的執行時間間隔是不固定的,其會在周期任務的上一個任務執行完成之後才開始計時,並在指定時間間隔之後才開始執行任務。如下是使用scheduleWithFixedDelay()和scheduleAtFixedRate()方法編寫的測試用例:
public class ScheduledThreadPoolExecutorTest {
private ScheduledThreadPoolExecutor executor;
private Runnable task;
@Before
public void before() {
executor = initExecutor();
task = initTask();
}
private ScheduledThreadPoolExecutor initExecutor() {
return new ScheduledThreadPoolExecutor(2);;
}
private Runnable initTask() {
long start = System.currentTimeMillis();
return () -> {
print("start task: " + getPeriod(start, System.currentTimeMillis()));
sleep(SECONDS, 10);
print("end task: " + getPeriod(start, System.currentTimeMillis()));
};
}
@Test
public void testFixedTask() {
print("start main thread");
executor.scheduleAtFixedRate(task, 15, 30, SECONDS);
sleep(SECONDS, 120);
print("end main thread");
}
@Test
public void testDelayedTask() {
print("start main thread");
executor.scheduleWithFixedDelay(task, 15, 30, SECONDS);
sleep(SECONDS, 120);
print("end main thread");
}
private void sleep(TimeUnit unit, long time) {
try {
unit.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private int getPeriod(long start, long end) {
return (int)(end - start) / 1000;
}
private void print(String msg) {
System.out.println(msg);
}
}
可以看到,上述兩個測試用例代碼塊基本是一致的,區別在於第一個用例調用的是scheduleAtFixedRate()方法,而第二個用例調用的是scheduleWithFixedDelay()。這裏兩個用例都是設置的在延遲15s後每個30s執行一次指定的任務,而該任務執行時長為10s。如下分別是這兩個測試用例的執行結果:
start main thread
start task: 15
end task: 25
start task: 45
end task: 55
start task: 75
end task: 85
start task: 105
end task: 115
end main thread
start main thread
start task: 15
end task: 25
start task: 55
end task: 65
start task: 95
end task: 105
end main thread
對比上述執行結果可以看出,對於scheduleAtFixedRate()方法,其每次執行任務的開始時間間隔都為固定不變的30s,與任務執行時長無關,而對於scheduleWithFixedDelay()方法,其每次執行任務的開始時間間隔都為上次任務執行時間加上指定的時間間隔。
這裏關於ScheduledThreadPoolExecutor的使用有三點需要說明如下:
- ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor(ThreadPoolExecutor詳解),因而也有繼承而來的execute()和submit()方法,但是ScheduledThreadPoolExecutor重寫了這兩個方法,重寫的方式是直接創建兩個立即執行並且只執行一次的任務;
- ScheduledThreadPoolExecutor使用ScheduledFutureTask封裝每個需要執行的任務,而任務都是放入DelayedWorkQueue隊列中的,該隊列是一個使用數組實現的優先隊列,在調用ScheduledFutureTask::cancel()方法時,其會根據removeOnCancel變量的設置來確認是否需要將當前任務真正的從隊列中移除,而不只是標識其為已刪除狀態;
- ScheduledThreadPoolExecutor提供了一個鉤子方法decorateTask(Runnable, RunnableScheduledFuture)用於對執行的任務進行裝飾,該方法第一個參數是調用方傳入的任務實例,第二個參數則是使用ScheduledFutureTask對用戶傳入任務實例進行封裝之後的實例。這裏需要註意的是,在ScheduledFutureTask對象中有一個heapIndex變量,該變量用於記錄當前實例處於隊列數組中的下標位置,該變量可以將諸如contains(),remove()等方法的時間復雜度從O(N)降低到O(logN),因而效率提升是比較高的,但是如果這裏用戶重寫decorateTask()方法封裝了隊列中的任務實例,那麽heapIndex的優化就不存在了,因而這裏強烈建議是盡量不要重寫該方法,或者重寫時也還是復用ScheduledFutureTask類。
2. 源碼詳解
2.1 主要屬性
ScheduledThreadPoolExecutor主要有四個屬性,分別如下:
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
private volatile boolean removeOnCancel = false;
private static final AtomicLong sequencer = new AtomicLong();
- continueExistingPeriodicTasksAfterShutdown:用於標識當前Executor對象shutdown時,是否繼續執行已經存在於任務隊列中的定時任務(調用scheduleAtFixedRate()方法生成的任務);
- executeExistingDelayedTasksAfterShutdown:用於標識當前Executor對象shutdown時,是否繼續執行已經存在於任務隊列中的定時任務(調用scheduleWithFixedDelay()方法生成的任務);
- removeOnCancel:用於標識如果當前任務已經取消了,是否將其從任務隊列中真正的移除,而不只是標識其為刪除狀態;
- sequencer:其為一個AtomicLong類型的變量,該變量記錄了當前任務被創建時是第幾個任務的一個序號,這個序號的主要用於確認當兩個任務開始執行時間相同時具體哪個任務先執行,比如兩個任務的開始執行時間都為1515847881158,那麽序號小的任務將先執行。
2.2 ScheduledFutureTask
在ScheduledThreadPoolExecutor中,主要使用ScheduledFutureTask封裝需要執行的任務,該類的主要聲明如下:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final long sequenceNumber; // 記錄當前實例的序列號
private long time; // 記錄當前任務下次開始執行的時間
// 記錄當前任務執行時間間隔,等於0則表示當前任務只執行一次,大於0表示當前任務為fixedRate類型的任務,
// 小於0則表示其為fixedDelay類型的任務
private final long period;
RunnableScheduledFuture<V> outerTask = this; // 記錄需要周期性執行的任務的實例
int heapIndex; // 記錄當前任務在隊列數組中位置的下標
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement(); // 序號在創建任務實例時指定,且後續不會變化
}
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
// 各個任務在隊列中的存儲方式是一個基於時間和序號進行比較的優先隊列,當前方法定義了優先隊列中兩個
// 任務執行的先後順序。這裏先對兩個任務開始執行時間進行比較,時間較小者優先執行,若開始時間相同,
// 則比較兩個任務的序號,序號小的任務先執行
public int compareTo(Delayed other) {
if (other == this)
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
public boolean isPeriodic() { // 判斷是否為周期性任務
return period != 0;
}
// 當前任務執行之後,會判斷當前任務是否為周期性任務,如果為周期性任務,那麽就調用當前方法計算
// 當前任務下次開始執行的時間。這裏如果當前任務是fixedRate類型的任務(p > 0),那麽下次執行時間
// 就是此次執行的開始時間加上時間間隔,如果當前任務是fixedDelay類型的任務(p < 0),那麽下次執行
// 時間就是當前時間(triggerTime()方法會獲取系統當前時間)加上任務執行時間間隔。可以看到,定頻率
// 和定延遲的任務的執行時間區別就在當前方法中進行了指定,因為調用當前方法時任務已經執行完成了,
// 因而triggerTime()方法中獲取的時間就是任務執行完成之後的時間點
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
// 取消當前任務的執行,super.cancel(boolean)方法也即FutureTask.cancel(boolean)方法。該方法傳入
// true表示如果當前任務正在執行,那麽立即終止其執行;傳入false表示如果當前方法正在執行,那麽等待其
// 執行完成之後再取消當前任務。
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
// 判斷是否設置了取消後移除隊列中當前任務,是則移除當前任務
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
public void run() {
boolean periodic = isPeriodic(); // 判斷是否為周期性任務
if (!canRunInCurrentRunState(periodic)) // 判斷是否能夠在當前狀態下執行該任務
cancel(false);
else if (!periodic) // 如果能執行當前任務,但是任務不是周期性的,那麽就立即執行該任務一次
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) { // 是周期性任務,則立即執行當前任務並且重置
setNextRunTime(); // 在當前任務執行完成後調用該方法計算當前任務下次執行的時間
reExecutePeriodic(outerTask); // 將當前任務放入任務隊列中以便下次執行
}
}
}
在ScheduledFutureTask中,主要有三個點需要強調:
- 對於run()方法的第一個分支,canRunInCurrentRunState()方法的聲明如下所示,可以看到,該方法是用於判斷當前任務如果為周期性任務,那麽其是否允許在shutdown狀態下繼續執行已經存在的周期性任務,是則表示當前狀態下是可以執行當前任務的,這裏isRunningOrShutdown()方法繼承自ThreadPoolExecutor;
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
- 在run()方法的最後一個if分支中,其首先會執行當前任務,在執行完成時才會調用setNextRunTime()方法設置下次任務執行時間,也就是說對於fixedRate和fixedDelay類型的任務都是在這個時間點才設置的,因而雖然fixedRate類型的任務,即使該任務下次執行時間比當前時間要早,其也只會在當前任務執行完成後立即執行,而不會與當前任務還未執行完時就執行;對於fixedDelay任務則不會存在該問題,因為其是以任務完成後的時間點為基礎計算下次執行的時間點;
- 對於run()方法的最後一個分支中的reExecutePeriodic()方法,其會將當前任務加入到任務隊列中,並且調用父類的ensurePrestart()方法確保有可用的線程來執行當前任務,如下是該方法的具體實現:
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) { // 判斷當前任務是否可以繼續執行
super.getQueue().add(task); // 將當前任務加入到任務隊列中
if (!canRunInCurrentRunState(true) && remove(task)) // 雙檢查法判斷任務在加入過程中是否取消了
task.cancel(false);
else
ensurePrestart(); // 初始化核心線程等確保任務可以被執行
}
}
從ScheduledFutureTask的實現總結來看,當每創建一個該類實例時,會初始化該類的一些主要屬性,如下次開始執行的時間和執行的周期。當某個線程調用該任務,即執行該任務的run()方法時,如果該任務不為周期性任務,那麽執行該任務之後就不會有其余的動作,如果該任務為周期性任務,那麽在將當前任務執行完畢之後,還會重置當前任務的狀態,並且計算下次執行當前任務的時間,然後將其放入隊列中以便下次執行。
2.3 DelayedWorkQueue
DelayedWorkQueue的實現與DelayQueue以及PriorityQueue的實現基本相似,形式都為一個優先隊列,並且底層是使用堆結構來實現優先隊列的功能,在數據存儲方式上,其使用的是數組來實現。這裏DelayedWorkQueue與DelayQueue以及PriorityQueue不同的點在於DelayedWorkQueue中主要存儲ScheduledFutureTask類型的任務,該任務中有一個heapIndex屬性保存了當前任務在當前隊列數組中的位置下標,其主要提升的是對隊列的諸如contains()和remove()等需要定位當前任務位置的方法的效率,時間復雜度可以從O(N)提升到O(logN)。如下是DelayedWorkQueue的實現代碼(這裏只列出了該類的主要屬性和與實現ScheduledThreadPoolExecutor功能相關的方法,關於如何使用數組實現優先隊列請讀者查閱相關文檔):
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16; // 數組初始化大小
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock(); // 對添加和刪除元素所使用的鎖
private int size = 0; // 當前隊列中有效任務的個數
private Thread leader = null; // 執行隊列頭部任務的線程
private final Condition available = lock.newCondition(); // 除leader線程外其余線程的等待隊列
// 在對任務進行移動時,判斷其是否為ScheduledFutureTask實例,如果是則維護其heapIndex屬性
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
private void siftUp(int k, RunnableScheduledFuture<?> key) {/* 省略 */}
private void siftDown(int k, RunnableScheduledFuture<?> key) {/* 省略 */}
private int indexOf(Object x) {
if (x != null) {
if (x instanceof ScheduledFutureTask) { // 如果為ScheduledFutureTask則可返回其heapIndex屬性
int i = ((ScheduledFutureTask) x).heapIndex;
if (i >= 0 && i < size && queue[i] == x)
return i;
} else { // 如果不為ScheduledFutureTask實例,則需要遍歷隊列查詢當前元素的位置
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
return -1;
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow(); // 隊列容量不足,對其進行擴容
size = i + 1;
if (i == 0) { // 如果其為隊列第一個元素,則將其放入隊列頭部
queue[0] = e;
setIndex(e, 0);
} else { //如果不為第一個元素,則通過堆的上移元素操作移動當前元素至合適的位置
siftUp(i, e);
}
if (queue[0] == e) { // 如果被更新的是隊列頭部元素,則更新記錄的執行頭部任務的線程
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
// 完成從隊列拉取元素操作,並且將其從隊列中移除
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null; // 將隊列最尾部的元素置空
if (s != 0) // 將最後一個元素放入第一個位置,並且將其下推至合適的位置
siftDown(0, x); // 這裏idx置為0是因為當前方法的入參f都為隊列的第一個元素
setIndex(f, -1);
return f;
}
// 嘗試從隊列(堆)中獲取元素,如果沒有元素或者元素的延遲時間還未到則返回空
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
// 在此處代碼控制了當從堆頂拉取元素時,如果元素的延遲時間還未達到,則不返回當前元素
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return finishPoll(first); // 返回堆頂元素
} finally {
lock.unlock();
}
}
// 通過無限for循環獲取堆頂的元素,這裏take()方法會阻塞當前線程,直至獲取到了可執行的任務。
// 可以看到,在第一次for循環中,如果堆頂不存在任務,則其會加入阻塞隊列中,如果存在任務,但是
// 其延遲時間還未到,那麽當前線程會等待該延遲時間長的時間,然後查看任務是否可用,當獲取到任務
// 之後,其會將其從隊列中移除,並且喚醒等待隊列中其余等待的線程執行下一個任務
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await(); // 堆內沒有元素,當前線程進入等待隊列中
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) // 堆頂元素延遲時間小於0,可立即獲取任務
return finishPoll(first);
first = null;
if (leader != null)
available.await(); // 已經有線程在等待堆頂元素,則當前線程進入等待隊列中
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); // 當前線程等待一定時長後獲取任務並執行
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal(); // 當前線程獲取完任務之後喚醒等待隊列中的下一個線程執行下一個任務
lock.unlock();
}
}
}
從DelayedWorkQueue的take()和poll()方法可以看出來,對於隊列中任務的等待時間的限制主要是在這兩個方法中實現的,如果任務的等待時間還未到,那麽該方法就會阻塞線程池中的線程,直至任務可以執行。
2.4 scheduleAtFixedRate()和scheduleWithFixedDelay()方法
前面我們對ScheduledThreadPoolExecutor的主要屬性和主要內部類都進行了詳細的講解,基本上已經可以看出其是如何實現定時執行任務的功能的,接下來我們主要對客戶端可以調用的主要方法進行簡要介紹,這裏scheduleAtFixedRate()和scheduleWithFixedDelay()方法的實現基本是一致的,兩個方法最細微的區別在於ScheduledFutureTask的setNextRunTime()方法的實現,該方法的實現前面已經進行了講解,我們這裏則以scheduleAtFixedRate()方法的實現為例對該方法進行講解。如下是該方法的具體實現:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft = // 封裝客戶端的任務實例
new ScheduledFutureTask<Void>(command, null,
triggerTime(initialDelay, unit),unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft); // 對客戶端任務實例進行裝飾
sft.outerTask = t; // 初始化周期任務屬性outerTask
delayedExecute(t); // 執行該任務
return t;
}
從上述代碼可以看出來,scheduleAtFixedRate()首先對客戶端任務實例進行了封裝,裝飾,並且初始化了封裝後的任務實例的outerTask屬性,最後調用delayedExecute()方法執行任務。如下是delayedExecute()方法的實現:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task); // 添加當前任務到任務隊列中
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
task.cancel(false); // 雙檢查法再次判斷當前線程池是否處於可用狀態,不是則移除當前任務
else
ensurePrestart(); // 若線程池沒有初始化,則進行一些初始化工作
}
}
上述方法為主要的執行任務的方法,該方法首先會將任務加入到任務隊列中,如果線程池已經初始化過,那麽該任務就會有等待的線程執行該任務。在加入到任務隊列之後通過雙檢查法檢查線程池是否已經shutdown了,如果是則將該任務從任務隊列中移除。如果當前線程池沒有shutdown,就調用繼承自ThreadPoolExecutor的ensurePrestart()方法,該方法會對線程池進行一些初始化工作,如初始化核心線程,然後各個線程會調用上述等待隊列的take()方法獲取任務執行。
ScheduledThreadPoolExecutor詳解