原始碼學習之執行緒池
大家面試過程中肯定被問道過執行緒池。為什麼要使用執行緒池呢?因為在系統中頻繁建立執行緒會造成很大的CPU消耗。而且用完的執行緒要等待GC回收也會造成消耗。
下面我們就來學習下最常用的執行緒池 ThreadPoolExecutor,
首先先來看看它的構造方法:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
引數說明:
corePoolSize:核心執行緒數,執行緒池維持的正常活躍(保障不會執行緒超時)的最小執行緒數,不允許為0,除非設定允許執行緒超時等待(如果提交的任務數量大於這個引數時,提交的任務將被放入快取佇列)。
maximumPoolSize:最大執行緒數,即為執行緒池可以容納的最大執行緒數。
keepAliveTime: 空閒的執行緒存活時間。當存在超過corePoolSize或設定了允許核心執行緒空閒超時的時候,執行緒將在等待這麼長時間後自動銷燬(預設以納秒為單位)。
unit: 空閒的執行緒存活時間的單位。
workQueue: 等待佇列,注意這個佇列是阻塞的,當佇列中沒有任務時執行緒獲取任務將被阻塞,直到有任務時候被喚醒。
threadFactory: 執行緒工廠,工廠模式大家都懂的,如果有需要自定義執行緒,實現ThreadFactory介面從newThread方法返回你自定義的執行緒類就好了。
handler: 拒絕策略處理器,線上程池無法接收新的工作時候會呼叫該處理器的拒絕方法拒絕掉一些任務。
執行緒池拒絕策略處理介面:RejectedExecutionHandler
實現類:
直接拒絕策略(不會報錯):DiscardPolicy
直接拒絕策略(將丟擲一個RejectedExecutionException錯誤):AbortPolicy
讓放入的執行緒自己執行任務:CallerRunsPolicy
拋棄等待時間最長的任務:DiscardOldestPolicy
看完了這些引數後是不是還是一頭霧水?哈哈,其實我也是這樣。要了解執行緒池,先要了解它的狀態。
private static final int COUNT_BITS = Integer.SIZE - 3; //CAPACITY 值為 1 << 29 = 0010 0000 0000 0000 0000 0000 0000 0000 // (1 << 29) -1 = 0001 1111 1111 1111 1111 1111 1111 1111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //執行緒池狀態 // -1 << 29 = 1110 0000 0000 0000 0000 0000 0000 0000 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
執行緒池狀態:
RUNNING:取30位到32位111代表執行中,可以接收新的任務並且會處理等待佇列中的任務。
SHUTDOWN:取30位到32位000代表關閉中,不中斷正在執行的任務,但是會中斷等待中的執行緒。然後等待佇列中剩餘的任務處理完就關閉。
STOP: 取30位到32位001代表停止,不接受新任務也不處理等待佇列中的任務。並且會中斷正在執行的任務。
TIDYING: 清取30位到32位010代表理中,最後的掃尾工作,關閉中到結束的中間狀態,所有任務已經完成,工作執行緒數workerCount為0,執行緒池將執行結束方法。
TERMINATED: 取30位到32位011代表結束,結束方法完成,執行緒池結束。
執行緒池狀態轉換:
RUNNING -> SHUTDOWN: 呼叫了shutdown()方法之後,執行緒池會開始拒絕接收新的任務,從執行中轉變為關閉中的狀態。一般是線上程池完成工作需要銷燬時呼叫。
RUNNING -> STOP: 呼叫了shutdownNow()方法之後,執行緒池會拒絕接受任務並中斷所以正在執行的任務,從執行狀態轉入停止狀態。
SHUTDOWN -> STOP: 呼叫了shutdownNow()方法之後,執行緒池會拒絕接受任務並中斷所以正在執行的任務,從關閉中轉為停止狀態。
SHUTDOWN -> TIDYING: 執行緒池本身處於停止中的狀態,佇列和執行緒池都為空的時候。執行緒進入清理狀態。
STOP -> TIDYING: 當執行緒池為空時,執行緒池從停止中狀態轉入清理狀態。
TIDYING -> TERMINATED: 當執行緒清理完成會回撥 terminated() 方法,完成後執行緒正式結束。
現在不明白這些狀態是幹什麼用的,可以先放到一邊,接下來我們要看一下執行緒池的幾個比較核心的方法。為了讓程式碼更容易理解,我在上面加入了註釋。
// 獲取方法狀態, 取計數器的30-32位作為執行緒池的狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } //取計數器的後29位作為執行緒數 private static int workerCountOf(int c) { return c & CAPACITY; } //執行緒池執行方法 public void execute(Runnable command) { if (command == null){ throw new NullPointerException(); } //獲取計數器的數值 int c = ctl.get(); /* * 如果執行的執行緒數少於corePoolSize核心執行緒數,就啟動一個新的執行緒來執行該任務。 * addWorker方法將以原子方式檢查runState(執行狀態)和workerCount(工作中匯流排程數), * 如果執行狀態為關閉或者工作中匯流排程數等於核心執行緒數返回false。 */ //workerCountOf 取計數器的後29位作為執行緒數 和核心執行緒數進行對比 if (workerCountOf(c) < corePoolSize) { //啟動一個新的執行緒來完成工作 if (addWorker(command, true)){ return; } //獲取計數器的最新數值 c = ctl.get(); } /* * isRunning 方法是通過原子的計數器獲取的數值來判斷執行狀態(注意如果是執行狀態前三位一定是111整個數值是個負數), * 如果執行狀態是執行狀態,就嘗試向等待佇列末尾插入一個任務。 */ if (isRunning(c) && workQueue.offer(command)) { //獲取計數器的最新數值 int recheck = ctl.get(); //再次判斷執行緒池狀態,如果這時候執行緒池進入其他狀態,折刪除剛剛新增的任務,並且在remove中呼叫了中斷空閒執行緒的方法 if (!isRunning(recheck) && remove(command)){ //呼叫拒絕策略 reject(command); } /* * 如果執行緒池還處於執行中 或 者執行緒處於關閉中,但是等待佇列任務刪除失敗。 * 判斷工作執行緒的數量為0的話,就建立一個空閒執行緒放入池中,讓他將佇列中的任務執行完後 * 執行緒池在轉入清理狀態 */ else if (workerCountOf(recheck) == 0){ addWorker(null, false); } } /* * 如果執行狀態為關閉再次嘗試呼叫 addWorker 方法執行這個任務,這時addWorker * 會返回false。這時候執行拒絕策略(reject 方法底層其實呼叫的拒絕策略的拒絕方法)。 * 如果執行緒狀態為執行中,但是因為等待佇列滿了,插入失敗,再次呼叫addWorker 嘗試執行 * 任務,並且該方法引數 core為false若執行緒數未超過 最大執行緒數會建立一個新的執行緒來執行任務 * addWorker 方法下面有介紹 */ else if (!addWorker(command, false)){ reject(command); } } //將任務加入工作中的方法,執行緒池的核心方法之一 //引數 core 代表是不是比較核心執行緒數的意思。 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //獲取計數器的數值 int c = ctl.get(); //取30位-32位判斷執行緒狀態 int rs = runStateOf(c); /* * 如果執行緒池處於 停止,清理中,結束 等狀態,直接返回false拒絕該任務 * 如果執行緒處於 關閉中 且傳入的任務不為空 也直接拒絕該任務。 * 如果傳入的任務為空,但是等待佇列為空,證明沒任務了不要建立新的執行緒,直接返回結束該方法。 */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())){ return false; } for (;;) { //獲取當前執行緒數 int wc = workerCountOf(c); //判斷執行緒數是否超過極限容量, 如果沒超過 且core為true 在看看是否超過核心執行緒數 // 如果 且core為false 則和最大執行緒數進行比較。 如果超過了就返回false拒絕該任務。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){ return false; } //修改原子計數器的執行緒數 如果修改成功 就沒必要在無限迴圈了 直接斷開迴圈跳到最外層不在執行迴圈 if (compareAndIncrementWorkerCount(c)){ break retry; } //如果上面的修改沒有成功 再次獲取原子計數器數值 c = ctl.get(); // Re-read ctl //判斷當前的狀態十分和之前的一樣如果一樣繼續迴圈修改 如果不一樣則結束迴圈跳到最外層大迴圈,從新開始 if (runStateOf(c) != rs){ continue retry; } // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //獲取主鎖,為了保證在併發下新增任務的安全性 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. //獲取到鎖後 從新獲取執行緒池執行狀態 int rs = runStateOf(ctl.get()); //如果執行緒處於執行狀態 或者 者執行緒處於關閉中,但是等待佇列任務刪除失敗。且執行緒池中執行緒已全被中斷 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //判斷執行緒是否是活躍的 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //將新建的工作執行緒加入工作佇列 workers.add(w); //獲取工作執行緒數量 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //設定工作執行緒加入成功 workerAdded = true; } } finally { mainLock.unlock(); } //如果工作執行緒加入成功啟動新的執行緒 並將執行緒狀態設定為 true if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
下面順便提一句,為什麼阿里規範不讓用 Executors.newSingleThreadExecutor(); 這樣的方法建立執行緒池。
我們可以看看他的原始碼:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
LinkedBlockingQueue 這是一個阻塞無限佇列,也就是說這個佇列只要不是OOM了就可以一直往裡面放。這樣會造成如果執行緒數到達核心執行緒數以後還是處理不過來並不會繼續建立執行緒,而是會一直往佇列中塞任務,直到記憶體溢位