1. 程式人生 > >Java併發程式設計筆記——J.U.C之executors框架:ScheduledThreadPoolExecutor

Java併發程式設計筆記——J.U.C之executors框架:ScheduledThreadPoolExecutor

一、ScheduledThreadPoolExecutor簡介

executors框架設計理念 一節中,我們曾經提到過一種可對任務進行延遲/週期性排程的執行器(Executor),這類Executor一般實現了ScheduledExecutorService這個介面。ScheduledExecutorService在普通執行器介面(ExecutorService)的基礎上引入了Future模式,使得可以限時或週期性地排程任務。

ScheduledThreadPoolExecutor的類繼承關係如下圖,該圖中除了本節要講解的ScheduledThreadPoolExecutor外,其它部分已經在前2節詳細介紹過了:

從上圖中可以看到,ScheduledThreadPoolExecutor其實是繼承了ThreadPoolExecutor這個普通執行緒池,我們知道ThreadPoolExecutor中提交的任務都是實現了Runnable介面,但是ScheduledThreadPoolExecutor比較特殊,由於要滿足任務的延遲/週期排程功能,它會對所有的Runnable任務都進行包裝,包裝成一個RunnableScheduledFuture任務。

RunnableScheduledFuture是Future模式中的一個介面,關於Future模式,我們後續會專門章節講解,這裡只要知道RunnableScheduledFuture的作用就是可以非同步地執行【延時/週期任務】。

另外,我們知道在ThreadPoolExecutor中,需要指定一個阻塞佇列作為任務佇列。ScheduledThreadPoolExecutor中也一樣,不過特殊的是,ScheduledThreadPoolExecutor中的任務佇列是一種特殊的延時佇列(DelayQueue)。

我們曾經在juc-collections框架中,分析過該種阻塞佇列,DelayQueue底層基於優先佇列(PriorityQueue)實現,是一種“堆”結構,通過該種阻塞佇列可以實現任務的延遲到期執行(即每次從佇列獲取的任務都是最先到期的任務)。

ScheduledThreadPoolExecutor在內部定義了DelayQueue的變種——DelayedWorkQueue

,它和DelayQueue類似,只不過要求所有入隊元素必須實現RunnableScheduledFuture介面。

二、ScheduledThreadPoolExecutor基本原理

構造執行緒池

我們先來看下ScheduledThreadPoolExecutor的構造,其實在executors框架概述中講Executors時已經接觸過了,Executors使用newScheduledThreadPool工廠方法建立ScheduledThreadPoolExecutor:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

我們來看下ScheduledThreadPoolExecutor的構造器,內部其實都是呼叫了父類ThreadPoolExecutor的構造器,這裡最需要注意的就是任務佇列的選擇——DelayedWorkQueue,我們後面會詳細介紹它的實現原理。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory);
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler);
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
}

執行緒池的排程

ScheduledThreadPoolExecutor的核心排程方法是schedulescheduleAtFixedRatescheduleWithFixedDelay,我們通過schedule方法來看下整個排程流程:

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null,
            triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

上述的decorateTask方法把Runnable任務包裝成ScheduledFutureTask,使用者可以根據自己的需要覆寫該方法:

protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}
注意: ScheduledFutureTask是RunnableScheduledFuture介面的實現類,任務通過 period欄位來表示任務型別
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
 
    /**
     * 任務序號, 自增唯一
     */
    private final long sequenceNumber;
 
    /**
     * 首次執行的時間點
     */
    private long time;
 
    /**
     * 0: 非週期任務
     * >0: fixed-rate任務
     * <0: fixed-delay任務
     */
    private final long period;
 
    /**
     * 在堆中的索引
     */
    int heapIndex;
 
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
    
    // ...
}
ScheduledThreadPoolExecutor中的任務佇列—— DelayedWorkQueue,儲存的元素就是ScheduledFutureTask。DelayedWorkQueue是一種 堆結構,time最小的任務會排在堆頂(表示最早過期),每次出隊都是取堆頂元素,這樣最快到期的任務就會被先執行。如果兩個ScheduledFutureTask的time相同,就比較它們的序號——sequenceNumber,序號小的代表先被提交,所以就會先執行。

schedule的核心是其中的delayedExecute方法:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())   // 執行緒池已關閉
        reject(task);   // 任務拒絕策略
    else {
        super.getQueue().add(task);                 // 將任務入隊
 
        // 如果執行緒池已關閉且該任務是非週期任務, 則將其從佇列移除
        if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
            task.cancel(false);  // 取消任務
        else
            ensurePrestart();   // 新增一個工作執行緒
    }
}

通過delayedExecute可以看出,ScheduledThreadPoolExecutor的整個任務排程流程大致如下圖:

我們來分析這個過程:

  1. 首先,任務被提交到執行緒池後,會判斷執行緒池的狀態,如果不是RUNNING狀態會執行拒絕策略。
  2. 然後,將任務新增到阻塞佇列中。(注意,由於DelayedWorkQueue是無界佇列,所以一定會add成功)
  3. 然後,會建立一個工作執行緒,加入到核心執行緒池或者非核心執行緒池:

    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

    通過ensurePrestart可以看到,如果核心執行緒池未滿,則新建的工作執行緒會被放到核心執行緒池中。如果核心執行緒池已經滿了,ScheduledThreadPoolExecutor不會像ThreadPoolExecutor那樣再去建立歸屬於非核心執行緒池的工作執行緒,而是直接返回。也就是說,在ScheduledThreadPoolExecutor中,一旦核心執行緒池滿了,就不會再去建立工作執行緒。

這裡思考一點,什麼時候會執行else if (wc == 0)建立一個歸屬於非核心執行緒池的工作執行緒?
答案是,當通過setCorePoolSize方法設定核心執行緒池大小為0時,這裡必須要保證任務能夠被執行,所以會建立一個工作執行緒,放到非核心執行緒池中。

最後,執行緒池中的工作執行緒會去任務佇列獲取任務並執行,當任務被執行完成後,如果該任務是週期任務,則會重置time欄位,並重新插入佇列中,等待下次執行。這裡注意從佇列中獲取元素的方法:

  • 對於核心執行緒池中的工作執行緒來說,如果沒有超時設定(allowCoreThreadTimeOut == false),則會使用阻塞方法take獲取任務(因為沒有超時限制,所以會一直等待直到佇列中有任務);如果設定了超時,則會使用poll方法(方法入參需要超時時間),超時還沒拿到任務的話,該工作執行緒就會被回收。
  • 對於非工作執行緒來說,都是呼叫poll獲取佇列元素,超時取不到任務就會被回收。



上述就是ScheduledThreadPoolExecutor的核心排程流程,通過我們的分析可以看出,相比ThreadPoolExecutor,ScheduledThreadPoolExecutor主要有以下幾點不同:

  1. 總體的排程控制流程略有區別;
  2. 任務的執行方式有所區別;
  3. 任務佇列的選擇不同。

最後,我們來看下ScheduledThreadPoolExecutor中的延時佇列——DelayedWorkQueue

延時佇列

DelayedWorkQueue,該佇列和已經介紹過的DelayQueue區別不大,只不過佇列元素是RunnableScheduledFuture:

static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private int size = 0;
 
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
 
    private Thread leader = null;
 
    // ...
}

DelayedWorkQueue是一個無界佇列,在佇列元素滿了以後會自動擴容,它並沒有像DelayQueue那樣,將佇列操作委託給PriorityQueue,而是自己重新實現了一遍堆的核心操作——上浮、下沉。我這裡不再贅述這些堆操作,讀者可以參考PriorityBlockingQueue自行閱讀原始碼。

我們關鍵來看下addtakepoll這三個佇列方法,因為ScheduledThreadPoolExecutor的核心排程流程中使用到了這三個方法:

public boolean add(Runnable e) {
    return offer(e);
}
 
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
    return offer(e);
}

add、offer內部都呼叫了下面這個方法:

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>) x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;           // 佇列已滿, 擴容
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);       // 堆上浮操作
        }
        
        if (queue[0] == e) {    // 當前元素是首個元素
            leader = null;
            available.signal(); // 喚醒一個等待執行緒
        }   
    } finally {
        lock.unlock();
    }
    return true;
}

take方法:

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (; ; ) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)          // 佇列為空
                available.await();      // 等待元素入隊
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)         // 元素已到期
                    return finishPoll(first);
 
                // 執行到此處, 說明隊首元素還未到期
                first = null;
                if (leader != null)
                    available.await();
                else {
                    // 當前執行緒成功leader執行緒
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

注意:上述leader表示一個等待獲取隊首元素的出隊執行緒,這是一種稱為“Leader-Follower pattern”的多執行緒設計模式(讀者可以參考DelayQueue中的講解)。

每次出隊元素時,如果佇列為空或者隊首元素還未到期,執行緒就會在condition條件佇列等待。一般的思路是無限等待,直到出現一個入隊執行緒,入隊元素後將一個出隊執行緒喚醒。
為了提升效能,當佇列非空時,用 leader儲存第一個到來並嘗試出隊的執行緒,並設定它的等待時間為隊首元素的剩餘期限,這樣當元素過期後,執行緒也就自己喚醒了,不需要入隊執行緒喚醒。這樣做的好處就是提升一些效能。

三、總結

本節介紹了ScheduledThreadPoolExecutor,它是對普通執行緒池ThreadPoolExecutor的擴充套件,增加了延時排程、週期排程任務的功能。概括下ScheduledThreadPoolExecutor的主要特點:

  1. 對Runnable任務進行包裝,封裝成ScheduledFutureTask,該類任務支援任務的週期執行、延遲執行;
  2. 採用DelayedWorkQueue作為任務佇列。該佇列是無界佇列,所以任務一定能新增成功,但是當工作執行緒嘗試從佇列取任務執行時,只有最先到期的任務會出隊,如果沒有任務或者隊首任務未到期,則工作執行緒會阻塞;
  3. ScheduledThreadPoolExecutor的任務排程流程與ThreadPoolExecutor略有區別,最大的區別就是,先往佇列新增任務,然後建立工作執行緒執行任務。

另外,maximumPoolSize這個引數對ScheduledThreadPoolExecutor其實並沒有作用,因為除非把corePoolSize設定為0,這種情況下ScheduledThreadPoolExecutor只會建立一個屬於非核心執行緒池的工作執行緒;否則,ScheduledThreadPoolExecutor只會新建歸屬於核心執行緒池的工作執行緒,一旦核心執行緒池滿了,就不再新