1. 程式人生 > 其它 >執行緒池ThreadPoolExecutor原始碼詳解

執行緒池ThreadPoolExecutor原始碼詳解

ThreadPoolExecutor顧名思義,是一個執行緒池管理工具類,該類主要提供了任務管理,執行緒的排程和相關的hook方法來控制執行緒池的狀態。

1.方法說明

任務管理主要方法如下:

public void execute(Runnable command);
public <T> Future<T> submit(Callable<T> task);
public <T> Future<T> submit(Runnable task, T result);
publicFuture<?>submit(Runnabletask);
public void shutdown();
public List<Runnable> shutdownNow();

上述方法中,execute()和submit()方法在有空閒執行緒存在的情況下會立即呼叫該執行緒執行任務,區別在於execute()方法是忽略任務執行結果的,而submit()方法則可以獲取結果。除此之外,ThreadPoolExecutor還提供了shutdown()和shutdownNow()方法用於關閉執行緒池,區別在於shutdown()方法在呼叫之後會將任務佇列中的任務都執行完畢之後再關閉執行緒池,而shutdownNow()方法則會直接關閉執行緒池,並且將任務佇列中的任務匯出到一個列表中返回。

除上述用於執行任務的方法外,ThreadPoolExecutor還提供瞭如下幾個hook(鉤子)方法:

protected void beforeExecute(Thread t, Runnable r);
protected void afterExecute(Runnable r, Throwable t);
protected void terminated();

在ThreadPoolExecutor中這幾個方法預設都是空方法,beforeExecute()會在每次任務執行之前呼叫,afterExecute()會在每次任務結束之後呼叫,terminated()方法則會線上程池被終止時呼叫。使用這幾個方法的方式就是宣告一個子類繼承ThreadPoolExecutor,並且在子類中重寫需要定製的鉤子方法,最後在建立執行緒池時使用該子類例項即可。

2.任務排程

a.相關引數

對於ThreadPoolExecutor的例項化,其主要有如下幾個重要的引數:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler);
  • corePoolSize:執行緒池核心執行緒的數量;

  • maximumPoolSize:執行緒池可建立的最大執行緒數量;

  • keepAliveTime:當執行緒數量超過了corePoolSize指定的執行緒數,並且空閒執行緒空閒的時間達到當前引數指定的時間時該執行緒就會被銷燬,如果呼叫過allowCoreThreadTimeOut(booleanvalue)方法允許核心執行緒過期,那麼該策略針對核心執行緒也是生效的;

  • unit:指定了keepAliveTime的單位,可以為毫秒,秒,分,小時等;

  • workQueue:儲存未執行的任務的佇列;

  • threadFactory:建立執行緒的工廠,如果未指定則使用預設的執行緒工廠;

  • handler:指定了當任務佇列已滿,並且沒有可用執行緒執行任務時對新新增的任務的處理策略;

b.排程策略

當初始化一個執行緒池之後,池中是沒有任何使用者執行任務的活躍執行緒的,當新的任務到來時,根據配置的引數其主要的執行任務如下:

  • 若執行緒池中執行緒數小於corePoolSize指定的執行緒數時,每來一個任務,都會建立一個新的執行緒執行該任務,無論執行緒池中是否已有空閒的執行緒;

  • 若當前執行的任務達到了corePoolSize指定的執行緒數時,也即所有的核心執行緒都在執行任務時,此時來的新任務會儲存在workQueue指定的任務佇列中;

  • 當所有的核心執行緒都在執行任務,並且任務佇列中存滿了任務,此時若新來了任務,那麼執行緒池將會建立新執行緒執行任務;

  • 若所有的執行緒(maximumPoolSize指定的執行緒數)都在執行任務,並且任務佇列也存滿了任務時,對於新新增的任務,其都會使用handler所指定的方式對其進行處理。

c.排程策略注意點
  • 在第二步中,當前核心執行緒都在執行任務,並且任務佇列已滿時,會建立新的執行緒執行任務,這裡需要注意的是,建立新執行緒的時候當前總共需要執行的任務數是(corePoolSize+workQueueSize),並不是只有corePoolSize個任務;

  • 在第三步中,這裡workQueue主要有三種類型:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue,第一個是有界阻塞佇列,第二個是無界阻塞佇列,當然也可以為其指定界限大小,第三個是同步佇列,對於ArrayBlockingQueue,其是需要指定佇列大小的,當佇列存滿了任務執行緒池就會建立新的執行緒執行任務,對於LinkedBlockingQueue,如果其指定界限,那麼和ArrayBlockingQueue區別不大,如果其不指定界限,那麼其理論上是可以儲存無限量的任務的,實際上能夠儲存Integer.MAX_VALUE個任務(還是相當於可以儲存無限量的任務),此時由於LinkedBlockingQueue是永遠無法存滿任務的,因而maxPoolSize的設定將沒有意義,一般其會設定為和corePoolSize相同的值,對於SynchronousQueue,其內部是沒有任何結構儲存任務的,當一個任務新增到該佇列時,當前執行緒和後續新增任務的執行緒都會被阻塞,直至有一個執行緒從該佇列中取出任務,當前執行緒才會被釋放,因而如果執行緒池使用了該佇列,那麼一般corePoolSize都會設計得比較小,maxPoolSize會設計得比較大,因為該佇列比較適合大量並且執行時間較短的任務的執行;

  • 在第四步中,DiscardPolicy和DiscardOldestPolicy一般不會配合SynchronousQueue使用,因為當同步佇列阻塞了任務時,該任務都會被拋棄;對於AbortPolicy,因為如果佇列已滿,那麼其會丟擲異常,因而使用時需要小心;對於CallerRunsPolicy,由於當有新的任務到達時會使用呼叫執行緒執行當前任務,因而使用時需要考慮其對伺服器響應的影響,並且還需要注意的是,相對於其他幾個策略,該策略不會拋棄任務到達的任務,因為如果到達的任務使佇列滿了而只能使用呼叫執行緒執行任務時,說明執行緒池設計得不夠合理,如果任其發展,那麼所有的呼叫執行緒都可能會被需要執行的任務所阻塞,導致伺服器出現問題。

3.原始碼講解

a.主要屬性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 00011111 11111111 11111111 11111111

private static final int RUNNING = -1 << COUNT_BITS; // 11100000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS; // 00000000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS; // 00100000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS; // 01000000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS; // 01100000 00000000 00000000 00000000

由於ThreadPoolExecutor需要管理多種狀態,並且還要記錄當前執行任務的執行緒的數量,如果使用多個變數,併發更新時管理將會非常複雜,這裡ThreadPoolExecutor則主要使用一個AtomicInteger型別的變數ctl儲存所有主要的資訊。ctl是一個32位的整形數字,初始值為0,其最高的三位用於儲存當前執行緒池的狀態資訊,主要有RUNNING,SHUTDOWN,STOP,TIDING和TERMINATED,分別表示執行狀態,關閉狀態,終止狀態,整理狀態和結束狀態。這幾種狀態對應的具體數值資訊如上述程式碼所示,這裡需要注意的一點是,在ThreadPoolExecutor中,這幾種狀態在數值上是從小到大依次增大的,並且狀態流轉也是依次往下的,這就為其判斷狀態資訊提供了比較便利的方式,如當需要判斷執行緒池狀態是否處於SHUTDOWN狀態時,只需要判斷其代表狀態位部分的值是否等於SHUTDOWN即可。在ctl中,除了最高三位用於表示狀態外,其餘位所代表的數值則指定了當前執行緒池中正在執行任務的執行緒數。如下是操作ctl屬性的相關方法:

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}

private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
  • runStateOf(intc):用於獲取當前執行緒池的狀態,c為當前執行緒池工作時的ctl屬性值;

  • workerCountOf(intc):用於獲取當前執行緒池正在工作的執行緒數量,c為當前執行緒池工作時的ctl屬性值;

  • ctlOf(intrs,intwc):這裡rs表示當前執行緒的工作狀態,wc則表示正在工作的執行緒數,該方法用於將這兩個引數組裝為一個ctl屬性值;

  • runStateLessThan(intc,ints):判斷當前執行緒池狀態是否未達到指定狀態,如前所述,狀態流轉在數值上是依次增大的,因而這裡只需要判斷其大小即可;

  • runStateAtLeast(intc,ints):用於判斷當前執行緒池狀態是否至少處於某種狀態;

  • isRunning(intc):用於判斷當前執行緒池是否處於正常執行狀態;

  • compareAndIncrementWorkerCount(intexpect):增加當前執行緒池的工作執行緒數量值;

  • compareAndDecrementWorkerCount(intexpect):減少當前執行緒池的工作執行緒數量值。

b.主要方法

對於執行緒池的execute()和submit()方法,其實在底層submit()方法會將傳入的任務封裝為一個FutureTask物件,由於FutureTask物件是實現了Runnable介面的,因而其也可以當做一個任務執行,這裡就是將封裝後的FutureTask物件傳遞給execute()方法執行的。我們這裡則主要講解execute()方法的實現方式,如下是execute()方法的程式碼:

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get(); // 獲取當前執行緒池狀態
if (workerCountOf(c) < corePoolSize) {
// 當工作執行緒數小於核心執行緒數時,則呼叫addWorker()方法建立執行緒並執行任務
if (addWorker(command, true))
return;
c = ctl.get(); // 若新增失敗,則更新當前執行緒池狀態
}

// 執行到此處,則說明執行緒池中的工作執行緒要麼大於等於核心執行緒數,要麼當前執行緒池已經被命令關閉了(addWorker方法新增失敗的原因),因而這裡判斷執行緒池是否為RUNNING狀態,是則將任務新增到任務佇列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 新增佇列成功後雙重驗證,確保執行緒池處於正確狀態
if(!isRunning(recheck)&&remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 若執行緒池中沒有執行緒,則建立一個新執行緒執行新增的任務
} else if(!addWorker(command,false))
reject(command); // 執行緒池至少處於SHUTDOWN狀態,拒絕當前任務的執行
}

在execute()方法中,其首先判斷執行緒池工作執行緒數是否小於核心執行緒數,是則建立核心執行緒執行任務,新增失敗或者工作執行緒數大於等於核心執行緒數時,則將任務新增到任務佇列中,新增成功後會進行雙重驗證確保當前執行緒池處於正確的狀態,並且確保當前有可用的執行緒執行新新增的任務。由此可見對於execute()方法的實現,其比較核心的方法是addWorker()方法,如下是addWorker()方法的實現方式:

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 獲取當前執行狀態

// 判斷當前執行緒池是否至少為SHUTDOWN狀態,並且firstTask和任務佇列中沒有任務,是則直接返回
if(rs>=SHUTDOWN&&!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
// 判斷是否工作執行緒數大於可記錄的最大執行緒數,或者工作執行緒超過了指定的核心執行緒或者最大執行緒數
if(wc>=CAPACITY||wc>=(core?corePoolSize:maximumPoolSize))
return false;
// 走到這一步說明當前執行緒池處於RUNNING狀態,或者任務佇列存在任務,並且工作執行緒數不超過
// 指定的執行緒數量,那麼就增加工作執行緒數量,成功則繼續往下執行,失敗則重複上述新增步驟
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if(runStateOf(c)!=rs)
continue retry;
}
}

// 記錄工作執行緒數的變數已經更新,接下來建立執行緒執行任務
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 建立一個工作者物件
final Thread t = w.thread;
if(t!=null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());

// 重新檢查執行緒池狀態,或者是判斷當前是SHUTDOWN狀態,而firstTask為空,這說明任務佇列此時不為空
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w); // 將建立的工作者新增到工作者集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 更新已使用的最大執行緒數
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 工作者物件成功建立之後,呼叫該工作者執行任務
workerStarted = true;
}
}
} finally {
if(!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

在addWorker()方法中,其首先檢查當前執行緒池是否處於RUNNING狀態,或者處於SHUTDOWN狀態,但是任務佇列中還存在有任務,那麼其就會建立一個新的Worker物件,並且將其新增到工作者物件集合中,然後呼叫工作者物件所維護的執行緒執行任務,如下是工作者物件的實現程式碼:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread; // 當前工作者中執行任務的執行緒
Runnable firstTask; // 第一個需要執行的任務
volatile long completedTasks; // 當前工作者完成的任務數

Worker(Runnable firstTask) {
// 預設設定為-1,那麼如果不呼叫當前工作者的run()方法,那麼其狀態是不會改變的,
// 其他的執行緒也無法使用當前工作者執行任務,在run()方法呼叫的runWorker()方法中會
// 呼叫unlock()方法使當前工作者處於正常狀態
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 使用執行緒工廠建立執行緒
}

public void run() {
runWorker(this); // 使用當前工作者執行任務
}

protected boolean isHeldExclusively() {
returngetState()!=0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

// 如果當前執行緒已經在執行任務,那麼將其標記為打斷狀態,待其任務執行完畢則終止任務的執行
void interruptIfStarted() {
Thread t;
if (getState() >= 0&&(t=thread)!=null&&!t.isInterrupted()){
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

在工作者物件中,其主要維護了一個工作者執行緒,用於執行任務。該工作者物件繼承了AbstractQueuedSynchronizer,用於控制當前工作者工作狀態的獲取,並且其也實現了Runnable介面,將主要任務的執行封裝到run()方法中。如下是runWorker()方法的具體實現:

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 重置Worker物件的狀態
boolean completedAbruptly = true;
try {
// 首先執行工作者執行緒中的任務,然後迴圈從任務佇列中獲取任務執行
while(task!=null||(task=getTask())!=null) {
w.lock();
// 檢查當前執行緒池的狀態,如果執行緒池被終止或者執行緒池終止並且當前執行緒已被打斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted()&&runStateAtLeast(ctl.get(),STOP)))&&!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 呼叫鉤子方法進行預處理
Throwable thrown = null;
try {
task.run(); // 執行任務
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 呼叫鉤子方法進行任務完成後的處理工作
}
} finally {
task = null; // 重置工作者的初始任務
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

可以看到,在runWorker()方法中,其首先會執行工作者物件的初始化任務,當執行完畢後會通過一個無限迴圈不斷在任務佇列中獲取任務執行。如下是getTask()方法的原始碼:

private Runnable getTask() {
boolean timedOut = false;

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 判斷當前執行緒是否處於STOP狀態,或者處於SHUTDOWN狀態,並且工作佇列是空的,是則不返回任務
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否允許空閒執行緒過期

// 工作執行緒數大於最大允許執行緒數,或者執行緒在指定時間內無法從工作佇列中獲取到新任務,則銷燬當前執行緒
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 允許核心執行緒過期或者工作執行緒數大於corePoolSize時,從任務佇列獲取任務時會指定等待時間,
// 否則會一直等待任務佇列中新的任務
Runnabler=timed?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();
if(r!=null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

可以看到,getTask方法首先會判斷當前執行緒池狀態是否為STOP狀態,或者是SHUTDOWN狀態,並且任務佇列是空的,是則不返回任務,否則會根據相關引數從任務佇列中獲取任務執行。

以上execute()方法的主要實現步驟,在ThreadPoolExecutor中另一個至關重要的方法則是shutdown()方法,以下是shutdown()方法的主要程式碼:

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 檢查對執行緒狀態的控制權限
advanceRunState(SHUTDOWN); // 更新當前執行緒池狀態為SHUTDOWN
interruptIdleWorkers(); // 打斷空閒的工作者
onShutdown(); // 鉤子方法,但是沒有對外公開,因為該方法只有包訪問許可權
} finally {
mainLock.unlock();
}
tryTerminate();
}

在shutdown()方法中,其首先檢查當前執行緒是否有修改執行緒狀態的許可權,然後將當前執行緒池的狀態修改為SHUTDOWN,接著呼叫interruptIdleWorkers()方法中斷所有處於空閒狀態的執行緒,最後則是呼叫tryTerminate()方法嘗試將當前執行緒池的狀態由SHUTDOWN修改為TERMINATED,這裡interruptIdleWorkers()方法最終會呼叫其過載方法interruptIdleWorkers(boolean)方法,該方法程式碼如下:

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for(Workerw:workers){
Thread t = w.thread;
if(!t.isInterrupted()&&w.tryLock()){
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

可以看到,該方法會遍歷所有的工作者物件,如果其處於空閒狀態,則將其終止。對於處於工作狀態的執行緒,由於在shutdown()方法中已經將當前執行緒池的狀態設定為SHUTDOWN,那麼工作狀態的執行緒會將任務佇列中的任務都執行完畢之後自動銷燬。

本文主要講解了ThreadPoolExecutor的主要方法,執行緒池的排程方式,以及其核心功能的實現原理。