執行緒池 -- ThreadPoolExecutor
執行緒池的實現原理
- 1)如果當前執行的執行緒少於corePoolSize,則建立新執行緒來執行任務(注意,執行這一步驟
需要獲取全域性鎖)。 - 2)如果執行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。
- 3)如果無法將任務加入BlockingQueue(佇列已滿),則建立新的執行緒來處理任務(注意,執
行這一步驟需要獲取全域性鎖)。 - 4)如果建立新執行緒將使當前執行的執行緒超出maximumPoolSize,任務將被拒絕,並呼叫
RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor採取上述步驟的總體設計思路,是為了在執行execute()方法時,儘可能地避免獲取全域性鎖(那將會是一個嚴重的可伸縮瓶頸)。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
執行緒池的使用
執行緒池的建立
可以通過ThreadPoolExecutor來建立一個執行緒池:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
1)corePoolSize(執行緒池的基本大小):
當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有基本執行緒。
2)runnableTaskQueue(任務佇列):
用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列。
-
ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按FIFO(先進先出)原則對元素進行排序。
-
LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。
-
SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於Linked-BlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
-
PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。
3)maximumPoolSize(執行緒池最大數量):
執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是,如果使用了無界的任務佇列這個引數就沒什麼效果。
4)ThreadFactory(執行緒工廠):
用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder可以快速給執行緒池裡的執行緒設定有意義的名字,程式碼如下。
new ThreadFactoryBuilder().setNameFormat(“XX-task-%d”).build();
5)RejectedExecutionHandler(飽和策略):
當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。在JDK1.5中Java執行緒池框架提供了以下4種策略。
- AbortPolicy:直接丟擲異常。
- CallerRunsPolicy:只用呼叫者所線上程來執行任務。
- DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
- DiscardPolicy:不處理,丟棄掉。
向執行緒池提交任務
可以使用兩個方法向執行緒池提交任務,分別為execute()和submit()方法。
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被執行緒池執行成功。通過以上程式碼可知execute()方法輸入的任務是一個Runnable類的例項。
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理無法執行任務異常
} finally {
// 關閉執行緒池
executor.shutdown();
}
submit()方法用於提交需要返回值的任務。執行緒池會返回一個future型別的物件,通過這個
future物件可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前執行緒直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前執行緒一段時間後立即返回,這時候有可能任務沒有執行完。
看下 AbstractExecutorService 的實現:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
submit方法最終都是轉換一下,呼叫execute方法。其接收 RunnableFuture 型別的引數,FutureTask 實現了 RunnableFuture 介面,通過 newTaskFor 方法將 Runnable 和 Callable 都轉換為 FutureTask。
對於Runnable,會呼叫Executors.callable方法,利用介面卡模式,將call方法的呼叫委託給run方法。
最終的FutureTask都會持有一個callable物件。上面execute方法接收的就是Runnable型別的引數,實際傳遞的就是FutureTask。
任務的執行
執行緒池狀態
執行緒池狀態由一個整型的原子變數來表示,包括工作執行緒數和執行狀態,為了用這樣一個欄位表示兩個內容,32位的整型變數前三位用於狀態標識,後29位可用來表示(2^29)-1個執行緒。
執行狀態有以下幾種:
- RUNNING: 接收新的任務並且處理佇列裡的任務
- SHUTDOWN: 不接受新的任務,但是處理佇列裡的任務
- STOP: 不接受新的任務,也不處理佇列裡的任務,並中斷正在進行的任務
- TIDYING: 所有任務都已終止,workerCount為零,轉換到狀態 TIDYING 的執行緒將執行terminate()鉤子方法
- TERMINATED: terminated() 方法已經執行
然後通過下面的兩個方法分別取出二進位制中各部分指定位數的長度,其他位置充零,用來表示各自的含義。
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
private static int workerCountOf(int c) {
return c & CAPACITY;
}
比如下面五種結果分別表示RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED:
101xxxxxxxxxxxxxxxxxxxxxxxxxxxxx -> 101 00000000000000000000000000000
000xxxxxxxxxxxxxxxxxxxxxxxxxxxxx -> 000 00000000000000000000000000000
001xxxxxxxxxxxxxxxxxxxxxxxxxxxxx -> 001 00000000000000000000000000000
010xxxxxxxxxxxxxxxxxxxxxxxxxxxxx -> 010 00000000000000000000000000000
011xxxxxxxxxxxxxxxxxxxxxxxxxxxxx -> 011 00000000000000000000000000000
前面帶符號位的3位表示狀態,後面29位表示執行緒數,經過與運算後面直接都是0了。剛好跟-1,0,1,2,3這個五個數左移29位一樣:
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
執行步驟
向執行緒池提交任務後,後面就會呼叫 execute 方法來執行任務:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 首先獲取執行緒池內現有執行緒數。如果少於核心執行緒數,
* 則將當前任務組建一個新 worker。新增 worker 成功則返回。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 如果 worker 新增失敗,則再次獲取原子變數ctl
c = ctl.get();
}
// 如果是 RUNNING 狀態,則往 work 佇列中插入任務。
if (isRunning(c) && workQueue.offer(command)) {
/*
* 如果插入成功,還要再次檢查是否 RUNNING 狀態,如果非
* RUNNING 狀態,則要移除此任務,移除成功還要執行拒絕方法。
*/
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
/*
* 是 RUNNING 狀態或者非 RUNNING 狀態但是 remove 任務失敗,
* 則檢查工作者執行緒數是否為0,是的話新增一個不執行的 worker。
* 此時沒有傳入任務,因為任務已經加入佇列中,後面將從佇列取出執行
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 執行緒池不是 RUNNING 狀態或者是 RUNNING 狀態但往佇列加入任務失敗(即佇列已滿),
* 再次呼叫addWorker方法,但第二個引數傳入為false,將執行緒池的有限執行緒
* 數量的上限設定為maximumPoolSize,如果 addWorker 失敗則執行拒絕策略
*/
else if (!addWorker(command, false))
reject(command);
}
關於 Worker
addWorker 方法包含兩個引數,第一個是新執行緒首先要執行的任務,第二個是是否使用 corePoolSize 作為邊界,如果不是則用 maximumPoolSize 作為執行緒數量的邊界值。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
/*
* 如果是[RUNNING]狀態或者是[SHUTDOWN]狀態並且 firstTask 為 null
* 因為在SHUTDOWN時不會在新增新的任務,但還是會執行 workQueue中的任務
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
從 addWorker 方法中可以看出,首先會進行一系列狀態的判斷,滿足條件則建立一個 Worker,傳入 firstTask 作為首要執行的任務:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 建立的執行緒預設傳入當前 Worker, Worker 實現了 Runnable
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
//......
}
Worker 的建構函式中還將 state 設定為 -1,這個 state 繼承自AQS,註釋說是為了禁止在 runWorker 方法呼叫之前中斷。而 tryTerminate() 和 shutDown() 方法都會呼叫 interruptIdleWorkers() 方法:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
這個方法會判斷當前 Worker 裡的執行緒是否已中斷,如果沒有中斷,呼叫 tryLock 方法嘗試獲取鎖,如果能獲取表示該執行緒沒有正在處理的任務,則將該執行緒中斷。而 tryLock 方法又會呼叫 tryAcquire 方法:
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
這個就是要把 state 從 0 改為 1,如果之前建立的 Worker 不設定 state 為 -1 的話,剛建立的 Worker 可能就要掛掉了。
通過執行緒工廠建立執行緒,Worker 實現了 Runnable,傳入當前 Worker,這樣執行緒 start() 的時候呼叫的就是該 Worker 的 run 方法,run 方法又呼叫了 runWorker 方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 是否遇到異常事件(遭遇突兀)
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
獲取該 Worker 的 firstTask,如果不為空則會呼叫其 run 方法,為空的話則取佇列 workQueue 中的 task 來執行:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
/*
* allowCoreThreadTimeOut 預設是false,也就是核心執行緒不允許進行超時
* 如果不允許超時並且當前執行緒池中執行緒大於核心執行緒的話都要超時控制
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* a.如果當前執行緒大於最大執行緒數,或者小於最大執行緒數但是需要超時控制並且已超時,
* 然後當前執行緒數大於1或者佇列為空,
* 滿足條件a就比較並減少執行緒數量,成功的話返回null,否則繼續for迴圈。
*
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 不滿足條件a,如果需要超時控制則通過 poll() 方法從佇列中獲取任務,不需要則直接使用阻塞方法 take() 取
// 如果 keepAliveTime 時間獲取到任務則返回該任務,沒有獲取到任務則標記 timeOut 為 true
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
第一個條件判斷狀態為非 RUNNING,並且執行緒池正在終止或者任務佇列為空了,這個時候就要將 workerCount 減1並返回 null。
第二個條件判斷在工作執行緒大於最大執行緒數時,或者需要超時控制並且第一次取任務已經超時了,滿足以上兩個條件之一則判斷工作執行緒數大於1任務或者佇列為空,則將工作執行緒數減一,成功則返回 null,否則繼續迴圈。
這裡在佇列中獲取任務的同時關注執行緒池狀態和當前工作執行緒數,還有任務佇列為否為空,也就是在看究竟要不要這麼多執行緒。
結合上面的 runWorker 方法,如果這裡 getTask() 為 null 則跳出迴圈,執行processWorkerExit()方法:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值為true,則說明執行緒執行時出現了異常,需要將workerCount減1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 從workers中移除,也就表示著從執行緒池中移除了一個工作執行緒
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
總結
-
以 execute 方法把任務送進去,這時可能建立一個新執行緒來執行任務,也可能只是把任務丟進了任務佇列,等待其他執行緒執行完後來領任務。還可能因為達到了執行緒池上限被拒絕。
-
執行緒和 Worker 繫結在一起,每個 Worker 都會建立新執行緒,Worker 的執行是一個長期的過程,它在 execute 方法中被建立,但它的生命週期卻不限於 execute 方法中。
-
addWorker 方法建立 Worker,也會觸發 Worker 中的 thread.start(), Worker 本身實現了 Runnable 並放到了自己的 Thread 構造方法中,然後 start 會呼叫 Worker 實現的 run 方法,進一步呼叫執行緒池的 runWorker 方法。
-
runWorker 方法會讓執行緒在執行完任務後迴圈使用,不斷地去任務佇列領取新任務,如果獲取不到任務了,就要處理退出了,呼叫 processWorkerExit 完成善後處理,執行緒能重複利用靠的就是迴圈獲取任務來延長其生命週期,退出迴圈其實已近標記它走線衰亡了。
執行緒池的設計還是比較複雜,要思考得比較全面,也要有一定的基礎才能消化,在學習的過程中我產生了很多疑問,最終才能勉強在腦海中構成一個閉環。還有些疑問至今未能尋得答案,或者說給自己一個滿意的解釋,後面還要不斷地探索學習。