併發系列(一)——執行緒池原始碼(ThreadPoolExecutor類)簡析
前言
本文主要是結合原始碼去執行緒池執行任務的過程,基於JDK 11,整個過程基本與JDK 8相同。
個人水平有限,文中若有表達有誤的,歡迎大夥留言指出,謝謝了!
一、執行緒池簡介
1.1 使用執行緒池的優點
1)通過複用已建立的執行緒,降低資源的消耗(執行緒的建立/銷燬是要消耗資源的)、提高響應速度;
2)管理執行緒的個數,執行緒的個數在初始化執行緒池的時候指定;
3)統一管理執行緒,比如停止,stop()方法;
1.2 執行緒池執行任務過程
執行緒池執行任務的過程如下圖所示,主要分為以下4步,其中引數的含義會在後面詳細講解:
1)判斷工作的執行緒是否小於核心執行緒資料(workerCountOf(c) < corePoolSize),若小於則會新建一個執行緒去執行任務,這一步僅僅的是根據執行緒個數決定;
2)若核心執行緒池滿了,就會判斷執行緒池的狀態,若是running狀態,則嘗試加入任務佇列,若加入成功後還會做一些事情,後面詳細說;
3)若任務佇列滿了,則加入失敗,此時會判斷整個執行緒池執行緒是否滿,若沒有則建立非核心執行緒執行任務;
4)若執行緒池滿了,則根據拒絕測試處理無法執行的任務;
整體過程如下圖:
二、ThreadPoolExecutor類解析
2.1 ThreadPoolExecutor的建構函式
ThreadPoolExecutor類一共提供了4個建構函式,涉及5~7個引數,下面就5個必備引數的建構函式進行說明:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
1)corePoolSize :初始化核心執行緒池中執行緒個數的大小;
2)maxmumPoolSize:執行緒池中執行緒大小;
3)keepAliveTime:非核心執行緒的超時時長;
非核心執行緒空閒時常大於該值就會被終止。
4)unit :keepAliveTime的單位,型別可以參見TimeUnit類;
5)BlockingQueue workQueue:阻塞佇列,維護等待執行的任務;
2.2 私有類Worker
在ThreadPoolExecutor類中有兩個集合型別比較重要,一個是用於放置等待任務的workQueue,其型別是阻塞對列;一個是用於用於存放工作執行緒的works,其是Set型別,其中存放的型別是Worker。
進一步簡化執行緒池執行過程,可以理解為works中的工作執行緒不停的去阻塞對列中取任務,執行結束,執行緒重新加入大works中。
為此,有必要簡單瞭解一下Work型別的組成。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** Thread this worker is running in. Null if factory fails. */ //工作執行緒,由執行緒的工廠類初始化 final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; //不可重入的鎖 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } ....... }
Worker類繼承於佇列同步器(AbstractQueueSynchronizer),佇列同步器是採取鎖或其他同步元件的基礎框架,其主要結構是自旋獲取鎖的同步佇列和等待喚醒的等待佇列,其方法因此可以分為兩類:對state改變的方法 和 入、出佇列的方法,即獲取獲取鎖的資格的變化(可能描述的不準確)。關於佇列同步器後續部落格會詳細分析,此處不展開討論。
Work類中通過CAS設定狀態失敗後直接返回false,而不是判斷當前執行緒是否已獲取鎖來實現不可重入的鎖,原始碼註釋中解釋這樣做的原因是因為避免work tash重新獲取到控制執行緒池全域性的方法,如setCorePoolSize。
2.3 拒絕策略類
ThreadPoolExecutor的拒絕策略類是以私有類的方式實現的,有四種策略:
1)AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常(預設拒絕處理策略)。
2)DiscardPolicy:拋棄新來的任務,但是不丟擲異常。
3)DiscardOldestPolicy:拋棄等待佇列頭部(最舊的)的任務,然後重新嘗試執行程式(失敗則會重複此過程)。
4)CallerRunsPolicy:由呼叫執行緒處理該任務。
其程式碼相對簡單,可以參考原始碼。
三、任務執行過程分析
3.1 execute(Runnable)方法
execute(Runnable)方法的整體過程如上文1.2所述,其實現方式如下:
public void execute(Runnable command) { //執行的任務為空,直接丟擲異常 if (command == null) throw new NullPointerException(); //ctl是ThreadPoolExecutor中很關鍵的一個AtomicInteger,主執行緒池的控制狀態 int c = ctl.get(); //1、判斷是否小於核心執行緒池的大小,若是則直接嘗試新建一個work執行緒 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //2、大於核心執行緒池的大小或新建work失敗(如建立thread失敗),會先判斷執行緒池是否是running狀態,若是則加入阻塞對列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //重新驗證執行緒池是否為running,若否,則嘗試從對列中刪除,成功後執行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); //若執行緒池的狀態為shutdown則,嘗試去執行完阻塞對列中的任務 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3、新建非核心執行緒去執行任務,若失敗,則採取拒絕策略 else if (!addWorker(command, false)) reject(command); }
3.2 addWorker(Runnable,boole)方法
execute(Runnable)方法中,新建(非)核心執行緒執行任務主要是通過addWorker方法實現的,其執行過程如下:
private boolean addWorker(Runnable firstTask, boolean core) { //此處反覆檢查執行緒池的狀態以及工作執行緒是否超過給定的值 retry: for (int c = ctl.get();;) { // Check if queue empty only if necessary. if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) return false; for (;;) { //核心和非核心執行緒的區別 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); //通過工廠方法初始化,可能失敗,即可能為null final Thread t = w.thread; if (t != null) { //獲取全域性鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); //執行緒池處於running狀態 //或shutdown狀態但無需要執行的task,個人理解為用於去阻塞佇列中取任務執行 if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //執行任務,這裡會執行thread的firstTask獲取阻塞對列中取任務 t.start(); workerStarted = true; } } } finally { if (! workerStarted) //開始失敗,則會從workers中刪除新建的work,work數量減1,嘗試關閉執行緒池,這些過程會獲取全域性鎖 addWorkerFailed(w); } return workerStarted; }
3.3 runWorker(this) 方法
在3.2 中當新建的worker執行緒加入在workers中成功後,就會啟動對應任務,其呼叫的是Worker類中的run()方法,即呼叫runWorker(this)方法,其過程如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //while()迴圈中,前者是新建執行緒執行firstTask,對應執行緒個數小於核心執行緒和阻塞佇列滿的情況, //getTask()則是從阻塞對列中取任務執行 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //僅執行緒池狀態為stop時,執行緒響應中斷,這裡也就解釋了呼叫shutdown時,正在工作的執行緒會繼續工作 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); try { //執行任務 task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; //完成的個數+1 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //處理後續工作 processWorkerExit(w, completedAbruptly); } }
3.4 processWorkerExit(Worker,boole)方法
當任務執行結果後,在滿足一定條件下會新增一個worker執行緒,程式碼如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; //對工作執行緒的增減需要加全域性鎖 workers.remove(w); } finally { mainLock.unlock(); } //嘗試終止執行緒池 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { //執行緒不是中斷,會維持最小的個數 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } //執行完任務後,執行緒重新加入workers中 addWorker(null, false); } }
至此,執行緒池執行任務的過程分析結束,其他方法的實現過程可以參考原始碼。
Ref:
[1]http://concurrent.redspider.group/article/03/12.html
[2]《Java併發程式設計的藝術》
&n