ThreadPoolExecutor原始碼分析 -- execute、shutdown方法
來個簡單的例子
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10,100,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
executor.execute(() -> {
System.out.println("hello world");
});
ThreadPoolExecutor構造方法至少需要5個引數:corePoolSize(執行緒池的核心執行緒數),maximumPoolSize(執行緒池最大執行緒數),keepAliveTime(執行緒空閒存活時間),TimeUnit(存活時間的單位),BlockingQueue(阻塞佇列,用於存放Runnable); 另外兩個引數ThreadFactory(執行緒工廠,用於建立新執行緒),RejectedExecutionHandler(拒絕策略),不傳得話就會使用預設值。
首先來看ThreadPoolExecutor的繼承關係
位於頂端的Executor介面很簡單
public interface Executor {
void execute(Runnable command);
}
ExecutorService增加了一些方法
AbstractExecutorService實現了一些方法
開始分析ThreadPoolExecutor原始碼,先來預先了解一些比較重要的成員變數、方法
//記錄是執行狀態和執行緒數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 ));
private static final int COUNT_BITS = Integer.SIZE - 3;
//執行緒允許的最大數量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//執行狀態:可以接受新任務,並執行已經加入任務佇列的任務
private static final int RUNNING = -1 << COUNT_BITS;
//關閉狀態:不接受新任務,但仍然執行已經加入任務佇列的任務
private static final int SHUTDOWN = 0 << COUNT_BITS;
//停止狀態:不接受新任務,不執行已經加入任務佇列的任務,還中斷正在執行的任務
private static final int STOP = 1 << COUNT_BITS;
//調整回收狀態:所有任務被停止,清空所有執行緒進入該狀態,然後執行terminated()方法
private static final int TIDYING = 2 << COUNT_BITS;
//終止狀態:terminated()方法執行完成後進入該狀態
private static final int TERMINATED = 3 << COUNT_BITS;
//獲取執行緒池執行狀態(即取高三位)
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
上面可以看到,執行緒池的執行狀態值和當前執行緒數量都存在了一個int型別數字(ctl)裡,高三位存的是狀態,其他位用於記錄數量。而且,狀態是隻能按照上面從上到下的順序變化的。
一、分析執行緒池執行過程
接著從execute()方法入手
public void execute(Runnable command) {
if (command == null)//判空
throw new NullPointerException();
int c = ctl.get();//拿到執行緒池狀態
if (workerCountOf(c) < corePoolSize) {
//核心執行緒數沒達到,新增一個核心執行緒
if (addWorker(command, true))//成功就返回,否則繼續下面
return;
//要麼是當前執行緒想新增一個核心執行緒的時候,核心執行緒數已經達到了;要麼是執行緒池狀態的原因,具體看下面addWorker方法
c = ctl.get();//拿最新的狀態值
}
if (isRunning(c) && workQueue.offer(command)) {//這裡加入任務佇列
int recheck = ctl.get();
//上面加入佇列成功後進行二次檢查
if (!isRunning(recheck) && remove(command))
reject(command);//執行緒池處於不接受新任務狀態,呼叫拒絕策略
else if (workerCountOf(recheck) == 0)
addWorker(null, false);//執行緒池的無執行的執行緒,新增一個非核心執行緒
}
else if (!addWorker(command, false))//嘗試再新增一個非核心執行緒來執行任務
reject(command);//失敗了,說明執行緒池狀態原因或者佇列滿了
}
下面看addWorker方法是如何新增執行緒的
//第二個引數表示是否是核心執行緒,返回值表示是否新增執行緒成功
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();//拿到執行緒池的狀態
int rs = runStateOf(c);//拿到執行狀態
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//這裡主要是理解! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()),
//前面說過: SHUTDOWN狀態不接受新任務,但仍然執行已經加入任務佇列的任務,
//所以當進入SHUTDOWN狀態,而傳進來的任務為空,並且任務佇列不為空的時候,是允許新增新執行緒的,把這個條件取反就不允許了
for (;;) {
int wc = workerCountOf(c);//拿到執行緒數
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;//大於最大容量,或大於初始引數值返回false
if (compareAndIncrementWorkerCount(c))
break retry;//CAS新增執行緒數目成功,跳出最外層迴圈
c = ctl.get(); // 重新拿到狀態
if (runStateOf(c) != rs)
continue retry;//如果執行狀態變了,進入外層迴圈
//否則繼續在裡層迴圈嘗試CAS
}
}
//上面新增執行緒數量成功了,開始真正新增執行緒
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//下面會分析Work類
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//只有當前是正在執行狀態,或是SHUTDOWN且firstTask為空,才進入
if (t.isAlive()) //這裡說明被調了start方法
throw new IllegalThreadStateException();
workers.add(w);//新增執行緒,workers是個HashSet,所以要加鎖保證執行緒安全
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;//記錄一下執行緒池的峰值
workerAdded = true;//新增執行緒成功,設定標誌位
}
} finally {
mainLock.unlock();
}
【標誌位1】
if (workerAdded) {
t.start();//新增執行緒成功,開始執行執行緒
workerStarted = true;//設定開始執行標誌位
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);//這裡執行緒新增失敗
}
return workerStarted;
}
從上面可以大致猜測出,在這個執行緒池的設計中,執行緒被封裝成Worker的形式存在。 下面分析一下Worker類
一開始看到它還實現了Runnable介面覺得有點奇怪,不知道有什麼用,接下去看就懂了。
final Thread thread;
//初始化的Runnable
Runnable firstTask;
//完成的任務數
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
//這裡新建一個執行緒,注意傳入的是this,這個很關鍵
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
可以看到該類有一個執行緒成員,而這個執行緒的runnable卻是它自身,再看看它實現的run方法裡執行了runWorker方法。回顧一下上面addWorker方法的【標誌位1】處,那裡啟動了執行緒,所以會執行run方法,所以最終會呼叫runWorker方法。
runWorker方法是屬於ThreadPoolExecutor的方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;//標誌是不是使用者任務異常導致終止的
try {
//這裡通過迴圈,不斷地取任務來執行,getTask是會阻塞的
while (task != null || (task = getTask()) != null) {
w.lock();
//前面說過,stop狀態時不接受新任務,不執行已經加入任務佇列的任務,還中斷正在執行的任務
//所以對於stop狀態以上是要中斷執行緒的
//(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)確保執行緒中斷標誌位為true且是stop狀態以上,接著清除了中斷標誌
//!wt.isInterrupted()則再一次檢查保證執行緒需要設定中斷標誌位
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//回撥方法,給子類具體實現
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//回撥方法,給子類具體實現
}
} finally {
task = null;//置空,如果進入下一個迴圈可以繼續取任務
w.completedTasks++;//完成數+1
w.unlock();
}
}
completedAbruptly = false;//說明不是使用者任務異常引起的
} finally {
processWorkerExit(w, completedAbruptly);
}
}
到這裡基本就知道執行緒池如何複用執行緒,來執行任務了;也知道它怎麼控制執行緒最大數量,但還不知道如何控制在超過空閒時間時回收執行緒?答案就在getTask方法
下面的getTask方法有點重要,起到控制執行緒池執行緒數量的作用
private Runnable getTask() {
boolean timedOut = false; // 取任務是否超時
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//這個狀態判斷挺重要的,起到執行緒池關閉作用
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//執行緒數量減一
return null;//這裡返回null,意味著一個執行緒會退出
}
int wc = workerCountOf(c);
//這裡可以看出核心執行緒在空閒的時候也是可以設定被回收的
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//timed為true將要有時間限制地取任務
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//大於最大限制執行緒數或超過空閒時間,並且當前執行緒數大於1或佇列為空
if (compareAndDecrementWorkerCount(c))
return null;//說明執行緒數減一成功,返回null,意味著一個執行緒會退出
continue;//上面執行緒數減一失敗,說明執行緒數量已被搶先改變,繼續迴圈,
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;//用於下一次迴圈中
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
上面的workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)用於超時檢查取任務,超過時間就會返回null,那timedOut就會變為true,進入下一次迴圈,然後檢查是否可以減少執行緒數(timedOut就是其中一個條件),然後返回null就可以退出一個執行緒了; PS: 上面有個很重要的判斷if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())),這個用於關閉執行緒池
二、分析執行緒池關閉過程
分析完執行緒池的執行流程,下面接著分析下執行緒池如何關閉,看shutdown方法
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//【步驟1】對執行緒檢查一下,是否有許可權修改
advanceRunState(SHUTDOWN);//【步驟2】改變執行緒池狀態為SHUTDOWN
interruptIdleWorkers();//【步驟3】中斷所有執行緒
onShutdown(); // 【步驟4】留給子類具體實現,如ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();【步驟5】
}
上面分了五個步驟
【步驟2】:
//該方法改變執行緒池狀態為SHUTDOWN或者STOP
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
【步驟3】
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();//設定中斷標誌
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
【步驟5】
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;//狀態條件不滿足
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);//執行緒數不為0,終止一個執行緒
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();//設定TIDYING狀態成功,呼叫terminated方法,具體由子類實現
} finally {
ctl.set(ctlOf(TERMINATED, 0));//進入TERMINATED狀態,說明已經關閉
termination.signalAll();//喚醒
}
return;
}
} finally {
mainLock.unlock();
}
//接著迴圈
}
}
看到termination.signalAll()的時候,有點疑惑,查了一下原始碼中用到termination(一個ConditionObject例項)的地方
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
是個公開的方法,聯想一下平常用法,該方法用於檢測執行緒池的關閉
try{
while(!executor.awaitTermination(500, TimeUnit.MILLISECONDS)) {
//
}
}
catch (InterruptedException e) {
//中斷處理
}
從上面看出,shutdown方法改變狀態為SHUTDOWN,並在嘗試給每個執行緒設定中斷標誌,接著結合getTask()方法返回null來停止移除執行緒,最後嘗試終止執行緒池。
參考:JDK1.8原始碼