超詳細的java執行緒池原始碼解讀
繼承關係:
ThreadPoolExecutor->AbstractExecutorService->ExecutorService->Executor
執行緒池的核心方法是execute(Runnable command) 和submit(Runnable task) 而submit方法也是呼叫execute(Runnable command)完成,所以重點來看execute(Runnable command)的原始碼
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //如果當前執行緒數小於核心執行緒數大小執行addWorker()方法,增加一個執行緒執行 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //成功執行addWorker()就返回 if (addWorker(command, true)) return; //沒有成功執行獲取最新的當前執行緒數 c = ctl.get(); } //如果是執行狀態,並且加入等待佇列成功執行if塊(額外含義:執行緒池是執行狀態已經達到核心執行緒數,優先放入佇列) if (isRunning(c) && workQueue.offer(command)) {//1 //先獲取最新的執行緒數 int recheck = ctl.get(); //再次判斷如果執行緒池不是執行態了並且移除本次提交任務成功,執行拒絕操作 if (! isRunning(recheck) && remove(command)) reject(command); //如果是執行狀態,或者執行緒不是執行態但是移除任務佇列失敗, //則檢查是否有工作執行緒在消費佇列,如果有則什麼都不做(可以確保剛提交進佇列的任務被完成), //如果沒有需要建立一個消費執行緒用來消費剛剛提交的任務 else if (workerCountOf(recheck) == 0) addWorker(null, false);//2 } //如果不是執行態或者加入佇列失敗那麼嘗試執行提交過來的任務,如果執行失敗,走拒絕操作(額外含義:核心執行緒數滿了,佇列也滿了,嘗試建立新的執行緒消費,新執行緒數要小於最大執行緒數) else if (!addWorker(command, false)) reject(command); }
總結概述:
來了新任務,如果工作執行緒數還沒有達到執行緒池的核心執行緒數嘗試建立新的執行緒執行(addWork方法裡)。
如果已經達到核心執行緒數或者開啟新執行緒失敗,檢查執行緒池是否為執行態,是的話加入等待佇列。
如果執行緒池是已經不再執行態或者加入等待佇列失敗,嘗試開啟一個執行緒執行剛提交的任務,開執行緒失敗執行拒絕流程。
如果是執行態並且也加入到等待佇列成功,檢查執行緒池是否還是執行(可能被其他執行緒停止),如果不是執行態,執行移除操作,然後執行拒絕策略,
如果是執行態或者不是執行態但移除任務失敗檢查還有沒有執行緒在消費任務,沒有的話嘗試建立一個消費執行緒消費剛提交到等待佇列裡的任務
消費任務的重要方法是addWorker(Runnable firstTask, boolean core);
其有四種組合:
一、addWorker(Runnable,true)小於核心執行緒數使用
二、addWorker(Runnable,false)大於核心執行緒數,並且等待佇列也滿了情況使用
三、addWorker(null,true)沒有任務建立一個執行緒等待任務到來使用(小於核心執行緒數的情況)
四、addWorker(null,false)沒有任務建立一個執行緒等待任務到來使用(小於最大執行緒數的情況)
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 執行緒池狀態RUNNING= -1;SHUTDOWN=0;STOP=1;TIDYING=2;TERMINATED=3 // 如果執行緒池狀態是shutdown及以後的任意一種狀態,說明呼叫了關閉執行緒池的方法, //並且不符合[rs等於shutdown,並且傳進來的任務是空,並且工作佇列不等於空], //這個判斷條件是為了處理上個方法程式碼2處的情況, //即執行緒池已經不是執行態(僅僅呼叫了shutdown方法),並且彈出佇列失敗, //這種情況需要保證提交上來的任務得到執行,因此傳過來一個null的任務, //目的是為了讓執行緒池啟動一個執行緒執行剛提交的任務, //(隱含shutdown狀態新增到佇列中的任務(移除失敗的)還是會被執行), //如果已經不只是SHUTDOWN證明掉用過shutdownnow方法,直接返回false, //或者僅呼叫shutdown後又來的新任務也返回false拒絕執行, //或者是剛新增到佇列的任務已經被其他執行緒消費過了,也返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //檢查工作執行緒數,如果大於執行緒池最大上限CAPACITY(即使用int低29位可以容納的最大值) //或者跟邊界值比較已經到達邊界值都返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //如果增加工作數成功跳出迴圈往下執行 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //如果增加工作執行緒數失敗(可能呼叫了shutdown方法), //如果兩次狀態不一致則跳轉到retry處重新嘗試執行 if (runStateOf(c) != rs) continue retry; // 都沒發生迴圈執行 } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //把傳進來的任務包裝成worker物件 w = new Worker(firstTask); //實際上t就是worker物件,只不過有名字等相關資訊 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次檢查執行緒池狀態 int rs = runStateOf(ctl.get()); //如果是執行態直接執行,或如果是shutdown狀態但傳進來是個null,即前邊說的移除佇列失敗情況 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // 檢查這個物件是否被其他執行緒執行過 throw new IllegalThreadStateException(); //加入到workers中 workers.add(w); int s = workers.size(); //如果大於曾經執行過的最大執行緒數則最大執行緒數加1 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //如果增加成功啟動新執行緒執行 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //如果啟動失敗從workers中移除 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
總結概述:這個方法功能是保證線上程池為執行狀態下或者雖然不是執行狀態但是強制要求把已經新增到任務佇列的執行緒執行完,執行的過程是建立一個新執行緒執行
從上方程式碼看出Worker是執行執行緒的核心,那麼看下這個內部類是怎樣的,首先它實現了Runable介面,並且繼承了AbstractQueuedSynchronizer類
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
//指向提交過來的任務
this.firstTask = firstTask;
//指向自己
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
沒有太多特別的不多解釋
run方法呼叫的是runWorker()方法這個是執行的核心
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();//當前執行緒
Runnable task = w.firstTask;//提交上來的任務
w.firstTask = null;
w.unlock(); // 呼叫Worker類的tryRelease()方法,將state置為0,
//而interruptIfStarted()中只有state>=0才允許呼叫中斷
boolean completedAbruptly = true;
try {
//先執行提交上來的任務,完成後迴圈從佇列中取任務執行
while (task != null || (task = getTask()) != null) {
w.lock();//加鎖保證呼叫中斷後執行的任務可以正常完成
//執行新任務前要做以下判斷
//1如果執行緒池狀態是大於等於stop(呼叫shutdownnow方法了),
//直接檢視當前執行緒符合未設定中斷位 則直接呼叫wt.interrupt()方法設定
//2如果執行緒池不是大於等於stop狀態,則呼叫Thread.interrupted()清除interrupt位,
//這時如果程池為大於stop狀態(有其他執行緒呼叫執行緒池的stopnow方法),
//再檢視當前執行緒符合未設定中斷位,如果沒設定呼叫wt.interrupt()方法設定
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
//執行緒池是執行態不會走到這
wt.interrupt();//嘗試終止正在執行的任務,這裡僅僅設定一個標誌位
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//直接呼叫run方法,在當前執行緒中執行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
總結概述:此方法特別在執行執行緒直接在當前執行緒中呼叫執行緒佇列中的run方法,而沒有新建執行緒,確保了執行緒的重複利用
執行緒執行完當前任務會迴圈讀取佇列中等待的任務,下邊看看如何取佇列中的任務
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//執行緒池狀態RUNNING= -1;SHUTDOWN=0;STOP=1;TIDYING=2;TERMINATED=3
// 如果執行緒池大於等於SHUTDOWN(呼叫過shutdown方法),
//判斷是否是stop(呼叫shutdownnow)之後的狀態或者等待佇列已經為空
//言外之意呼叫過shutdownnow將停止執行等待佇列中的任務,
//還有隻掉用過shutdown方法會保證工作佇列中的任務會被執行完
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//已經呼叫shutdown或者等待佇列中的任務已經執行完,如果呼叫shutdownnow佇列中的任務還沒執行完那就放棄執行
//減少工作執行緒數
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 工作執行緒數大於核心執行緒數或者核心執行緒超時時間為真(預設為false)
//allowCoreThreadTimeOut為true超時會關閉執行緒
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1工作執行緒數大於最大執行緒數或【超時關閉標誌位真且真的超時了】
//2 上個條件成立(言外之意工作執行緒數大於最大執行緒數或者已經查過空閒時間沒任務,
//此時可能需要關閉一個執行緒了),並且確實有執行緒在工作(有工作執行緒才需要關閉),
//或者任務佇列沒工作任務了(沒任務了對應的是超時那種情況)
//可能情況:1.wc > maximumPoolSize成立,wc > 1成立
//:大於核心執行緒數,有執行緒在執行,關閉一個執行緒
// 2.wc > maximumPoolSize成立,workQueue.isEmpty() 成立
//:大於核心執行緒數,佇列中已經沒有任務可執行,關閉一個執行緒
// 3.(timed && timedOut)成立,wc > 1 成立
//:執行緒空閒超時,有執行緒在執行,關閉一個執行緒
// 4.(timed && timedOut)成立,workQueue.isEmpty()成立
// :執行緒空閒超時,佇列中沒有可執行的任務
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//工作數量減一併返回null 返回null上層方法就會結束當前執行緒
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果上述情況不滿足則正常取任務執行
Runnable r = timed ?
//沒有任務會掛起指定時間(言外之意已經大於核心數或者有超時時間的不能永久的阻塞下去)
workQueue.take();//沒有任務會阻塞直到有任務來
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
總結概述:只要執行緒池沒有呼叫shutDown就嘗試取任務消費,已呼叫shutdown但佇列還有任務沒執行完,嘗試取執行。大於核心執行緒數或者已經超時佇列中沒任務可執行,則嘗試關閉當前執行緒。
整體總結:有新任務來到,如果沒有達到核心執行緒數,則啟動新執行緒執行,已經達到核心執行緒數嘗試放到佇列,核心執行緒數和佇列都滿但核心執行緒數沒有達到最大執行緒數再建立一個執行緒執行,如果都滿了就拒絕執行。
執行過程中要重複不斷的檢查執行緒池的狀態,如果只調用過shutDown,但執行緒池中還有等待執行的佇列則取執行完等待的任務,並拒絕新到的任務(丟擲異常),如果呼叫shutDownNow方法則放棄執行佇列中的任務,並嘗試終止正則執行的任務。
如果工作執行緒數大於核心執行緒數或者執行緒空閒時間大於設定時間,那麼嘗試終止當前執行緒。如果沒有設定超時終止則沒有任務執行時執行緒阻塞。
更多精彩內容請關注微信公眾號 IT農廠【ITFF01】