1. 程式人生 > 實用技巧 >執行緒池ThreadPoolExecutor——Worker原始碼解析

執行緒池ThreadPoolExecutor——Worker原始碼解析

執行緒池任務執行的主流程如下:

執行緒池呼叫execute提交任務
—>建立Worker(設定屬性thead、firstTask)
—>worker.thread.start()
—>實際上呼叫的是worker.run()
—>執行緒池的runWorker(worker)
—>worker.firstTask.run();

可以看到,在ThreadPoolExecutor中以Worker為單位對工作執行緒進行管理,下面分析一下Worker的執行原理:

1. Worker原始碼

private final class Worker
        extends AbstractQueuedSynchronizer
        
implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails.
*/ final Thread thread;//執行任務的執行緒 /** Initial task to run. Possibly null. */ Runnable firstTask;//要執行的任務 /** Per-thread task counter */ volatile long completedTasks;//完成任務的數量 /** * Creates with given first task and thread from ThreadFactory. *
@param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //呼叫執行緒工廠建立執行緒 this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { //實際是呼叫 ThreadPoolExecutor.runWorker()方法 runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { //CAS獲取鎖,不會有阻塞 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }

java.util.concurrent.ThreadPoolExecutor.Worker就是執行緒池中執行任務的類,其繼承了AQS並實現Runnable,所以它可以擁有AQS與Runnable的作用。

1.1 AQS作用

Worker繼承了AbstractQueuedSynchronizer,主要目的有兩個:

  • 將鎖的粒度細化到每個工Worker。
    • 如果多個Worker使用同一個鎖,那麼一個Worker Running持有鎖的時候,其他Worker就無法執行,這顯然是不合理的。
  • 直接使用CAS獲取,避免阻塞。
    • 如果這個鎖使用阻塞獲取,那麼在多Worker的情況下執行shutDown。如果這個Worker此時正在Running無法獲取到鎖,那麼執行shutDown()執行緒就會阻塞住了,顯然是不合理的。

1.2 Runnable作用

Worker還實現了Runnable,它有兩個屬性thead、firstTask。根據整體流程:

執行緒池呼叫execute—>建立Worker(設定屬性thead、firstTask)—>worker.thread.start()—>實際上呼叫的是worker.run()—>執行緒池的runWorker(worker)—>worker.firstTask.run()(如果firstTask為null就從等待佇列中拉取一個)。

轉了一大圈最終呼叫最開始傳進來的任務的run方法,不過通過等待佇列可以重複利用worker與worker中的執行緒,變化的只是firstTask。下面我們對執行緒池的runWorker方法進行探究。

2.Worker.run原始碼

2.1runWorker方法

Worker實現了Runnable,其run()方法中最終是走到了執行緒池的runWorker()方法。

public void run() {
    //實際是呼叫 ThreadPoolExecutor.runWorker()方法
    runWorker(this);
}
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        //任務是否正常執行完成
        boolean completedAbruptly = true;
        try {
            //如果task為null就通過getTask方法獲取阻塞佇列中的下一個任務
            //getTask方法一般不會返回null,所以這個while類似於一個無限迴圈
            //worker物件就通過這個方法的持續執行來不斷處理新的任務
            while (task != null || (task = getTask()) != null) {
                //每一次任務的執行都必須獲取鎖來保證下方臨界區程式碼的執行緒安全
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //如果狀態值大於等於STOP(狀態值是有序的,即STOP、TIDYING、TERMINATED)且當前執行緒還沒有被中斷,則主動中斷執行緒
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //執行任務前處理操作,預設是一個空實現;在子類中可以通過重寫來改變任務執行前的處理行為
                    beforeExecute(wt, task);
                    //儲存任務執行過程中丟擲的異常,提供給下面finally塊中的afterExecute方法使用
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        //異常包裝為Error
                        thrown = x; throw new Error(x);
                    } finally {
                        //任務後處理,同beforeExecute
                        afterExecute(task, thrown);
                    }
                } finally {
                    //將迴圈變數task設定為null,表示已處理完成
                    task = null;
                    //加當前worker已經完成的任務數
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //將completedAbruptly變數設定為false,表示任務正常處理完成
            completedAbruptly = false;
        } finally {
            //銷燬當前的worker物件,並完成一些諸如完成任務數量統計之類的輔助性工作
            //線上程池當前狀態小於STOP的情況下會建立一個新的worker來替換被銷燬的worker
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker方法的原始碼中有兩個比較重要的方法呼叫,一個是while條件中對getTask方法的呼叫,一個是在方法的最後對processWorkerExit方法的呼叫。

2.2 getTask方法

private Runnable getTask() {
    // 通過timeOut變量表示執行緒是否空閒時間超時了
    boolean timedOut = false;

    // 無限迴圈
    for (;;) {
        // 獲取執行緒池狀態
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 如果 執行緒池狀態>=STOP
        //    或者 (執行緒池狀態==SHUTDOWN && 阻塞佇列為空)
        // 則直接減少一個worker計數並返回null(返回null會導致當前worker被銷燬)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        // 獲取執行緒池中的worker計數
        int wc = workerCountOf(c);

        // 判斷當前執行緒是否會被超時銷燬
        // 會被超時銷燬的情況:執行緒池允許核心執行緒超時 或 當前執行緒數大於核心執行緒數
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 如果 (當前執行緒數大於最大執行緒數 或 (允許超時銷燬 且 當前發生了空閒時間超時))
        //   且 (當前執行緒數大於1 或 阻塞佇列為空) —— 該條件在阻塞佇列不為空的情況下保證至少會保留一個執行緒繼續處理任務
        // 則 減少worker計數並返回null(返回null會導致當前worker被銷燬)
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 從阻塞佇列中取出一個任務(如果佇列為空會進入阻塞等待狀態)
            // 如果允許空閒超時銷燬執行緒的話則帶有一個等待的超時時間
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 如果獲取到了任務就直接返回該任務,返回後會開始執行該任務
            if (r != null)
                return r;
            // 如果任務為null,則說明發生了等待超時,將空閒時間超時標誌設定為true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果等待被中斷了,那說明空閒時間(等待任務的時間)還沒有超時
            timedOut = false;
        }
    }
}

getTask方法在阻塞佇列中有待執行的任務時會從佇列中彈出一個任務並返回,如果阻塞佇列為空,那麼就會阻塞等待新的任務提交到佇列中直到超時(在一些配置下會一直等待而不超時),如果在超時之前獲取到了新的任務,那麼就會將這個任務作為返回值返回。所以一般getTask方法是不會返回null的,只會阻塞等待下一個任務並在之後將這個新任務作為返回值返回。

當getTask方法返回null時會導致當前Worker退出,當前執行緒被銷燬。在以下情況下getTask方法才會返回null:

  1. 當前執行緒池中的執行緒數超過了最大執行緒數。這是因為執行時通過呼叫setMaximumPoolSize修改了最大執行緒數而導致的結果;
  2. 執行緒池處於STOP狀態。這種情況下所有執行緒都應該被立即回收銷燬;
  3. 執行緒池處於SHUTDOWN狀態,且阻塞佇列為空。這種情況下已經不會有新的任務被提交到阻塞佇列中了,所以執行緒應該被銷燬;
  4. 執行緒可以被超時回收的情況下等待新任務超時。執行緒被超時回收一般有以下兩種情況:
    • 超出核心執行緒數部分的執行緒等待任務超時
    • 允許核心執行緒超時(執行緒池配置)的情況下執行緒等待任務超時

2.3processWorkerExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果completedAbruptly為true則表示任務執行過程中丟擲了未處理的異常
    // 所以還沒有正確地減少worker計數,這裡需要減少一次worker計數
    if (completedAbruptly)
        decrementWorkerCount();

    // 獲取執行緒池的主鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 把將被銷燬的執行緒已完成的任務數累計到執行緒池的完成任務總數上
        completedTaskCount += w.completedTasks;
        // 從worker集合中去掉將會銷燬的worker
        workers.remove(w);
    } finally {
        // 釋放執行緒池主鎖
        mainLock.unlock();
    }

    // 嘗試結束執行緒池
    // 這裡是為了在關閉執行緒池時等到所有worker都被回收後再結束執行緒池
    tryTerminate();

    int c = ctl.get();
    // 如果執行緒池狀態 < STOP,即RUNNING或SHUTDOWN
    // 則需要考慮建立新執行緒來代替被銷燬的執行緒
    if (runStateLessThan(c, STOP)) {
        // 如果worker是正常執行完的,則要判斷一下是否已經滿足了最小執行緒數要求
        // 否則直接建立替代執行緒
        if (!completedAbruptly) {
            // 如果允許核心執行緒超時則最小執行緒數是0,否則最小執行緒數等於核心執行緒數
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果阻塞佇列非空,則至少要有一個執行緒繼續執行剩下的任務
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 如果當前執行緒數已經滿足最小執行緒數要求
            // 那麼就不建立替代執行緒了
            if (workerCountOf(c) >= min)
                return;
        }

        // 重新建立一個worker來代替被銷燬的執行緒
        addWorker(null, false);
    }
}

processWorkerExit方法會銷燬當前執行緒對應的Worker物件,並執行一些累加總處理任務數等輔助操作,但線上程池當前狀態小於STOP的情況下會建立一個新的Worker來替換被銷燬的Worker。

參考:

https://segmentfault.com/a/1190000018630751