1. 程式人生 > >併發系列(一)——執行緒池原始碼(ThreadPoolExecutor類)簡析

併發系列(一)——執行緒池原始碼(ThreadPoolExecutor類)簡析

前言

  本文主要是結合原始碼去執行緒池執行任務的過程,基於JDK 11,整個過程基本與JDK 8相同。

  個人水平有限,文中若有表達有誤的,歡迎大夥留言指出,謝謝了!

一、執行緒池簡介

  1.1 使用執行緒池的優點

    1)通過複用已建立的執行緒,降低資源的消耗(執行緒的建立/銷燬是要消耗資源的)、提高響應速度;

    2)管理執行緒的個數,執行緒的個數在初始化執行緒池的時候指定;

    3)統一管理執行緒,比如停止,stop()方法;

  1.2 執行緒池執行任務過程

    執行緒池執行任務的過程如下圖所示,主要分為以下4步,其中引數的含義會在後面詳細講解:

    1)判斷工作的執行緒是否小於核心執行緒資料(workerCountOf(c) < corePoolSize),若小於則會新建一個執行緒去執行任務,這一步僅僅的是根據執行緒個數決定;

    2)若核心執行緒池滿了,就會判斷執行緒池的狀態,若是running狀態,則嘗試加入任務佇列,若加入成功後還會做一些事情,後面詳細說;

    3)若任務佇列滿了,則加入失敗,此時會判斷整個執行緒池執行緒是否滿,若沒有則建立非核心執行緒執行任務;

    4)若執行緒池滿了,則根據拒絕測試處理無法執行的任務;

    整體過程如下圖:

二、ThreadPoolExecutor類解析

  2.1 ThreadPoolExecutor的建構函式

    ThreadPoolExecutor類一共提供了4個建構函式,涉及5~7個引數,下面就5個必備引數的建構函式進行說明:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    1)corePoolSize :初始化核心執行緒池中執行緒個數的大小;

    2)maxmumPoolSize:執行緒池中執行緒大小;

    3)keepAliveTime:非核心執行緒的超時時長;

      非核心執行緒空閒時常大於該值就會被終止。

    4)unit :keepAliveTime的單位,型別可以參見TimeUnit類;

    5)BlockingQueue workQueue:阻塞佇列,維護等待執行的任務;

  2.2  私有類Worker

    在ThreadPoolExecutor類中有兩個集合型別比較重要,一個是用於放置等待任務的workQueue,其型別是阻塞對列;一個是用於用於存放工作執行緒的works,其是Set型別,其中存放的型別是Worker。

    進一步簡化執行緒池執行過程,可以理解為works中的工作執行緒不停的去阻塞對列中取任務,執行結束,執行緒重新加入大works中。

    為此,有必要簡單瞭解一下Work型別的組成。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        //工作執行緒,由執行緒的工廠類初始化
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        //不可重入的鎖
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        .......
    }

    Worker類繼承於佇列同步器(AbstractQueueSynchronizer),佇列同步器是採取鎖或其他同步元件的基礎框架,其主要結構是自旋獲取鎖的同步佇列和等待喚醒的等待佇列,其方法因此可以分為兩類:對state改變的方法 和 入、出佇列的方法,即獲取獲取鎖的資格的變化(可能描述的不準確)。關於佇列同步器後續部落格會詳細分析,此處不展開討論。

    Work類中通過CAS設定狀態失敗後直接返回false,而不是判斷當前執行緒是否已獲取鎖來實現不可重入的鎖,原始碼註釋中解釋這樣做的原因是因為避免work tash重新獲取到控制執行緒池全域性的方法,如setCorePoolSize。

  2.3  拒絕策略類

    ThreadPoolExecutor的拒絕策略類是以私有類的方式實現的,有四種策略:

    1)AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常(預設拒絕處理策略)。

      2)DiscardPolicy:拋棄新來的任務,但是不丟擲異常。

      3)DiscardOldestPolicy:拋棄等待佇列頭部(最舊的)的任務,然後重新嘗試執行程式(失敗則會重複此過程)。

      4)CallerRunsPolicy:由呼叫執行緒處理該任務。

    其程式碼相對簡單,可以參考原始碼。

三、任務執行過程分析

  3.1 execute(Runnable)方法

    execute(Runnable)方法的整體過程如上文1.2所述,其實現方式如下:

public void execute(Runnable command) {
        //執行的任務為空,直接丟擲異常
        if (command == null)
            throw new NullPointerException();
        //ctl是ThreadPoolExecutor中很關鍵的一個AtomicInteger,主執行緒池的控制狀態
        int c = ctl.get();
        //1、判斷是否小於核心執行緒池的大小,若是則直接嘗試新建一個work執行緒
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //2、大於核心執行緒池的大小或新建work失敗(如建立thread失敗),會先判斷執行緒池是否是running狀態,若是則加入阻塞對列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //重新驗證執行緒池是否為running,若否,則嘗試從對列中刪除,成功後執行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //若執行緒池的狀態為shutdown則,嘗試去執行完阻塞對列中的任務
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3、新建非核心執行緒去執行任務,若失敗,則採取拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

  3.2 addWorker(Runnable,boole)方法

    execute(Runnable)方法中,新建(非)核心執行緒執行任務主要是通過addWorker方法實現的,其執行過程如下:

private boolean addWorker(Runnable firstTask, boolean core) {
        //此處反覆檢查執行緒池的狀態以及工作執行緒是否超過給定的值
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
            //核心和非核心執行緒的區別
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            //通過工廠方法初始化,可能失敗,即可能為null
            final Thread t = w.thread;
            if (t != null) {
            //獲取全域性鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    //執行緒池處於running狀態
                    //或shutdown狀態但無需要執行的task,個人理解為用於去阻塞佇列中取任務執行
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //執行任務,這裡會執行thread的firstTask獲取阻塞對列中取任務
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
            //開始失敗,則會從workers中刪除新建的work,work數量減1,嘗試關閉執行緒池,這些過程會獲取全域性鎖
                addWorkerFailed(w);
        }
        return workerStarted;
    }

  3.3  runWorker(this) 方法

     在3.2 中當新建的worker執行緒加入在workers中成功後,就會啟動對應任務,其呼叫的是Worker類中的run()方法,即呼叫runWorker(this)方法,其過程如下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        //while()迴圈中,前者是新建執行緒執行firstTask,對應執行緒個數小於核心執行緒和阻塞佇列滿的情況,
        //getTask()則是從阻塞對列中取任務執行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //僅執行緒池狀態為stop時,執行緒響應中斷,這裡也就解釋了呼叫shutdown時,正在工作的執行緒會繼續工作
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                    //執行任務
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    //完成的個數+1
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //處理後續工作
            processWorkerExit(w, completedAbruptly);
        }
    }

   3.4 processWorkerExit(Worker,boole)方法

    當任務執行結果後,在滿足一定條件下會新增一個worker執行緒,程式碼如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            //對工作執行緒的增減需要加全域性鎖
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        //嘗試終止執行緒池
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
        //執行緒不是中斷,會維持最小的個數
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //執行完任務後,執行緒重新加入workers中
            addWorker(null, false);
        }
    }

  至此,執行緒池執行任務的過程分析結束,其他方法的實現過程可以參考原始碼。

 

Ref:

[1]http://concurrent.redspider.group/article/03/12.html

[2]《Java併發程式設計的藝術》

&n