1. 程式人生 > >執行緒池原始碼分析

執行緒池原始碼分析

## 概述 在 java 中,執行緒池 ThreadPoolExecutor 是一個繞不過去的類,它是享元模式思想的體現,通過在容器中建立一定數量的執行緒加以重複利用,從而避免頻繁建立執行緒帶來的額外開銷。一個設定合理的執行緒池可以提高任務響應的速度,並且避免執行緒數超過硬體能力帶來的意外情況。 在本文,將深入執行緒池原始碼,瞭解執行緒池的底層實現與執行機制。 ## 一、構造方法 ThreadPoolExecutor 類一共提供了四個構造方法,我們基於引數最完整構造方法瞭解一下執行緒池建立所需要的變數: ~~~java public ThreadPoolExecutor(int corePoolSize, // 核心執行緒數 int maximumPoolSize, // 最大執行緒數 long keepAliveTime, // 非核心執行緒閒置存活時間 TimeUnit unit, // 時間單位 BlockingQueue workQueue, // 工作佇列 ThreadFactory threadFactory, // 建立執行緒使用的執行緒工廠 RejectedExecutionHandler handler // 拒絕策略) { } ~~~ - 核心執行緒數:即長期存在的執行緒數,當執行緒池中執行執行緒未達到核心執行緒數時會優先建立新執行緒; - 最大執行緒數:當核心執行緒已滿,工作佇列已滿,同時執行緒池中執行緒總數未超過最大執行緒數,會建立非核心執行緒; - 非核心執行緒閒置存活時間:當非核心執行緒閒置的時的最大存活時間; - 時間單位:非核心執行緒閒置存活時間的時間單位; - 任務佇列:當核心執行緒滿後,任務會優先加入工作佇列,等等待核心執行緒消費; - 執行緒工廠:執行緒池建立新執行緒時使用的執行緒工廠; - 拒絕策略:當工作佇列與執行緒池都滿時,用於執行的策略; ## 二、執行緒池狀態 ### 1.執行緒池狀態 執行緒池擁有一個 AtomicInteger 型別的成員變數 ctl ,通過位運算分別使用 ctl 的高位低位以便在一個值中儲存執行緒數量以及執行緒池狀態。 ~~~java private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 29(32-3) private static final int COUNT_BITS = Integer.SIZE - 3; // 允許的最大工作執行緒(2^29-1 約5億) private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 執行狀態。執行緒池接受並處理新任務 private static final int RUNNING = -1 << COUNT_BITS; // 關閉狀態。執行緒池不能接受新任務,處理完剩餘任務後關閉。呼叫shutdown()方法會進入該狀態。 private static final int SHUTDOWN = 0 << COUNT_BITS; // 停止狀態。執行緒池不能接受新任務,並且嘗試中斷舊任務。呼叫shutdownNow()方法會進入該狀態。 private static final int STOP = 1 << COUNT_BITS; // 整理狀態。由關閉狀態轉變,執行緒池任務佇列為空時進入該狀態,會呼叫terminated()方法。 private static final int TIDYING = 2 << COUNT_BITS; // 終止狀態。terminated()方法執行完畢後進入該狀態,執行緒池徹底停止。 private static final int TERMINATED = 3 << COUNT_BITS; ~~~ ### 2.執行緒狀態的計算 這裡比較不好理解的是上述-1的位運算,下面我們來分析一下: 在計算機中,二進位制負數一般用補碼錶示,即原始碼取反再加一。但又有這種說法,即將最高位作為符號位,0為正數,1為負數。實際上兩者是可以結合在一起看的。假如數字是單位元組數,1 位元組對應8 bit,即八位,現在,我們要計算 - 1。 按照第二種說法,最高位為符號位,則有 1/000 0001,然後按第一種說法取反後+1,並且符號位不變,則有 1/111 1110 + 1,即 1/111 1111。 現在回到 `-1 << COUNT_BITS`這行程式碼: 一個 int 是 4 個位元組,對應 32 bit,按上述過程 -1 轉為二進位制即為 1/111......1111(32個1), `COUNT_BITS`是 29,-1 左移 29 位,最終得到 111.0...0000。 同理,計算其他的幾種狀態,可知分別是: | 狀態 | 二進位制 | | :--------- | :------------ | | RUNNING | 111...0....00 | | SHUTDOWN | 000...0....00 | | STOP | 001...0....00 | | TIDYING | 010...0....00 | | TERMINATED | 011...0....00 | 其中,**我們可以知道 SHUTDOWN 狀態轉為十進位制也是 0 ,而 RUNNING 作為有符號數,它的最高位是 1,說明轉為十進位制以後是個負數,其他的狀態最高位都是 0,轉為十進位制之後都是正數**,也就是說,我們可以這麼認為: **小於 SHUTDOWN 的就是 RUNNING,大於 SHUTDOWN 就是停止或者停止中。** 這也是後面狀態計算的一些寫法的基礎。比如 `isRunning()`方法: ~~~java private static boolean isRunning(int c) { return c < SHUTDOWN; } ~~~ ### 3.執行緒狀態與工作執行緒數的獲取 ~~~java // 根據當前執行狀態和工作執行緒數獲取當前的 ctl private static int ctlOf(int rs, int wc) { return rs | wc; } // 獲取執行狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取工作執行緒數 private static int workerCountOf(int c) { return c & CAPACITY; } ~~~ 前面獲取狀態的時候呼叫了 `ctlOf()`方法,根據前面,我們可以知道,`CAPACITY`實際上是 29 位,而執行緒狀態用的是 32 - 30 共 3 位,也就是說,**ctl 共 32 位,高3 位用於表示執行緒池狀態,而低 29 位表示工作執行緒的數量**。 這樣上述三個方法就很好理解了: - `ctlOf()`:獲取 ctl。 將工作執行緒數量與執行狀態進行於運算,假如我們處於 RUNNING,並且有 1 個工作執行緒,那麼 ctl = 111....000 | 000.... 001,最終得到 111 ..... 001; - `runStateOf()`:獲取執行狀態。 繼續根據上文的資料,`~CAPACITY` 取反即為 111....000,與執行狀態 111...0000 與運算,最終得到 111....000,相當於低位掩碼,消去低 29 位; - `workerCountOf()`:獲取工作執行緒數。 同理,`c & CAPACITY`裡的 CAPACITY 相當於高位掩碼,用於消去高 3 位,最終得到 00...001,即工作執行緒數。 同理,**如果要增加工作執行緒數,就直接通過 CAS 去遞增 ctl**,比如新建執行緒中使用的公共方法: ```java private boolean compareAndIncrementWorkerCount(int expect) { // 通過 CAS 遞增 ctl return ctl.compareAndSet(expect, expect + 1); } ``` **要改變執行緒池狀態,就根據當前工作執行緒和要改變的狀態去合成新的 ctl,然後 CAS 改變 ctl**,比如 `shutdown()`中涉及的相關程式碼: ~~~java private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || // 通過 CAS 改變 ctl ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } ~~~ ## 三、任務的建立與執行 執行緒池任務提交方法是 `execute()`,根據程式碼可知,當一個任務進來時,分四種情況: - 當前工作執行緒數小於核心執行緒數,啟動新執行緒; - 當前工作執行緒數大於核心執行緒數,但是未大於最大執行緒數,嘗試新增到工作佇列; - 當前執行緒池核心執行緒和佇列都滿了,嘗試建立新非核心執行緒。 - 非核心執行緒建立失敗,說明執行緒池徹底滿了,執行拒絕策略。 ~~~java public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 1.當前工作執行緒數小於核心執行緒數,啟動新執行緒 if (workerCountOf(c) < corePoolSize) { // 新增任務 if (addWorker(command, true)) return; c = ctl.get(); } // 2. 當前工作執行緒數大於核心執行緒數,但是未大於最大執行緒數,嘗試新增到工作佇列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果當前執行緒處於非執行態,並且移除當前任務成功,則拒絕任務(防止新增到一半就shutdown) if (! isRunning(recheck) && remove(command)) reject(command); // 如果當前沒有工作執行緒了,就啟動新執行緒 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3.當前執行緒池核心執行緒和佇列都滿了,嘗試建立新非核心執行緒 else if (!addWorker(command, false)) // 4.執行緒池徹底滿了,執行拒絕策略 reject(command); } ~~~ ### 1.新增任務 新增任務依靠 `addWorker()`方法,這個方法很長,但是主要就幹了兩件事: - CAS 讓 ctl 的工作執行緒數 +1; - 啟動新的執行緒; ~~~java private boolean addWorker(Runnable firstTask, boolean core) { retry: // 1.改變 ctl 使工作執行緒+1 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果當前不處於執行狀態,傳入任務為空,並且任務佇列為空的時候拒絕新增新任務 // 即執行緒池 shutdown 時不讓新增新任務,但是執行繼續跑完任務佇列裡的任務。 if (rs >
= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 執行緒不允許超過最大執行緒數,核心執行緒不允許超過最大核心執行緒數 if (wc >= CAPACITY || wc >
= (core ? corePoolSize : maximumPoolSize)) return false; // CAS 遞增工作執行緒數 if (compareAndIncrementWorkerCount(c)) // 失敗了就重新回到上面的retry處繼續往下執行 break retry; // 更新 ctl c = ctl.get(); // 如果執行狀態改變了就全部從來 if (runStateOf(c) != rs) continue retry; } } // 2.啟動新執行緒 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 建立新執行緒 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // 如果執行緒池處於執行狀態,或者沒有新任務的SHUTDOWN狀態(即SHUTDOW以後還在消費工作佇列裡的任務) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 執行緒是否在未啟動前就已經啟動了 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); // 如果集合中的工作執行緒數大於最大執行緒數,則將池中最大執行緒數改為當前工作執行緒數 if (s >
largestPoolSize) largestPoolSize = s; // 執行緒建立完成 workerAdded = true; } } finally { mainLock.unlock(); } // 如果執行緒成功建立,就啟動執行緒,並且更改啟動狀態為成功 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果執行緒啟動不成功,就執行失敗策略 if (! workerStarted) // 啟動失敗策略,從當前工作執行緒佇列移除當前啟動失敗的執行緒,遞減工作執行緒數,然後嘗試關閉執行緒池(如果當前任務就是執行緒池最後一個任務) addWorkerFailed(w); } return workerStarted; } ~~~ ### 2. 任務物件Worker 根據上文,不難發現,線上程池中執行緒往往以 Worker 物件的方式存在,那麼這個 Worker 又是何方神聖? ~~~java private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 工作執行緒 final Thread thread; // 要執行的任務 Runnable firstTask; // 執行緒執行過的任務數 volatile long completedTasks; // 通過執行緒工廠建立工作執行緒 Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } // 執行任務 public void run() { runWorker(this); } ... ... } ~~~ 這個 Worker 類繼承了 AQS,也就是說,他本身就相當於一個同步佇列,結合他的成員變數 thread 和 firstTask,可以知道他實際上就是我們執行緒池中所說的“執行緒”。除了父類 AQS 本身提供的獨佔鎖以外,Worker 還提供了一些檢查任務執行緒執行狀態以及中斷執行緒相關的方法。 此外,執行緒池中還有一個**工作佇列 workers,用於儲存當前全部的 Worker**: ~~~java private final HashSet workers = new HashSet(); ~~~ ### 3.任務的啟動 當呼叫 `Worker.run()`的時候,其實呼叫的是 `runWorker()`方法。 `runWorker()`方法實際上就是呼叫執行緒執行任務的方法,他的邏輯大題是這樣的: - 拿到入參的新 Worker,一直迴圈獲取 Worker 裡的任務; - 加鎖然後執行任務; - 如果執行完任務流程,並且沒有發生異常導致 Worker 掛掉,就直接複用 Worker(在獲取任務的方法 `getTask()`中迴圈等待任務); - 如果執行完任務流程後發現**發生異常**導致 Worker 掛掉,就從工作佇列中移除當前 Worker,並且補充一個新的; 如果整個流程執行完畢,就刪除當前的 Worker。 ```java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 新建立的Worker預設state為-1,AQS的unlock方法會將其改為0,此後允許使用interruptIfStarted()方法進行中斷 // 完成任務以後是否需要移除當前Worker,即當前任務是否意外退出 boolean completedAbruptly = true; try { // 迴圈獲取任務 while (task != null || (task = getTask()) != null) { // 加鎖,防止 shundown 時中斷正在執行的任務 w.lock(); // 如果執行緒池狀態為 STOP 或更後面的狀態,中斷執行緒任務 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 鉤子方法,預設空實現,需要自己提供 beforeExecute(wt, task); Throwable thrown = null; try { // 執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 鉤子方法 afterExecute(task, thrown); } } finally { task = null; // 任務執行完畢 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 根據completedAbruptly決定是否要移除意外退出的Worker,並補充新的Worker // 也就是說,如果上述過程順利完成,工作執行緒沒有掛掉,就不刪除,下次繼續用,否則就幹掉它再補充一個。 processWorkerExit(w, completedAbruptly); } } ``` ### 4.任務的獲取與超時處理 在 `runWorker()`方法中,通過 `getTask()`方法去獲取任務。值得注意的是,超時處理也在此處,簡單的來說,整套流程是這樣的: - 判斷執行緒池是否關閉,工作佇列是否為空,如果是說明沒任務了,直接返回null,否則接著往下判斷; - 判斷當前是否存在非核心執行緒,如果是說明需要進行超時處理; - 獲取任務,如果不需要超時處理,則直接從任務佇列獲取任務,否則根據 keepaliveTime 阻塞一段時間後獲取任務,如果獲取不到,說明非核心執行緒超時,返回 null 交給 `runWorker()`中的`processWorkerExit()`方法去刪除; 換句話說,`runWorker()`方法一旦執行完畢,必然會刪除當前的 Worker,而通過 `getTask()`拿任務的 Worker,線上程池正常執行的狀態下,**核心執行緒只會一直在 for 迴圈中等待直到拿到任務**,而非核心執行緒超時以後拿不到任務就會返回一個 null,然後回到 `runWorker()`中走完`processWorkerExit()`方法被刪除。 ```java private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 如果執行緒池關閉了,並且工作佇列裡的任務都完成了,或者執行緒池直接進入了 STOP 或更進一步的狀態,就不返回新任務 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 獲取當前工作執行緒 int wc = workerCountOf(c); // 核心執行緒是否超時(預設false)或當前是否存在非核心執行緒,即判斷當前當前是否需要進行超時控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 判斷執行緒是否超過最大執行緒數或存在非核心執行緒 if ((wc > maximumPoolSize || (timed && timedOut)) // 並且除非任務佇列為空,否則池中最少有一個執行緒 && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 獲取任務 Runnable r = timed ? // 阻塞 keepaliveTime 以獲取任務,如果在 keepaliveTime 時間內沒有獲取到任務,則返回 null. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 如果獲取不到任務,說明非核心執行緒超時了,下一輪判斷確認是否退出迴圈。 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ``` ## 四、執行緒池的中斷 ![image-20210211171605477](http://img.xiajibagao.top/image-20210211171605477.png) 執行緒池的中斷方法分為三種: - `shutdown()`:中斷執行緒池,不再新增新任務,同時**等待當前進行和佇列中的任務完成**; - `shutdownNow()`:立即中斷執行緒池,不再新增新任務,**同時中斷所有工作中的任務,不再處理任務佇列中任務**。 ### 1.shutdown shutdown 是有序關閉。主要乾了三件事: - 改變當前執行緒池狀態為 SHUTDOWN; - 將當前工作佇列中的全部執行緒標記為中斷; - 完成上述過程後將執行緒池狀態改為 TIDYING ```java public void shutdown() { final ReentrantLock mainLock = this.mainLock; // 加鎖 mainLock.lock(); try { checkShutdownAccess(); // 改變當前執行緒池狀態 advanceRunState(SHUTDOWN); // 中斷當前執行緒 interruptIdleWorkers(); // 鉤子函式,預設空實現 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } ``` 其中,`interruptIdleWorkers()`方法如下: ~~~java private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍歷工作佇列中的全部 Worker for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { // 標記為中斷 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } ~~~ ### 2.shutdownNow `shutdownNow()` 與 `shutdown()`流程類似,但是會直接將狀態轉為 STOP,在 `addWorker()` 或者`getTask()`等處理任務的相關方法裡,**會針對 STOP 或更進一步的狀態做區分,將不會再處理任務佇列中的任務**,配合`drainQueue()`方法以刪除任務佇列中的任務。 ~~~java public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 改變當前執行緒池狀態 advanceRunState(STOP); // 中斷當前執行緒 interruptWorkers(); // 刪除任務佇列中的任務 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } ~~~ ## 五、拒絕策略 當任務佇列已滿,並且執行緒池中執行緒也到達最大執行緒數的時候,就會呼叫拒絕策略。也就是`reject()`方法 ```java final void reject(Runnable command) { handler.rejectedExecution(command, this); } ``` 拒絕策略共分四種: - AbortPolicy:拒絕策略,直接丟擲異常,預設策略; - CallerRunsPolicy:呼叫者執行策略,用呼叫者所在的執行緒來執行任務; - DiscardOldestPolicy:棄老策略,無聲無息的丟棄阻塞佇列中靠最前的任務,並執行當前任務; - DiscardPolicy:丟棄策略,直接無聲無息的丟棄任務; 我們可以簡單的瞭解一下他們的實現: **AbortPolicy** ~~~java throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); ~~~ **CallerRunsPolicy** ~~~java public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } ~~~ **DiscardOldestPolicy** ~~~java public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 彈出隊頭元素 e.getQueue().poll(); e.execute(r); } } ~~~ **DiscardPolicy** ~~~java public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // Does nothing } ~~~ ## 六、執行緒池的鉤子函式 和 HashMap 與 LinkedHashMap 中的行為有點類似,線上程池的程式碼中,有些方法呼叫了一些具有空實現的方法,這些方法是提供給使用者去繼承並重寫的鉤子函式,主要包括三個: - `beforeExecute()`:在執行任務之前回調 - `afterExecute()`:在任務執行完後回撥 - `terminated()`:線上程池中的所有任務執行完畢後回撥 通過繼承 ThreadPoolExecutor 類,並重寫以上三個方法,我們可以進行監控或者輸出日誌,更方便的瞭解執行緒池的狀態。 值得一提的是,`afterExecute()`方法的入參型別是`(Runnable r, Throwable t)`,也就是說,如果執行緒執行中丟擲異常,我們也可以通過該方法去捕獲異常並作出相應的處理。 ## 七、總結 執行緒池提供了四個構造方法,引數最全的構造方法引數按順序有:核心執行緒數,最大執行緒數,非核心執行緒閒置存活時間,存活時間單位,任務佇列,執行緒工廠,拒絕策略。 執行緒池共有五種狀態,分別是:RUNNING,SHUTDOWN,STOP,TYDYING,TERMINATED,它們與工作執行緒數量一同記錄在成員變數 ctl 中,其中高 3 位用於記錄狀態,低 29 位用於記錄工作執行緒數,實際使用中通過位運算去獲取。 執行緒池中任務執行緒以繼承了 AQS 的 Worker 類的例項形式存在。當新增任務時,會有四種情況:核心執行緒不滿,優先建立核心執行緒;核心執行緒滿,優先新增任務佇列;核心執行緒與佇列都滿,建立非核心執行緒;執行緒和佇列都滿,則執行拒絕策略。 其中,拒絕策略分為四類,預設的拒絕策略 AbortPolicy;呼叫者執行策略 CallerRunsPolicy;棄老策略 DiscardOldestPolicy;丟棄策略 DiscardPolicy。 執行緒池的中斷有兩個方法:`shutdown()`與 `shutdownNow()`,兩者都會讓執行緒池不再接受新任務,但是 `shutdown()`會等待當前與任務佇列中的任務執行完畢,而 `shutdownNow()`會直接中斷當前任務,忽略並刪除任務佇列中的任務。 執行緒池提供了`beforeExecute()`,`afterExecute()`,`terminated()`三個鉤子函式,其中,`afterExecute()`的入參含有丟擲的異常,因此可以藉由該方法處理執行緒池中執行緒丟擲的