java多線程系列:ThreadPoolExecutor源碼分析
前言
這篇主要講述ThreadPoolExecutor的源碼分析,貫穿類的創建、任務的添加到線程池的關閉整個流程,讓你知其然所以然。希望你可以通過本篇博文知道ThreadPoolExecutor是怎麽添加任務、執行任務的,以及延伸的知識點。那麽先來看看ThreadPoolExecutor的繼承關系吧。
繼承關系
Executor接口
public interface Executor {
void execute(Runnable command);
}
Executor接口只有一個方法execute,傳入線程任務參數
ExecutorService接口
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
ExecutorService接口繼承Executor接口,並增加了submit、shutdown、invokeAll等等一系列方法。
AbstractExecutorService抽象類
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {...} public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {... } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {...} public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {...} public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {...} }
AbstractExecutorService抽象類實現ExecutorService接口,並且提供了一些方法的默認實現,例如submit方法、invokeAny方法、invokeAll方法。
像execute方法、線程池的關閉方法(shutdown、shutdownNow等等)就沒有提供默認的實現。
ThreadPoolExecutor
先介紹下ThreadPoolExecutor線程池的狀態吧
線程池狀態
int 是4個字節,也就是32位(註:一個字節等於8位
)
//記錄線程池狀態和線程數量(總共32位,前三位表示線程池狀態,後29位表示線程數量) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //線程數量統計位數29 Integer.SIZE=32 private static final int COUNT_BITS = Integer.SIZE - 3; //容量 000 11111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //運行中 111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; //關閉 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //停止 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; //整理 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; //終止 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; //獲取運行狀態(獲取前3位) private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取線程個數(獲取後29位) private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
- RUNNING:接受新任務並且處理阻塞隊列裏的任務
- SHUTDOWN:拒絕新任務但是處理阻塞隊列裏的任務
- STOP:拒絕新任務並且拋棄阻塞隊列裏的任務同時會中斷正在處理的任務
- TIDYING:所有任務都執行完(包含阻塞隊列裏面任務)當前線程池活動線程為0,將要調用terminated方法
- TERMINATED:終止狀態。terminated方法調用完成以後的狀態
線程池狀態轉換
RUNNING -> SHUTDOWN
顯式調用shutdown()方法, 或者隱式調用了finalize()方法
(RUNNING or SHUTDOWN) -> STOP
顯式調用shutdownNow()方法
SHUTDOWN -> TIDYING
當線程池和任務隊列都為空的時候
STOP -> TIDYING
當線程池為空的時候
TIDYING -> TERMINATED
當 terminated() hook 方法執行完成時候
構造函數
有四個構造函數,其他三個都是調用下面代碼中的這個構造函數
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
參數介紹
參數 | 類型 | 含義 |
---|---|---|
corePoolSize | int | 核心線程數 |
maximumPoolSize | int | 最大線程數 |
keepAliveTime | long | 存活時間 |
unit | TimeUnit | 時間單位 |
workQueue | BlockingQueue | 存放線程的隊列 |
threadFactory | ThreadFactory | 創建線程的工廠 |
handler | RejectedExecutionHandler | 多余的的線程處理器(拒絕策略) |
提交任務
submit
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
流程步驟如下
- 調用submit方法,傳入Runnable或者Callable對象
- 判斷傳入的對象是否為null,為null則拋出異常,不為null繼續流程
- 將傳入的對象轉換為RunnableFuture對象
- 執行execute方法,傳入RunnableFuture對象
- 返回RunnableFuture對象
流程圖如下
execute
public void execute(Runnable command) {
//傳進來的線程為null,則拋出空指針異常
if (command == null)
throw new NullPointerException();
//獲取當前線程池的狀態+線程個數變量
int c = ctl.get();
/**
* 3個步驟
*/
//1.判斷當前線程池線程個數是否小於corePoolSize,小於則調用addWorker方法創建新線程運行,且傳進來的Runnable當做第一個任務執行。
//如果調用addWorker方法返回false,則直接返回
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.如果線程池處於RUNNING狀態,則添加任務到阻塞隊列
if (isRunning(c) && workQueue.offer(command)) {
//二次檢查
int recheck = ctl.get();
//如果當前線程池狀態不是RUNNING則從隊列刪除任務,並執行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
//否者如果當前線程池線程空,則添加一個線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3.新增線程,新增失敗則執行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
其實從上面代碼註釋中可以看出就三個判斷,
- 核心線程數是否已滿
- 隊列是否已滿
- 線程池是否已滿
然後根據這三個條件進行不同的操作,下圖是Java並發編程的藝術書中的線程池的主要處理流程,或許會比較容易理解些
下面是整個流程的詳細步驟
- 調用execute方法,傳入Runable對象
- 判斷傳入的對象是否為null,為null則拋出異常,不為null繼續流程
- 獲取當前線程池的狀態和線程個數變量
- 判斷當前線程數是否小於核心線程數,是走流程5,否則走流程6
- 添加線程數,添加成功則結束,失敗則重新獲取當前線程池的狀態和線程個數變量,
- 判斷線程池是否處於RUNNING狀態,是則添加任務到阻塞隊列,否則走流程10,添加任務成功則繼續流程7
- 重新獲取當前線程池的狀態和線程個數變量
- 重新檢查線程池狀態,不是運行狀態則移除之前添加的任務,有一個false走流程9,都為true則走流程11
- 檢查線程池線程數量是否為0,否則結束流程,是調用addWorker(null, false),然後結束
- 調用!addWorker(command, false),為true走流程11,false則結束
- 調用拒絕策略reject(command),結束
可能看上面會有點繞,不清楚的可以看下面的流程圖
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 檢查當前線程池狀態是否是SHUTDOWN、STOP、TIDYING或者TERMINATED
// 且!(當前狀態為SHUTDOWN、且傳入的任務為null,且隊列不為null)
// 條件都成立則返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//循環
for (;;) {
int wc = workerCountOf(c);
//如果當前的線程數量超過最大容量或者大於(根據傳入的core決定是核心線程數還是最大線程數)核心線程數 || 最大線程數,則返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加c,成功則跳出retry
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS失敗執行下面方法,查看當前線程數是否變化,變化則繼續retry循環,沒變化則繼續內部循環
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
//CAS成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建一個線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//重新檢查線程池狀態
//避免ThreadFactory退出故障或者在鎖獲取前線程池被關閉
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 先檢查線程是否是可啟動的
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//判斷worker是否添加成功,成功則啟動線程,然後將workerStarted設置為true
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//判斷線程有沒有啟動成功,沒有則調用addWorkerFailed方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
這裏可以將addWorker分為兩部分,第一部分增加線程池個數,第二部分是將任務添加到workder裏面並執行。
第一部分主要是兩個循環,外層循環主要是判斷線程池狀態,下面描述來自Java中線程池ThreadPoolExecutor原理探究
rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
展開!運算後等價於
s >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
也就是說下面幾種情況下會返回false:
- 當前線程池狀態為STOP,TIDYING,TERMINATED
- 當前線程池狀態為SHUTDOWN並且已經有了第一個任務
- 當前線程池狀態為SHUTDOWN並且任務隊列為空
內層循環作用是使用cas增加線程個數,如果線程個數超限則返回false,否者進行cas,cas成功則退出雙循環,否者cas失敗了,要看當前線程池的狀態是否變化了,如果變了,則重新進入外層循環重新獲取線程池狀態,否者進入內層循環繼續進行cas嘗試。
到了第二部分說明CAS成功了,也就是說線程個數加一了,但是現在任務還沒開始執行,這裏使用全局的獨占鎖來控制workers裏面添加任務,其實也可以使用並發安全的set,但是性能沒有獨占鎖好(這個從註釋中知道的)。這裏需要註意的是要在獲取鎖後重新檢查線程池的狀態,這是因為其他線程可可能在本方法獲取鎖前改變了線程池的狀態,比如調用了shutdown方法。添加成功則啟動任務執行。
所以這裏也將流程圖分為兩部分來描述
第一部分流程圖
第二部分流程圖
Worker對象
Worker是定義在ThreadPoolExecutor中的finnal類,其中繼承了AbstractQueuedSynchronizer類和實現Runnable接口,其中的run方法如下
public void run() {
runWorker(this);
}
線程啟動時調用了runWorker方法,關於類的其他方面這裏就不在敘述。
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//循環獲取任務
while (task != null || (task = getTask()) != null) {
w.lock();
// 當線程池是處於STOP狀態或者TIDYING、TERMINATED狀態時,設置當前線程處於中斷狀態
// 如果不是,當前線程就處於RUNNING或者SHUTDOWN狀態,確保當前線程不處於中斷狀態
// 重新檢查當前線程池的狀態是否大於等於STOP狀態
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//提供給繼承類使用做一些統計之類的事情,在線程運行前調用
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//提供給繼承類使用做一些統計之類的事情,在線程運行之後調用
afterExecute(task, thrown);
}
} finally {
task = null;
//統計當前worker完成了多少個任務
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//整個線程結束時調用,線程退出操作。統計整個線程池完成的任務個數之類的工作
processWorkerExit(w, completedAbruptly);
}
}
getTask
getTask方法的主要作用其實從方法名就可以看出來了,就是獲取任務
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//循環
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//線程線程池狀態和隊列是否為空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//線程數量
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//(當前線程數是否大於最大線程數或者)
//且(線程數大於1或者任務隊列為空)
//這裏有個問題(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//獲取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
關閉線程池
shutdown
當調用shutdown方法時,線程池將不會再接收新的任務,然後將先前放在隊列中的任務執行完成。
下面是shutdown方法的源碼
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow
立即停止所有的執行任務,並將隊列中的任務返回
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdown和shutdownNow區別
shutdown和shutdownNow這兩個方法的作用都是關閉線程池,流程大致相同,只有幾個步驟不同,如下
- 加鎖
- 檢查關閉權限
- CAS改變線程池狀態
- 設置中斷標誌(線程池不在接收任務,隊列任務會完成)/中斷當前執行的線程
- 調用onShutdown方法(給子類提供的方法)/獲取隊列中的任務
- 解鎖
- 嘗試將線程池狀態變成終止狀態TERMINATED
- 結束/返回隊列中的任務
總結
線程池可以給我們多線程編碼上提供極大便利,就好像數據庫連接池一樣,減少了線程的開銷,提供了線程的復用。而且ThreadPoolExecutor也提供了一些未實現的方法,供我們來使用,像beforeExecute、afterExecute等方法,我們可以通過這些方法來對線程進行進一步的管理和統計。
在使用線程池上好需要註意,提交的線程任務可以分為CPU 密集型任務
和IO 密集型任務
,然後根據任務的不同進行分配不同的線程數量。
- CPU密集型任務:
- 應當分配較少的線程,比如
CPU
個數相當的大小
- 應當分配較少的線程,比如
- IO 密集型任務:
- 由於線程並不是一直在運行,所以可以盡可能的多配置線程,比如 CPU 個數 * 2
- 混合型任務:
- 可以將其拆分為
CPU
密集型任務以及IO
密集型任務,這樣來分別配置。
- 可以將其拆分為
好了,這篇博文到這裏就結束了,文中可能會有些紕漏,歡迎留言指正。
如果本文對你有所幫助,給個star唄,謝謝。本文GitHub地址:點這裏點這裏
參考資料
- 並發編程網-Java中線程池ThreadPoolExecutor原理探究
- Java並發編程的藝術
java多線程系列:ThreadPoolExecutor源碼分析