ThreadPoolExecutor執行緒池原始碼解析
關鍵構造屬性:
volatile int runState; 保證了多執行緒的共享可見性
static final int RUNNING = 0;
static final int SHUTDOWN = 1; 運行了SHUTDOWN
static final int STOP = 2; 運行了SHUTDOWNNOW
static final int TERMINATED = 3; 執行緒池銷燬最終狀態
private final BlockingQueue<Runnable> workQueue; //儲存工作任務佇列
private final ReentrantLock mainLock = new ReentrantLock();//在新增加執行緒,和銷燬執行緒時保證原子性和共享性
private final HashSet<Worker> workers = new HashSet<Worker>();//儲存建立的執行緒
實現原理:在新提交一個任務時,如果執行緒池執行緒數量<corePoolSize(核心執行緒數)時,新建立執行緒並把該任務賦予執行緒並啟動;如果執行緒池執行緒數量>corePoolSize(核心執行緒數)時,把任務非阻塞新增到佇列,如果佇列已經滿了並且執行緒數量<maximumPoolSize時,進行新建立執行緒並執行。如果執行緒數量已經>maximumPoolSize時,則根據傳遞給阻止策略。
在呼叫shutdown時,把執行緒池的狀態變更為SHUTDOWN ,同時把非工作中執行緒進行中斷(通過輪詢workers, tryLock取得work的鎖進行設定中斷,工作的執行緒無法獲得鎖)。不在接受新任務(因為執行緒池狀態為SHUTDOWN ,接受任何後會傳送給拒絕策略),接受新任務會傳遞給阻止策略;把佇列中的任務執行完畢,如果佇列任務執行完畢,輪詢Workers把空閒執行緒設定中斷,由於並移除workers(由於取不到任務退出while迴圈進入到workerDone),如果池中執行緒為0,會嘗試把執行緒池變更為TERMINATED 狀態。
在呼叫shutdownnow時,把執行緒池的狀態變更為STOP ,同時把所有池中執行緒輪詢進行設定中斷。不在接受新任務,不在執行佇列中的任務,把執行中任務設定執行緒中斷標誌,消亡執行緒,如果池中執行緒為0,會嘗試把執行緒池變更為TERMINATED 狀態,把workQueue工作佇列中任務移除到Arraylist集合並返回。
小提醒:
thread的interrupt方法僅僅是執行緒的中斷標示,某些阻塞方法如thread.sleep join wait及某些IO操作會響應中斷並丟擲中斷異常。如果執行的任務有異常導致執行緒死亡,通過這裡會把死亡執行緒移除。finally裡的workerDone(this),確保了執行緒池溢位執行緒進行清理掉。
一、構造:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;//核心執行緒數
this.maximumPoolSize = maximumPoolSize;//最大執行緒數
this.workQueue = workQueue;//工作佇列
this.keepAliveTime = unit.toNanos(keepAliveTime);//非核心執行緒空閒存活時間
this.threadFactory = threadFactory;//執行緒建立工廠
this.handler = handler;//執行緒拒絕策略
}
二、執行緒池執行任務
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//如果目前執行緒數大於核心執行緒數執行下面的新增到工作佇列;否則為目前執行緒數小於核心執行緒數,建立核心執行緒並執行。
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)//因為上面的新增到佇列為無鎖狀態新增,不能確保一致性,所有新增後再次檢查執行緒池是否Running狀態,如果不在把當前任務應用於拒絕策略。
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))//如果佇列已經滿,繼續建立執行緒並完成任務。
reject(command); // is shutdown or saturated
}
}
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//獲得鎖
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);//新增執行緒並啟動任務
} finally {
mainLock.unlock();
}
return t != null;
}
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);//形成工作任務者
Thread t = threadFactory.newThread(w);
boolean workerStarted = false;
if (t != null) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
w.thread = t;
workers.add(w);//儲存所有工作任務者
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
try {
t.start();//啟動工作任務者,完成任務工作
workerStarted = true;
}
finally {
if (!workerStarted)
workers.remove(w);
}
}
return t;
}
----------------------------------------------------------------------------
private final class Worker implements Runnable {
public void run() {//工作者被Start啟動時執行該方法
try {
hasRun = true;
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {//如果新建立執行緒firstTask有任務進行下面的執行,第一個任務執行完了,通過getTask()從佇列取任務再次執行
runTask(task);//把任務的run方法重新包裝了一下,通過模板方式在任務執行前和執行後添加了附加操作
task = null;
}
} finally {//放到finally裡的好處是,無論何種情況沒有取到任務脫離while迴圈,都會把脫離迴圈的執行緒從執行緒池移除。如果執行的任務有異常導致執行緒死亡,通過這裡會把死亡執行緒移除。
workerDone(this);//如果從隊列了沒有取到任務,把執行緒從執行緒池裡移除。
}
}
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();//獲得本執行緒的鎖,表示正在執行任務
try {
/*
* If pool is stopping ensure thread is interrupted;
* if not, ensure thread is not interrupted. This requires
* a double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
//下面這裡說明如果現在運行了shutdownnow,檢測到interrupted()為true只是把執行緒的中斷標誌設定,並不中斷執行緒繼續執行。
if ((runState >= STOP ||
(Thread.interrupted() && runState >= STOP)) &&
hasRun)
thread.interrupt();
/*
* Track execution state to ensure that afterExecute
* is called only if task completed or threw
* exception. Otherwise, the caught runtime exception
* will have been thrown by afterExecute itself, in
* which case we don't want to call it again.
*/
boolean ran = false;
beforeExecute(thread, task);//執行前置方法
try {
task.run();//執行任務的實際方法
ran = true;
afterExecute(task, null);//執行後置方法
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)//如果現在已經運行了shundownnow現在不能再從佇列取任務了
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();//非阻塞獲取
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);//阻塞一定時間內獲取
else
r = workQueue.take();//阻塞獲取
if (r != null)
return r;
if (workerCanExit()) {//判斷是否要中斷執行緒
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();//中斷目前非正在工作中的執行緒
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {//如果可以獲取到執行緒鎖,說明目前執行緒沒有正在工作中
try {
if (hasRun && thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);//刪除執行緒池裡的執行緒
if (--poolSize == 0)//如果執行緒池中執行緒已經為0,會改變執行緒池狀態為Terminate
tryTerminate();
} finally {
mainLock.unlock();
}
}
-------------------------------
1、shutdown()
問:shutdown()有什麼功能?
答:阻止新來的任務提交,對已經提交了的任務不會產生任何影響。它會將那些閒置的執行緒(idleWorks)進行中斷,當已經提交的任務執行完後,進行把執行緒中斷設定執行緒池TERMINATED 狀態。
問:如何阻止新來的任務提交?
答:通過將執行緒池的狀態改成SHUTDOWN,當再將執行execute提交任務時,如果測試到狀態不為RUNNING,則傳遞給阻止策略,從而達到阻止新任務提交的目的。
問:為何對提交的任務不產生任何影響?
答:在呼叫中斷任務的方法時,它會檢測workers中的任務嘗試獲得work的鎖trylock才進行中斷,如果work正在工作無法獲得鎖無法進行中斷,活動的work會一直把佇列中的任務執行完畢後,才設定中斷。
2、shutdownNow()
問:shutdownNow()有什麼功能?
答:阻止新來的任務提交,同時會中斷當前正在執行的執行緒,即workers中的執行緒。另外它還將workQueue中的任務給移除,並將這些任務新增到列表中進行返回。
問:如何阻止新來的任務提交?
答:通過將執行緒池的狀態改成STOP,當再將執行execute提交任務時,如果測試到狀態不為RUNNING,則傳遞給阻止策略,從而達到阻止新任務提交的目的.
問:如果我提交的任務程式碼塊中,正在等待某個資源,而這個資源沒到,但此時執行shutdownNow(),會出現什麼情況?
答:當執行shutdownNow()方法時,如遇已經啟用的任務,並且處於阻塞狀態時, 如果阻塞不接受中斷會一直阻塞直到獲得資源完成;如果當前阻塞接受中斷會丟擲InterruptedException。, shutdownNow()會執行1次中斷阻塞的操作,此時對應的執行緒報InterruptedException,如果後續還要等待某個資源,則按正常邏輯等待某個資源的到達。例如,一個執行緒正在sleep狀態中,此時執行shutdownNow(),它向該執行緒發起interrupt()請求,而sleep()方法遇到有interrupt()請求時,會丟擲InterruptedException(),並繼續往下執行。
3、awaitTermination(long timeout,TimeUnit unit)
簡單來說,awaitTermination會一直等待,直到執行緒池狀態為TERMINATED或者等待的時間到達了指定的時間。如果此執行程式終止,則返回 true;如果終止前超時期滿,則返回 false。
客戶端執行緒和執行緒池之間會有一個任務佇列。當程式要關閉時,你需要注意兩件事情:入隊的這些任務的情況怎麼樣了以及正在執行的這個任務執行得如 何了。令人驚訝的是很多開發人員並沒能正確地或者有意識地去關閉執行緒池。正確的方法有兩種:一個是讓所有的入隊任務都執行完畢(shutdown()), 再就是捨棄這些任務(shutdownNow())——這完全取決於你。比如說如果我們提交了N多工並且希望等它們都執行完後才返回的話,那麼就使用 shutdown():
private void sendAllEmails(List<String> emails) throws InterruptedException {
emails.forEach(email ->
executorService.submit(() ->
sendEmail(email)));
executorService.shutdown();
final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);
log.debug("All e-mails were sent so far? {}", done);
}
本例中我們傳送了許多電子郵件,每一封郵件都對應著執行緒池中的一個任務。提交完這些任務後我們會關閉執行緒池,這樣就不會再有新的任務進來了。然 後我們會至少等待一分鐘,直到這些任務執行完。如果1分鐘後還是有的任務沒執行到的話,awaitTermination()便會返回false。但是剩 下的任務還會繼續執行。
參考:https://www.cnblogs.com/langtianya/p/4520373.html