併發:ScheduledThreadPoolExecutor詳解。
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。他主要用來給定的延遲之後執行任務,或者定期執行任務。ScheduledThreadPoolExecutor的功能與Timer類似,但ScheduledThreadPoolExecutor功能更強大、更靈活。Timer對應的是單個後臺執行緒,而ScheduledThreadPoolExecutor可以在建構函式中指定多個對應的後臺執行緒數。
ScheduledThreadPoolExecutor的執行機制
ScheduledThreadPoolExecutor的執行示意圖(本文基於JDK 6)如下所示。
DelayQueue是一個無界佇列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中沒有什麼意義(設定maximumPoolSize的大小沒有什麼效果)。
ScheduledThreadPoolExecutor的執行主要分為兩大部分。
-
當呼叫ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayQueue新增一個實現了RunnableScheduledFuture介面的ScheduledFutureTask。
-
執行緒池中的執行緒從DelayQueue中獲取SheduledFutureTask,然後執行任務。
ScheduledThreadPoolExecutor為了實現週期性的執行任務,對ThreadPoolExecutor做了如下的修改。
-
使用DelayQueue作為任務佇列。
-
獲取任務的方式不同。
-
執行週期任務後,增加了額外的處理。
ScheduledThreadPoolExecutor的實現
ScheduledThreadPoolExecutor會把待排程的任務(ScheduledFutureTask)放到一個DelayQueue中。
ScheduledFutureTask主要包含3個成員變數,如下。
- long型成員變數time,表示這個任務將要被執行的具體時間。
- long型成員變數sequenceNumber,表示這個任務被新增到ScheduledThreadPoolExecutor中的序號。
- long型成員變數period,表示任務執行的間隔週期。
DelayQueue封裝了一個PriorityQueue,這個PriorityQueue會對佇列中的ScheduledFutureTask進行排序。排序時,time小的排在前面(時間早的任務將被先執行)。如果兩個ScheduledFutureTask的time相同,就比較sequenceNumber,sequenceNumber小的排在前面(也就是說,如果兩個任務的執行時間相同,那麼先提交的任務將被先執行)。
首先,讓我們看看ScheduledThreadPoolExecutor中的執行緒執行週期任務的過程。下圖是ScheduledThreadPoolExecutor中的執行緒1執行某個週期任務的4個步驟。
下面是對這4個步驟的說明。
- 執行緒1從DelayQueue中獲取已到期的ScheduledFutureTask(DelayQueue.take())。到期任務是指ScheduledFutureTask的time大於等於當前時間。
- 執行緒1執行這個ScheduledFutureTask。
- 執行緒1修改ScheduledFutureTask的time變數為下次將要被執行的時間。
- 執行緒1把這個修改time之後的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。
接下來,讓我們看看上面步驟1獲取任務的過程。下面是DelayQueue.take()方法的原始碼實現。
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 1
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else { // 2.1
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0)
long t1 = available.awaitNanos(delay); // 2.2
else {
E x = q.poll(); // 2.3.1
assert x != null;
if(q.size() 1= 0)
available.singalAll(); // 2.3.2
return x;
}
}
}
} finally {
lock.unlock(); // 3
}
}
下圖是DelayQueue.take()的執行示意圖。
如上圖所示,獲取任務分為3大步驟。
1、獲取Lock。
2、獲取週期任務。
- 如果PriorityQueue為空,當前執行緒到Condition中等待,否則執行下面的2.2。
- 如果PriorityQueue的頭元素的time時間比當前時間大,到Condition中等待到time時間;否則執行下面的2.3。
- 獲取PriorityQueue的頭元素(2.3.1);如果PriorityQueue不為空,則喚醒在Condition中等待的所有執行緒(2.3.2)。
3、釋放Lock。
ScheduledThreadPoolExecutor在一個迴圈中執行步驟2,直到執行緒從PriorityQueue獲取到一個元素之後(執行2.3.1之後),才會退出無限迴圈(結束步驟2)。
最後,讓我們看看ScheduledThreadPoolExecutor中的執行緒執行任務的步驟4,把ScheduledFuture放入DelayQueue中的過程。下面是DelayQueue.add()的原始碼實現。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); // 1
try {
E first = q.peek();
q.offer(e); // 2.1
if (first == null || e.compareTo(first) < 0) {
available.signalAll(); // 2.2
}
return true;
} finally {
lock.unlock(); // 3
}
}
下圖是DelayQueue.add()的執行示意圖。
如上圖所示,新增任務分為3大步驟。
1、獲取Lock。
2、新增任務。
- 向PriorityQueue新增任務。
- 如果在上面2.1中新增的任務是PriorityQueue的頭元素,喚醒在Condition中等待的所有執行緒。
3、釋放Lock。