1. 程式人生 > >Java8執行緒池ThreadPoolExecutor底層原理及其原始碼解析

Java8執行緒池ThreadPoolExecutor底層原理及其原始碼解析


# 小侃一下 日常開發中, 或許不會直接new執行緒或執行緒池, 但這些執行緒相關的基礎或思想是非常重要的, 參考**林迪效應**; 就算沒有直接用到, 可能間接也用到了類似的思想或原理, 例如tomcat, jetty, 資料庫連線池, MQ; 本文不會對執行緒的基礎知識進行介紹, 所以最好已"進食"關於執行緒的基礎知識, 再"食用"本文更佳; 由於在下的工作及其它原因, 前後花費了數月的時間才完成這篇部落格, 希望能幫助到想要了解`ThreadPoolExecutor`執行緒池原始碼和原理的同學.
# 1. 使用執行緒池的好處. 為什麼要使用執行緒池? 1. 避免**頻繁建立、銷燬**執行緒的開銷; 複用建立的執行緒. 2. **及時響應**提交的任務; 提交一個任務,不再是每次都需要建立新的執行緒. 3. 避免每次提交的任務都新建執行緒, 造成**伺服器資源耗盡**, **執行緒頻繁上下文切換**等伺服器資源開銷. 4. 更容易**監控、管理**執行緒; 可以統計出已完成的任務數, 活躍的執行緒數, 等待的任務數等, 可以重寫hook方法`beforeExecute`, `afterExecute`, `terminated` , 重寫之後, 結合具體的業務進行處理.
# 2. 執行緒池核心引數介紹 | 引數 | 意義 | | --------------- | ------------------------------------------------------------ | | corePoolSize | 執行緒池中的核心執行緒數 | | workQueue | 存放提交的task | | maximumPoolSize | 執行緒池中允許的最大執行緒數 | | threadFactory | 執行緒工廠, 用來建立執行緒, 由`Executors#defaultThreadFactory`實現 | | keepAliveTime | 空閒執行緒存活時間(預設是臨時執行緒, 也可設定為核心執行緒) | | unit | 空閒執行緒存活時間單位列舉 | 下面將結合線程池中的任務提交流程加深理解.
# 3. 提交任務到執行緒池中的流程 ## 3.1 ThreadPoolExecutor#execute方法整體流程 這裡以`java.util.concurrent.ThreadPoolExecutor#execute`方法為例, 畫一個簡單的圖:
![](https://img2020.cnblogs.com/blog/1158841/202006/1158841-20200613170548341-653070237.png) 上圖中的worker可簡單理解為執行緒池中的一個執行緒, `workers.size()`即使**執行緒池中的執行緒數**; 1. 當`workers.size()`小於`corePoolSize`時, 建立新的執行緒執行提交的task. 2. 當`workers.size()`大於`corePoolSize`時, 並且`workQueue`沒有滿, 將task新增到`workQueue`. 3. 當`workers.size()`大於`corePoolSize`時, 並且`workQueue`已經滿了, 但是`workers.size()=maximumPoolSize`, 執行拒絕策略. 後續會有對`ThreadPoolExecutor#execute`方法的詳細解讀: [execute方法原始碼: 提交task到執行緒池](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_5_0). 4種預設的拒絕策略: [ThreadPoolExecutor預設實現的4種拒絕策略](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_6_1).
## 3.2 排隊恰火鍋的場景 這裡我們可以想像一個場景: 去海底撈吃火鍋; 下午4點晚市正式開始排隊, 假如店內一共有16張桌子, 陸續光臨的16組客人將店內坐滿; 店外一共有20組客人座位, 則第17~36組客人坐在店外排隊; 第37組客人來了, 啟動臨時餐桌供客人吃飯. 所以, 這裡的店內16張桌子則是`corePoolSize`, 店外一共有20組座位則為`BlockingQueue`, 而臨時餐桌數量即`maximumPoolSize-corePoolSize`. 上面的例子並非絕對完美, 僅僅是為了便於我們理解執行緒池的各個引數, 以及加深印象.
# 4. ThreadPoolExecutor執行緒池原始碼及其原理 有了上面對執行緒池的總體瞭解後, 下面結合原始碼來看看執行緒池的底層原理吧!
## 4.1 從建立ThreadPoolExecutor開始: 執行緒池建構函式的原始碼 ```java public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } ``` 上面是`ThreadPoolExecutor`引數最少的一個構造方法, 預設的`ThreadFactory`是`Executors.defaultThreadFactory()`, 預設的 `RejectedExecutionHandler`是`defaultHandler = new AbortPolicy()`; ```java public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } ``` 上面是`ThreadPoolExecutor`引數最多的一個構造方法, 其他構造方法都是傳入引數呼叫這個構造方法, 預設的執行緒工廠見[預設的執行緒工廠Executors#defaultThreadFactory](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_6_0), 各個引數在[執行緒池核心引數介紹](https://www.cnblogs.com/theRhyme/p/13056215.html#_label2)已經介紹.
## 4.2 ThreadPoolExecutor中的一些重要的屬性 對一些重要屬性有基礎的認知, 有助於後面我們更容易看懂原始碼流程. ### 4.2.1 執行緒池的執行狀態 ```java private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; ``` 根據上面原始碼可知, `COUNT_BITS`的值為29, `CAPACITY`的值為2的29次方-1, 二進位制表示為: "00011111111111111111111111111111"(明顯29個1); 上面的原始碼中執行緒池的執行狀態的二進位制表示: | 狀態 | 二進位制 | 意義 | | ------------ | -------------------------------- | ------------------------------------------------------------ | | `RUNNING` | 11100000000000000000000000000000 | 接受**新execute**的task, 執行**已入隊**的task | | `SHUTDOWN` | 0 | 不接受**新execute**的task, 但執行**已入隊**的task, 中斷所有空閒的執行緒 | | `STOP` | 00100000000000000000000000000000 | 不接受**新execute**的task, 不執行**已入隊**的task, 中斷所有的執行緒 | | `TIDYING` | 01000000000000000000000000000000 | 所有執行緒停止, `workerCount`數量為0, 將執行hook方法: terminated() | | `TERMINATED` | 01100000000000000000000000000000 | terminated()方法執行完畢 | 可以看出, 執行緒池的狀態由32位`int`整型的二進位制的**前三位**表示. 下圖根據`Javadoc`所畫:
![](https://img2020.cnblogs.com/blog/1158841/202006/1158841-20200613171625982-248501754.png)
### 4.2.2 核心屬性`ctl`原始碼(執行緒池狀態和有效執行緒數) ```java private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); ``` 核心屬性`ctl`, 資料型別是`AtomicInteger`, 表示了兩個含義: 1. 執行緒池執行狀態(`runState`) 2. 執行緒池中的有效執行緒數(`workerCount`) 那是如何做到一個屬性表示兩個含義的呢? 那就要看看`ctlOf`方法 ```java private static int ctlOf(int rs, int wc) { return rs | wc; } ``` `ctlOf`方法線上程池內部用來更新執行緒池的`ctl`屬性,比如`ctl`初始化的時候: `ctl = new AtomicInteger(ctlOf(RUNNING, 0))`, 呼叫`ThreadPoolExecutor#shutdown`方法等; `rs`表示`runState`, `wc`表示`workerCount`; 將 `runState`和`workerCount`做**按位或**運算得到`ctl`的值; 而`runState`和`workerCount`的值由下面兩個方法packing和unpacking, 這裡的形參`c`就是`ctl.get()`的值; ```java // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } ``` 下面用表格更清晰理解: | 方法 | 方法體 | 帶入CAPACITY的值 | | --------------- | --------------- | -------------------------------------- | | `runStateOf` | `c & ~CAPACITY` | `c & 11100000000000000000000000000000` | | `workerCountOf` | `c & CAPACITY` | `c & 00011111111111111111111111111111` | **按位與**運算, 相同位置, 同1才為1, 其餘為0; 結合表格看, `runStateOf`方法取`ctl`前3位表示`runState`, `workerCountOf`方法取第4~32位的值表示`workerCount`; 相信大家已經明白`runState`和`workerCount`如何被packing和unpacking, 這就是為什麼`ctl`能即表示`runState`又能表示`wokerCount`. Note: 眾所周知, 與2的整數次冪-1進行按位與運算結果等於取餘運算的結果, 而位運算效率高於取餘運算, 與Java8及其之後的`HashMap`的雜湊方式有同曲同工之妙, 見:https://www.cnblogs.com/theRhyme/p/9404082.html#_lab2_1_16.
### 4.2.3 執行緒池中的mainLock鎖 ```java private final ReentrantLock mainLock = new ReentrantLock(); ``` 這把可重入鎖, 線上程池的很多地方會被用到; 比如要對**workers**(執行緒池中的執行緒集合)操作的時候(如新增一個worker到工作中), interrupt所有的 `workers`, 呼叫[shutdown](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_5_5)方法等.
### 4.2.4 執行緒池中的執行緒集合 ```java private final HashSet workers = new HashSet(); ``` 用來儲存當前執行緒池中的所有執行緒; 可通過該集合對執行緒池中的執行緒進行**中斷**, **遍歷**等; 建立新的執行緒時, 要新增到該集合, 移除執行緒, 也要從該集合中移除對應的執行緒; 對該集合操作都需要`mainLock`鎖.
### 4.2.5 mainLock的Condition()物件 ```java private final Condition termination = mainLock.newCondition(); ``` 主要是為了讓[tryTerminate](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_6_3)方法與[awaitTermination](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_6_4)方法結合使用; 而`tryTerminate`又被`shutdown`、`shutdownNow`、`processWorkerExit`等方法呼叫; Condition物件`termination`的作用就是當執行緒池中的狀態表示的值小於**TERMINATED**的值3時, 當前呼叫了[awaitTermination](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_6_4)方法的執行緒就會wait對應的時間; 等到過了指定的wait時間, 或者執行緒池狀態等於或大於TERMINATED, wait的執行緒被喚醒, 就繼續執行; 如果不清楚`wait(long)`與`wait()`的區別可參考: [Object#wait()與Object#wait(long)的區別](https://www.cnblogs.com/theRhyme/p/12992852.html).
### 4.2.6 執行緒池中曾經達到的最大執行緒數 ```java private int largestPoolSize; ``` 用作監控, 檢視當前執行緒池, 執行緒數最多的時候的數量是多少, 見方法`ThreadPoolExecutor#getLargestPoolSize`; `mainLock`保證其可見性和原子性.
### 4.2.7 執行緒池中已完成的任務數 ```java private long completedTaskCount; ``` 通過方法`ThreadPoolExecutor#getCompletedTaskCount`獲取.
### 4.2.8 核心執行緒池中的空閒執行緒 ```java private volatile boolean allowCoreThreadTimeOut; ``` 預設情況下, 只有臨時執行緒超過了`keepAliveTime`的時間會被回收; `allowCoreThreadTimeOut`預設為false, 如果設定為true, 則會通過**中斷**或[getTask](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_5_4)的結果為**null**的方式停止超過`keepAliveTime`的**核心執行緒**, 具體見[getTask](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_5_4)方法, 後續會詳細介紹.
# 5. ThreadPoolExecutor一些重要的方法原始碼及其原理解析 ## 5.1 execute方法原始碼: 提交task到執行緒池 ```java public void execute(Runnable command) { // 如果task為null, 丟擲NPE if (command == null) throw new NullPointerException(); // 獲得ctl的int值 int c = ctl.get(); // workerCount小於corePoolSize if (workerCountOf(c) < corePoolSize) { // 新增一個新的worker, 作為核心執行緒池的執行緒 if (addWorker(command, true)) // 新增worker作為核心執行緒成功, execute方法退出 return; // 新增worker作為核心執行緒失敗, 重新獲取ctl的int值 c = ctl.get(); } // 執行緒池是RUNNING狀態並且task入阻塞佇列成功 if (isRunning(c) && workQueue.offer(command)) { // double-check, 再次獲取ctl的值 int recheck = ctl.get(); // 執行緒池不是RUNNING狀態並且當前task從workerQueue被移除成功 if (! isRunning(recheck) && remove(command)) // 執行拒絕策略 reject(command); // 執行緒池中的workerCount為0 else if (workerCountOf(recheck) == 0) // 啟動一個非核心執行緒, 由於這裡的task引數為null, 該執行緒會從workerQueue拉去任務 addWorker(null, false); } // 新增一個非核心執行緒執行提交的task else if (!addWorker(command, false)) // 新增一個非核心執行緒失敗, 執行拒絕策略 reject(command); } ``` 結合上面程式碼中的註釋和[提交任務到執行緒池中的流程](https://www.cnblogs.com/theRhyme/p/13056215.html#_label3), 相信我們已經對這個`execute`方法提交task到執行緒池的流程的原始碼更加清晰了.
## 5.2 addWorker方法原始碼: 建立執行緒並啟動, 執行提交的task ```java private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 執行緒池執行狀態 int rs = runStateOf(c); // 如果執行緒池執行狀態大於等於SHUTDOWN, 提交的firstTask為null, workQueue為null,返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // workerCount int wc = workerCountOf(c); // 執行緒數大於了2的29次方-1, 或者想要新增為核心執行緒但是核心執行緒池滿, 或者想要新增為臨時執行緒, 但是workerCount等於或大於了最大的執行緒池執行緒數maximumPoolSize, 返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS的方式讓workerCount數量增加1,如果成功, 終止迴圈 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // 再次檢查runState, 如果被更改, 重頭執行retry程式碼 if (runStateOf(c) != rs) continue retry; // 其他的, 上面的CAS如果由於workerCount被其他執行緒改變而失敗, 繼續內部的for迴圈 } } // 標誌位workerStarted, workerAdded boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 傳入task物件, 建立Worker物件 w = new Worker(firstTask); // 從worker物件中回去Thread物件 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 獲取mainLock鎖 mainLock.lock(); try { // 獲取mainLock鎖之後, 再次檢查runState int rs = runStateOf(ctl.get()); // 如果是RUNNING狀態, 或者是SHUTDOWN狀態並且傳入的task為null(執行workQueue中的task) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 執行緒已經被啟動, 丟擲IllegalThreadStateException if (t.isAlive()) throw new IllegalThreadStateException(); // 將worker物件新增到HashSet workers.add(w); int s = workers.size(); // 執行緒池中曾經達到的最大執行緒數(上面4.2.6提到過) if (s > largestPoolSize) largestPoolSize = s; // worker被新增成功 workerAdded = true; } } finally { // 釋放mainLock鎖 mainLock.unlock(); } // 如果worker被新增成功, 啟動執行緒, 執行對應的task if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果執行緒啟動失敗, 執行addWorkerFailed方法 if (! workerStarted) addWorkerFailed(w); } return workerStarted; } ``` 每行程式碼都有詳細的對應的註釋, 相信我們已經明白了`addWorker`方法的過程.
## 5.3 Worker類原始碼: 執行緒是如何執行提交到執行緒池中的task? 上面的[addWorker](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_5_1)方法中, 獲得**Worker**物件中的**Thread**物件(`final Thread t = w.thread;`), 並呼叫執行緒的**start**方法啟動執行緒執行**Worker中的run**方法.
### 5.3.1 Worker 的定義 繼承了**AQS(AbstractQueuedSynchronizer)**, 重寫了部分方法, 這裡的主要作用主要是通過[tryLock](https://www.cnblogs.com/theRhyme/p/13056215.html#_label3_5_2_6)或[isLocked](https://www.cnblogs.com/theRhyme/p/13056215.html#_label3_5_2_7)方法判斷**當前執行緒是否正在執行Worker中的run方法**, 如果返回`false`, 則執行緒沒有正在執行或沒有處於active, 反之, 處於; 結合[getActiveCount方法原始碼](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_6_7)理解; 實現了**Runnable**介面, 是一個執行緒可執行的任務. ```java private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ ... } ```
### 5.3.2 Worker中的屬性 | 屬性 | 意義 | | ------------------------------ | ------------------------------------------------------------ | | `final Thread thread` | 執行緒物件, worker會被提交到該執行緒 | | `Runnable firstTask` | 提交到執行緒池中的task, 可能為null, 比如方法[ThreadPoolExecutor#prestartCoreThread](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_6_5) | | `volatile long completedTasks` | 每個執行緒完成的任務數 |
### 5.3.3 Worker的構造方法 首先設定初始狀態`state`為-1, 這裡的`setState`方法是`AQS`中的方法; 提交的task賦值給`firstTask`屬性; 利用`ThreadFactory`, 傳入當前Worker物件(**為了執行當前Worker中的run方法**), 建立`Thread`物件. ```java Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } ```
### 5.3.4 Worker中的run方法 Worker物件的`run`方法, 直接呼叫了`ThreadPoolExecutor`的[runWorker](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_5_3)方法. ```java public void run() { runWorker(this); } ```
### 5.3.5 Worker中的重寫AQS的方法tryAcquire, tryRelease, isHeldExclusively #### 5.3.5.1 tryAcquire方法 嘗試將`state`從0設定為1, 成功後把當前持有鎖的執行緒設定為**當前執行緒**; 形參`unused`沒有用到. ```java protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } ```
#### 5.3.5.2 tryRelease方法 直接將當前持有鎖的執行緒設定為null, 將`state`設定為1; 形參`unused`沒有用到. ```java protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } ```
#### 5.3.5.3 isHeldExclusively方法 判斷當前執行緒是否已經獲取了`Worker`的鎖; 如果`getState() == 0`, 則沒有執行緒獲取了該鎖, 可以嘗試獲取鎖, 將`state`設定為1; 如果`getState() == 1`, 已經有執行緒獲取了該鎖, 互斥, 此時無法獲取該鎖. ```java protected boolean isHeldExclusively() { return getState() != 0; } ```
### 5.3.6 lock方法 獲取鎖, 直到獲取到鎖為止(具體見`AbstractQueuedSynchronizer#acquireQueued`方法); ```java public void lock() { acquire(1); } ```
### 5.3.7 tryLock方法 `tryLock`, 嘗試獲取鎖, 獲取到返回true, 否則返回false. ```java public boolean tryLock() { return tryAcquire(1); } ```
### 5.3.8 isLocked方法 `isLocked`方法, 如果當前有執行緒持有該鎖, 則返回true, 否則返回false. ```java public boolean isLocked() { return isHeldExclusively(); } ```
### 5.3.9 interruptIfStarted方法 執行緒啟動會呼叫`unlock`方法(ThreadPoolExecutor.java第1131行), 將state設定為0; 如果執行緒已經啟動, 並且沒有被中斷, 呼叫執行緒的中斷方法. ```java void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } ```
### 5.3.10 unlock方法 底層呼叫worker的[tryRelease](https://www.cnblogs.com/theRhyme/p/13056215.html#_label3_5_2_4)方法, 設定state為0. ```java public void unlock() { release(1); } ```
## 5.4 runWorker方法原始碼: 執行緒池中執行緒被複用的關鍵 執行提交的task或死迴圈從`BlockingQueue`獲取task. ```java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { // 當傳入的task不為null, 或者task為null但是從BlockingQueue中獲取的task不為null while (task != null || (task = getTask()) != null) { // 執行任務之前先獲取鎖 w.lock(); // 執行緒池狀態如果為STOP, 或者當前執行緒是被中斷並且執行緒池是STOP狀態, 或者當前執行緒不是被中斷; // 則呼叫interrupt方法中斷當前執行緒 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // beforeExecute hook方法 beforeExecute(wt, task); Throwable thrown = null; try { // 真正執行提交的task的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // afterExecute hook方法 afterExecute(task, thrown); } } finally { // task賦值為null, 下次從BlockingQueue中獲取task task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } ```
## 5.5 getTask方法原始碼: 從BlockingQueue中獲取task ```java private Runnable getTask() { // BlockingQueue的poll方法是否已經超時 boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果執行緒池狀態>=SHUTDOWN,並且BlockingQueue為null; // 或者執行緒池狀態>=STOP // 以上兩種情況都減少工作執行緒的數量, 返回的task為null if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 當前執行緒是否需要被淘汰 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // BlockingQueue的poll方法超時會直接返回null // BlockingQueue的take方法, 如果佇列中沒有元素, 當前執行緒會wait, 直到其他執行緒提交任務入隊喚醒當前執行緒. Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ```
## 5.6 shutdown方法原始碼: 中斷所有空閒的執行緒 ```java public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 死迴圈將執行緒池狀態設定為SHUTDOWN advanceRunState(SHUTDOWN); // 中斷所有空閒的執行緒 interruptIdleWorkers(); // hook函式, 比如ScheduledThreadPoolExecutor對該方法的重寫 onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); } ```
## 5.7 shutdownNow方法原始碼: 中斷所有空閒的執行緒, 刪除並返回BlockingQueue中所有的task ```java public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 死迴圈將執行緒池狀態設定為STOP advanceRunState(STOP); // 中斷所有空閒的執行緒 interruptWorkers(); // 刪除並返回BlockingQueue中所有的task tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); // 返回BlockingQueue中所有的task return tasks; } ```
# 6. ThreadPoolExecutor一些其他的方法和屬性介紹 ## 6.1 預設的執行緒工廠Executors#defaultThreadFactory 預設的執行緒工廠的兩個重要作用就是**建立執行緒**和**初始化執行緒名字首**. 建立`DefaultThreadFactory`物件. ```java public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } ``` `DefaultThreadFactory`預設構造方法, 初始化`ThreadGroup`和創建出的**執行緒名字首**`namePrefix`. ```java static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) // 非daemon執行緒, 不會隨父執行緒的消亡而消亡 t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } ```
## 6.2 ThreadPoolExecutor預設實現的4種拒絕策略 ### 6.2.1 CallerRunsPolicy 如果執行緒池狀態不是`SHUTDOWN`, 由提交任務到執行緒池中(如呼叫`ThreadPoolExecutor#execute`方法)的執行緒執行該任務; 如果執行緒池狀態是`SHUTDOWN`, 則該任務會被直接丟棄掉, **不會再次入隊**或**被任何執行緒執行**. ```java public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } ```
### 6.2.2 AbortPolicy 在呼叫提交任務到執行緒池中(如呼叫`ThreadPoolExecutor#execute`方法)的執行緒中直接丟擲`RejectedExecutionException`異常, 當然任務也不會被執行, 提交任務的執行緒如果未捕獲異常會因此停止. ```java public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } ```
### 6.2.3 DiscardPolicy 直接丟棄掉這個任務, 不做任何事情. ```java public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } ```
### 6.2.4 DiscardOldestPolicy 執行緒池如果不是`SHUTDOWN`狀態, 丟棄最老的任務, 即`workQueue`隊頭的任務, 將當前任務`execute`提交到執行緒池; 與`CallerRunsPolicy`一樣, 如果執行緒池狀態是`SHUTDOWN`, 則該任務會被直接丟棄掉, 不會再次入隊或被任何執行緒執行. ```java public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } ```
## 6.3 addWorkerFailed方法原始碼: 移除啟動執行緒失敗的worker ```java private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; // 獲取mainLock鎖 mainLock.lock(); try { // 如果worker不為null, 從HashSet中移除worker if (w != null) workers.remove(w); // 迴圈執行CAS操作直到讓workerCount數量減少1 decrementWorkerCount(); // 執行tryTerminate方法 tryTerminate(); } finally { mainLock.unlock(); } } ```
## 6.4 tryTerminate方法原始碼: 嘗試更改runState, workerCount, 嘗試關閉執行緒池 ```java final void tryTerminate() { for (;;) { // 獲取ctl, runState和workerCount int c = ctl.get(); // 當前執行緒池狀態是否是RUNNING, 或者是否是TIDYING或TERMINATED狀態, 或者是否是SHUTDOWN狀態並且workQueue不為空(需要被執行緒執行), return結束方法 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // workerCount如果不為0, 隨機中斷一個空閒的執行緒, return結束方法 if (workerCount如果不為0,(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; // 獲取mainLock鎖 mainLock.lock(); try { // CAS方式設定當前執行緒池狀態為TIDYING, workerCount為0 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 執行hook方法terminated terminated(); } finally { // 設定當前執行緒池狀態為TERMINATED, workerCount為0 ctl.set(ctlOf(TERMINATED, 0)); // 喚醒呼叫了awaitTermination方法的執行緒 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // 當CAS失敗, 迴圈重試 } } ```
## 6.5 awaitTermination方法原始碼: 等待指定時間後, 執行緒池是否已經關閉 死迴圈判斷, 如果當前執行緒池狀態小於`TERMINATED`, 則`wait`對應的時間; 如果過了`wait`的時間(`nanos <= 0`), 執行緒池狀態大於等於`TERMINATED`則迴圈終止, 函式返回`true`, 否則返回`false`. ```java public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } } ```
## 6.6 prestartCoreThread方法原始碼: 預啟動一個核心執行緒 如果**當前執行緒池中的核心執行緒數**小於**corePoolSize**, 則增加一個**核心執行緒**(**提交的task為null**). ```java public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } ```
## 6.7 prestartAllCoreThreads方法原始碼: 預先啟動執行緒池中的所有核心執行緒 啟動所有的核心執行緒. ```java public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; } ```
## 6.8 getActiveCount方法原始碼: 獲得當前執行緒池中活躍的執行緒 獲得當前執行緒池中活躍的執行緒(即正在執行task沒有wait的執行緒, [runWorker](#5.4 runWorker方法原始碼: 執行緒池中執行緒被複用的關鍵)方法中的同步程式碼塊). ```java public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; for (Worker w : workers) if (w.isLocked()) ++n; return n; } finally { mainLock.unlock(); } } ```
# 總結 通過介紹`ThreadPoolExecutor`的構造方法, 重要屬性, `execute`方法, 引出`Worker`類, 以及真正的執行緒處理提交到執行緒池中的`task`的原始碼和流程, 對`ThreadPoolExecutor`整體結構有了清晰的認知; 執行緒池`ThreadPoolExecutor`使用`BlockingQueue`實現執行緒間的等待-通知機制, 當然也可以自己手動實現; 複用執行緒體現在[runWorker](https://www.cnblogs.com/theRhyme/p/13056215.html#_lab2_5_3)方法中, 死迴圈+`BlockingQueue`的