1. 程式人生 > >併發:ScheduledThreadPoolExecutor詳解。

併發:ScheduledThreadPoolExecutor詳解。

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。他主要用來給定的延遲之後執行任務,或者定期執行任務。ScheduledThreadPoolExecutor的功能與Timer類似,但ScheduledThreadPoolExecutor功能更強大、更靈活。Timer對應的是單個後臺執行緒,而ScheduledThreadPoolExecutor可以在建構函式中指定多個對應的後臺執行緒數。

ScheduledThreadPoolExecutor的執行機制

ScheduledThreadPoolExecutor的執行示意圖(本文基於JDK 6)如下所示。

DelayQueue是一個無界佇列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中沒有什麼意義(設定maximumPoolSize的大小沒有什麼效果)。

ScheduledThreadPoolExecutor的執行主要分為兩大部分。

  • 當呼叫ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayQueue新增一個實現了RunnableScheduledFuture介面的ScheduledFutureTask。

  • 執行緒池中的執行緒從DelayQueue中獲取SheduledFutureTask,然後執行任務。

ScheduledThreadPoolExecutor為了實現週期性的執行任務,對ThreadPoolExecutor做了如下的修改。

  • 使用DelayQueue作為任務佇列。

  • 獲取任務的方式不同。

  • 執行週期任務後,增加了額外的處理。

ScheduledThreadPoolExecutor的實現

ScheduledThreadPoolExecutor會把待排程的任務(ScheduledFutureTask)放到一個DelayQueue中。

ScheduledFutureTask主要包含3個成員變數,如下。

  • long型成員變數time,表示這個任務將要被執行的具體時間。
  • long型成員變數sequenceNumber,表示這個任務被新增到ScheduledThreadPoolExecutor中的序號。
  • long型成員變數period,表示任務執行的間隔週期。

DelayQueue封裝了一個PriorityQueue,這個PriorityQueue會對佇列中的ScheduledFutureTask進行排序。排序時,time小的排在前面(時間早的任務將被先執行)。如果兩個ScheduledFutureTask的time相同,就比較sequenceNumber,sequenceNumber小的排在前面(也就是說,如果兩個任務的執行時間相同,那麼先提交的任務將被先執行)。

首先,讓我們看看ScheduledThreadPoolExecutor中的執行緒執行週期任務的過程。下圖是ScheduledThreadPoolExecutor中的執行緒1執行某個週期任務的4個步驟。

下面是對這4個步驟的說明。

  1. 執行緒1從DelayQueue中獲取已到期的ScheduledFutureTask(DelayQueue.take())。到期任務是指ScheduledFutureTask的time大於等於當前時間。
  2. 執行緒1執行這個ScheduledFutureTask。
  3. 執行緒1修改ScheduledFutureTask的time變數為下次將要被執行的時間。
  4. 執行緒1把這個修改time之後的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。

接下來,讓我們看看上面步驟1獲取任務的過程。下面是DelayQueue.take()方法的原始碼實現。

        public RunnableScheduledFuture take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();    // 1
            try {
                for (;;) {
                    E first = q.peek();
                    if (first == null)
                        available.await();
                    else {    // 2.1
                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
                        if (delay > 0)
                            long t1 = available.awaitNanos(delay);    // 2.2
                        else {
                            E x = q.poll();                // 2.3.1
                            assert x != null;
                            if(q.size() 1= 0)
                                available.singalAll();    // 2.3.2
                            return x;
                        }
                    }
                }
            } finally {    
                lock.unlock();                        // 3
            }
        }

下圖是DelayQueue.take()的執行示意圖。

如上圖所示,獲取任務分為3大步驟。
1、獲取Lock。
2、獲取週期任務。

  • 如果PriorityQueue為空,當前執行緒到Condition中等待,否則執行下面的2.2。
  • 如果PriorityQueue的頭元素的time時間比當前時間大,到Condition中等待到time時間;否則執行下面的2.3。
  • 獲取PriorityQueue的頭元素(2.3.1);如果PriorityQueue不為空,則喚醒在Condition中等待的所有執行緒(2.3.2)。

3、釋放Lock。

ScheduledThreadPoolExecutor在一個迴圈中執行步驟2,直到執行緒從PriorityQueue獲取到一個元素之後(執行2.3.1之後),才會退出無限迴圈(結束步驟2)。

最後,讓我們看看ScheduledThreadPoolExecutor中的執行緒執行任務的步驟4,把ScheduledFuture放入DelayQueue中的過程。下面是DelayQueue.add()的原始碼實現。

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();                            // 1
        try {               
            E first = q.peek();     
            q.offer(e);                         // 2.1
            if (first == null || e.compareTo(first) < 0) { 
                available.signalAll();          // 2.2
            }
            return true;
        } finally {
            lock.unlock();                      // 3
        }
    }

下圖是DelayQueue.add()的執行示意圖。

如上圖所示,新增任務分為3大步驟。
1、獲取Lock。
2、新增任務。

  • 向PriorityQueue新增任務。
  • 如果在上面2.1中新增的任務是PriorityQueue的頭元素,喚醒在Condition中等待的所有執行緒。

3、釋放Lock。