ThreadPoolExecutor的應用和實現分析(中)—— 任務處理相關源碼分析
轉自:http://www.tuicool.com/articles/rmqYjq
前面一篇文章從Executors中的工廠方法入手,已經對ThreadPoolExecutor的構造和使用做了一些整理。而這篇文章,我們將接著前面的介紹, 從源碼實現上對ThreadPoolExecutor在任務的提交、執行,線程重用和線程數維護等方面做下分析。
0. ThreadPoolExecutor類的聲明屬性變量分析
public class ThreadPoolExecutor extends AbstractExecutorService
從這個類聲明中我們可以看到java.util.ThreadPoolExecutor是繼承於AbstractExecutorService的,而之前的文章我也提到過,AbstractExecutorService已經實現了一些任務提交處理的方法,如submit()方法都是在這個抽象類中實現的。但submit()方法,最後也是會調用ThreadPoolExecutor的execute()方法。
打開SunJDK中的ThreadPoolExecutor類源碼,除了上篇文章提到的一些和構造方法中參數對應的屬性之外,讓我們看看還有什麽:
- mainLock 對整個ThreadPoolExecutor對象的鎖
- workers 存儲工作線程對應Worker對象的HashSet
- termination 線程池ThreadPoolExecutor對象的生命周期終止條件,和mainLock相關
- largestPoolSize 線程池跑過的最大線程數
- completedTaskCount 完成任務數
- ctl 執行器ThreadPoolExecutor的生命周期狀態和活動狀態的worker數封裝
稍微需要說一下最後一個, ctl是一個AtomicInteger對象,以位運算的方式打包封裝了當前線程池ThreadPoolExecutor對象的狀態和活動線程數兩個數據
1. 執行器狀態
ExecutorService中已經指定了這個接口對應的類要實現的方法,其中就包括shutdown()和shutdownNow()等方法。在ThreadPoolExecutor中指明了狀態的含義,並包含其於ctl屬性中。
ThreadPoolExecutor對象有五種狀態,如下:
- RUNNING 在ThreadPoolExecutor被實例化的時候就是這個狀態
- SHUTDOWN 通常是已經執行過shutdown()方法,不再接受新任務,等待線程池中和隊列中任務完成
- STOP 通常是已經執行過shutdownNow()方法,不接受新任務,隊列中的任務也不再執行,並嘗試終止線程池中的線程
- TIDYING 線程池為空,就會到達這個狀態,執行terminated()方法
- TERMINATED terminated()執行完畢,就會到達這個狀態,ThreadPoolExecutor終結
2. Worker內部類
它既實現了Runnable,同時也是一個AQS ( AbstractQueuedSynchronizer )。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
封裝了3樣東西,Runnable類的首個任務對象,執行的線程thread和完成的任務數(volatile)completedTasks。
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
這個類還提供了interruptIfStarted()這樣一個方法,裏面做了(getState()>= 0)的判斷。與此呼應,Worker的構造方法裏對state設置了-1,避免在線程執行前被停掉。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
3. 提交任務
上篇文章已經提到了,提交新任務的時候,如果沒達到核心線程數corePoolSize,則開辟新線程執行。如果達到核心線程數corePoolSize, 而隊列未滿,則放入隊列,否則開新線程處理任務,直到maximumPoolSize,超出則丟棄處理。
這段源碼邏輯如下,不細說了。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
4. addWorker()的實現
在上面提交任務的時候,會出現開辟新的線程來執行,這會調用addWorker()方法。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
代碼較長,我們可以分兩大部分看:
第一段從第3行到第26行,是雙層無限循環,嘗試增加線程數到ctl變量,並且做一些比較判斷,如果超出線程數限定或者ThreadPoolExecutor的狀態不符合要求,則直接返回false,增加worker失敗。
第二段從第28行開始到結尾,把firstTask這個Runnable對象傳給Worker構造方法,賦值給Worker對象的task屬性。Worker對象把自身(也是一個Runnable)封裝成一個Thread對象賦予Worker對象的thread屬性。鎖住整個線程池並實際增加worker到workers的HashSet對象當中。成功增加後開始執行t.start(),就是worker的thread屬性開始運行,實際上就是運行Worker對象的run方法。Worker的run()方法實際上調用了ThreadPoolExecutor的runWorker()方法。
5. 任務的執行runWorker()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這段代碼實際上就是執行提交給線程池執行的Runnable任務的實際內容。其中,值得註意的有以下幾點:
- 線程開始執行前,需要對worker加鎖,完成一個任務後執行unlock()
- 在任務執行前後,執行beforeExecute()和afterExecute()方法
- 記錄任務執行中的異常後,繼續拋出
- 每個任務完成後,會記錄當前線程完成的任務數
- 當worker執行完一個任務的時候,包括初始任務firstTask,會調用getTask()繼續獲取任務,這個方法調用是可以阻塞的
- 線程退出,執行processWorkerExit(w, completedAbruptly)處理
5. Worker線程的復用和任務的獲取getTask()
在上一段代碼中,也就是runWorker()方法,任務的執行過程是嵌套在while循環語句塊中的。每當一個任務執行完畢,會從頭開始做下一次循環執行,實現了空閑線程的復用。而要執行的任務則是來自於getTask()方法:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get();
// Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask()實際上是從工作隊列(workQueue)中取提交進來的任務。這個workQueue是一個BlockingQueue,通常當隊列中沒有新任務的時候,則getTask()會阻塞。另外,還有定時阻塞這樣一段邏輯:如果從隊列中取任務是計時的,則用poll()方法,並設置等待時間為keepAlive,否則調用阻塞方法take()。當poll()超時,則獲取到的任務為null,timeOut設置為 true。這段代碼也是放在一個for(;;)循環中,前面有判斷超時的語句,如果超時,則return null。這意味著runWorker()方法的while循環結束,線程將退出,執行processWorkerExit()方法。
回頭看看是否計時是如何確定的。
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
即判斷當前線程池的線程數是否超出corePoolSize,如果超出這個值並且空閑時間多於keepAlive則當前線程退出。
另外一種情況就是allowCoreThreadTimeOut為true,就是允許核心在空閑超時的情況下停掉。
6. 線程池線程數的維護和線程的退出處理
剛剛也提到了,我們再看下processWorkerExit()方法。這個方法最主要就是從workers的Set中remove掉一個多余的線程。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
這個方法的第二個參數是判斷是否在runWorker()中正常退出了循環向下執行,如果不是,說明在執行任務的過程中出現了異常,completedAbruptly為true,線程直接退出,需要直接對活動線程數減1 。
之後,加鎖統計完成的任務數,並從workers這個集合中移除當前worker。
執行tryTerminate(),這個方法後面會詳細說,主要就是嘗試將線程池推向TERMINATED狀態。
最後比較當前線程數是不是已經低於應有的線程數,如果這個情況發生,則添加無任務的空Worker到線程池中待命。
以上,增加新的線程和剔除多余的線程的過程大概就是如此,這樣線程池能保持額定的線程數,並彈性伸縮,保證系統的資源不至於過度消耗。
ThreadPoolExecutor的應用和實現分析(中)—— 任務處理相關源碼分析