1. 程式人生 > 實用技巧 >一文讓你領悟執行緒池的原理和機制設計—洞虛篇

一文讓你領悟執行緒池的原理和機制設計—洞虛篇

書接上文,一文加深你對Java執行緒池的瞭解與使用—築基篇,本文將從執行緒池內部的最最核心類 ThreadPoolExecutor 原始碼中的重要方法入手,也是本文分析的物件,從狀態/任務/執行緒這三個模組剖析執行緒池的機制,掌握背後的核心設計。

一、執行緒池如何管理自身的狀態/生命週期

ThreadPoolExecutor 類中,有以下的定義:

//Integer的範圍為[-2^31,2^31 -1], Integer.SIZE-3 =32-3= 29,用來輔助左移位運算
private static final int COUNT_BITS = Integer.SIZE - 3;
//(1 << 29) - 1=000011111111111111111111111111111,前三位是0,後29為1。
//常量值,被用以輔助與運算求出執行緒池執行狀態or執行緒池執行緒數量,見runStateOf與workerCountOf方法
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits 
//執行緒池狀態以常量值被儲存在高位中(前三位)
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 使用32位的變數ctl同時容納兩個重要概念的值,前3位儲存執行緒池自身狀態,後29位儲存執行緒數量
//runStateOf負責取出狀態值,workerCountOf負責取出執行緒數量值,ctlOf負責將兩個值合成到一個32位的值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

//自增型Integer,初始化時會將 RUNNING | 0 合成到一起
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

從以上原始碼可以看出:執行緒池的自身狀態,共有5種,通過常量值方式定義下來,執行緒池被啟動後,執行緒池狀態儲存在32位的自增型Integer變數 ctl 的高位(前3位),類內其他方法是通過runStateOf(ctl)方法位運算取出狀態常量值(前3位)。

ctl 剩餘29位的用途是什麼呢?——儲存執行緒池池內的活躍工作執行緒數量。執行緒池被啟動後,任務未被申請,執行緒當前數量為0,workerCountOf(ctl) 通過位運算取出後29位代表的工作執行緒數量值。

通過ctlOf(RUNNING, 0),將執行緒池狀態RUNNING與目前活躍執行緒數量0合成出一個32位的值賦值給ctl這個會自增的Integer

,用以儲存這兩個重要概念的值。

通過一個變數,巧妙地包含兩部分的資訊:執行緒池的執行狀態 (runState) 和執行緒池內有效執行緒的數量 (workerCount),兩者互不干擾,可避免在操作前做相關判斷時為了維護兩者的一致而佔用鎖和資源。

執行緒池狀態含義

執行緒池狀態 狀態含義
RUNNING 執行緒池被建立後的初始狀態,能接受新提交的任務,並且也能處理阻塞佇列中的任務。
SHUTDOWN 關閉狀態,不再接受新提交的任務,但仍可以繼續處理已進入阻塞佇列中的任務。
STOP 會中斷正在處理任務的執行緒,不能再接受新任務,也不繼續處理佇列中的任務,
TIDYING 所有的任務都已終止,workerCount(有效工作執行緒數)為0。
TERMINATED 執行緒池執行徹底終止

執行緒池如何切換狀態

在官方給出的說明中,可以清晰看出執行緒池各個狀態轉變的觸發條件:

RUNNING -> SHUTDOWN:On invocation of shutdown(), perhaps implicitly in finalize()
(RUNNING or SHUTDOWN) -> STOP: On invocation of shutdownNow()
SHUTDOWN -> TIDYING: When both queue and pool are empty
STOP -> TIDYING:  When pool is empty
TIDYING -> TERMINATED: When the terminated() hook method has completed

執行緒池狀態的生命週期

二、執行緒池如何管理任務

任務的排程機制

不論是哪一種類的執行緒池,呼叫execute往執行緒新增任務後,最後都會進入ThreadPoolExecutor.execute(runnable)中,下面看一下這個方法有什麼名堂:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //獲取工作執行緒數量 與核心執行緒數對比
        if (workerCountOf(c) < corePoolSize) {
            //傳入true表示建立新執行緒時與核心執行緒數做比較,執行command
            if (addWorker(command, true)) 
                return;
            c = ctl.get();
        }
        //來到此處,說明 工作執行緒數 已經大於 核心執行緒數
        //短路原則,先判斷執行緒池狀態是否Running,處於Running則再判斷阻塞佇列是否可以儲存新任務
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
   	//短路原則,如果重檢查執行緒池狀態不在Running了,則嘗試remove(command)阻塞佇列移除此任務
            if (! isRunning(recheck) && remove(command))
                reject(command); //若佇列成功移除任務後,就拒絕掉此任務
            else if (workerCountOf(recheck) == 0)//執行狀態,儲存在阻塞佇列,但工作執行緒目前為0
       			//null表示單純建立工作執行緒,傳入false表示建立新執行緒時與最大執行緒數做比較        
                addWorker(null, false);
        }
        //阻塞佇列不可以儲存任務,嘗試增加工作執行緒執行command
        else if (!addWorker(command, false))
            reject(command); //若增加執行緒操作也失敗了-->拒絕掉任務
    }

在這短短的二十行程式碼裡,出現了多個 if else,說明任務排程還是有點複雜的,下面來逐步理清它。

此處的corePoolSize是在建構函式中被賦值this.corePoolSize = corePoolSize;,是一個固定下來的值。

execute方法是外界往執行緒池新增任務的入口,也是執行緒池內部首先接觸到外界任務的地方,它需要對任務的去向進行管理,對任務的管理有以下三個選項:

  • 緩衝到佇列中等待執行——workQueue.offer(command)
  • 建立新的執行緒直接執行——addWorker(command, false)
  • 拒絕掉該任務,執行執行緒池當前的拒絕策略——reject(command)

addWorker(Runnable firstTask, boolean core)方法內部邏輯得知,如果能建立新執行緒成功說明此時執行緒池的狀態是Running,或者是SHUTDOWN下任務佇列非空但是不可有新任務,然後當前執行緒數量需小於比較物件(傳入true,則與核心執行緒數做比較,傳入false則與最大執行緒數作比較)

根據原始碼,可以總結下以下的判斷條件,需要綜合阻塞佇列狀態,當前工作執行緒數量,核心執行緒數,最大執行緒數這些執行緒池核心屬性進行判斷。

執行緒池狀態 當前工作執行緒數 阻塞佇列是否已滿 佇列是否已加入任務 任務的排程
Running 少於核心執行緒數 / 建立新的工作執行緒,直接執行任務
Running 大於核心執行緒數 將任務加入到阻塞佇列,等待執行
非Running / / 佇列移除任務,移除成功後拒絕該任務執行拒絕策略
Running 0 / 建立新工作執行緒,但不執行任務
RUNNING 大於核心執行緒數且小於最大執行緒數 建立新的工作執行緒,直接執行任務
RUNNING 大於等於最大執行緒數 拒絕該任務,執行拒絕策略
非Running 大於等於最大執行緒數 / 拒絕該任務執行拒絕策略

任務進隊

execute()中已經操作任務進隊,需要同時滿足執行緒池執行狀態為Running,當前工作執行緒數大於核心執行緒數,阻塞佇列非已滿這些條件。

從建構函式可以看出不同種類的阻塞佇列都實現了 BlockingQueue<Runnable> 介面 。

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler) {
    //···
}

因為BlockingQueue介面提供的操作可以自定義,所以也有了能適應各種不同場景的阻塞佇列。不同的阻塞佇列對 offer 方法的重寫也是各不相同。

下面以 CachedThreadPoolSynchronousQueue重寫的offer方法為例:

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        //transfer()傳入的e若為null則作為消費者,若非null則作為生產者
        //運用transferer.transfer達到put前需要take,不儲存任務的目的
        return transferer.transfer(e, true, 0) != null;
    }

任務出隊

任務的出隊是任務管理模組與執行緒管理模組的聯絡,簡單來說任務從阻塞佇列中被取出說明有工作執行緒需要執行任務了。

任務被執行會有兩種可能:第一種是任務直接由新建立的執行緒執行。另一種是執行緒從任務佇列中獲取任務然後執行,執行完任務的空閒執行緒會再次去從佇列中取出任務再執行。

那麼任務是什麼時候又是怎樣地會被取出呢?從 addWorker方法入手

private boolean addWorker(Runnable firstTask, boolean core) {
		//... 省略

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); //建立工作執行緒
            final Thread t = w.thread; //獲取工作執行緒持有的執行緒例項
            if (t != null) {
				//省略獲取鎖,加入阻塞佇列,再次判斷執行緒池執行狀態部分的程式碼
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
		...
        return workerStarted;
    }

接下來看下,這個持有執行緒例項的Worker是什麼名堂:已省略非討論部分的程式碼

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
        /** 具體執行任務的執行緒例項, 若執行緒工廠提供不了可能為null */
        final Thread thread;
        /** 初始執行的任務.  可能為 null  */
        Runnable firstTask;
        /** 執行緒已完成任務計數器r */
        volatile long completedTasks;
        /**
         * 初始化給定的firstTask和從執行緒工廠獲取執行緒例項
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
		//省略以下關於鎖和取消的方法
    }

再來看下runWorker(this); 有什麼名堂:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
				//省略大量關於任務執行部分的程式碼
            }
       }
    }

可以看出, runWorker(worker) 方法,主要做的是,先執行worker自帶的firstTask任務,再不斷地執行getTask(),要從阻塞佇列中獲取任務來執行。也就是說:任務被執行的第一種可能就是指執行緒被建立時帶有firstTask任務,會先執行掉firstTask。

下面再來看下這個從阻塞佇列中返回任務的getTask()方法吧:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 檢查佇列是否為空,檢查當前執行緒池的狀態
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
			//阻塞,迴圈重複地判斷,直到不滿足條件,才執行下方的返回操作
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r; //返回任務
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }

由程式碼可以看到,裡面的邏輯主要在死迴圈 for( ;