執行緒池原始碼解析
<span style="font-family: "microsoft yahei"; font-size: 12px; background-color: rgb(255, 255, 255);">轉載:</span>
http://blog.csdn.net/pangjiuzala/article/details/49556081<span style="font-family: "microsoft yahei"; font-size: 12px; background-color: rgb(255, 255, 255);">轉載:</span>
http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-ThreadPoolExecutor.html
引言
本文為Java高階程式設計中的一些知識總結,其中第一章對Jdk 1.7.0_25中的多執行緒
Java執行緒池架構原理及原始碼解析
ThreadPoolExecutor是一個 ExecutorService,它使用可能的幾個池執行緒之一執行每個提交的任務,通常使用 Executors 工廠方法配置。執行緒池可以解決兩個不同問題:由於減少了每個任務呼叫的開銷,它們通常可以在執行大量非同步任務時提供增強的效能,並且還可以提供繫結和管理資源(包括執行任務集時使用的執行緒)的方法。每個 ThreadPoolExecutor 還維護著一些基本的統計資料,如完成的任務數。
構建引數原始碼
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
引數解釋
-
corePoolSize:核心執行緒數,會一直存活,即使沒有任務,執行緒池也會維護執行緒的最少數量。
-
maximumPoolSize: 執行緒池維護執行緒的最大數量。
-
keepAliveTime: 執行緒池維護執行緒所允許的空閒時間,當執行緒空閒時間達到keepAliveTime,該執行緒會退出,直到執行緒數量等於corePoolSize。如果allowCoreThreadTimeout設定為
true,則所有執行緒均會退出直到執行緒數量為0。
unit: 執行緒池維護執行緒所允許的空閒時間的單位、可選引數值為:TimeUnit中的幾個靜態屬性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。 -
workQueue:執行緒池所使用的緩衝佇列,常用的是:java.util.concurrent.ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。
-
handler: 執行緒池中的數量大於maximumPoolSize,對拒絕任務的處理策略,預設值ThreadPoolExecutor.AbortPolicy()。
原始碼詳細解析
excute原始碼
execute
方法主要三個步驟:
- 活動執行緒小於corePoolSize的時候建立新的執行緒;
- 活動執行緒大於corePoolSize時都是先加入到任務隊列當中;
- 任務佇列滿了再去啟動新的執行緒,如果執行緒數達到最大值就拒絕任務。
public void execute(Runnable command)
{
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
{
if (runState == RUNNING && workQueue.offer(command))
{
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
一個任務通過 execute(Runnable)方法被新增到執行緒池,任務就是一個Runnable型別的物件,任務的執行方法就是run()方法,如果傳入的為null,側丟擲NullPointerException。
首先第一個判定空操作就不用說了,下面判定的poolSize >= corePoolSize成立時候會進入if的區域,當然它不成立也有可能會進入,他會判定addIfUnderCorePoolSize是否返回false,如果返回false就會進去。
如果當前執行緒數小於corePoolSize,呼叫addIfUnderCorePoolSize方法,addIfUnderCorePoolSize方法首先呼叫mainLock加鎖,再次判斷當前執行緒數小於corePoolSize並且執行緒池處於RUNNING狀態,則呼叫addThread增加執行緒。
圖一:ThreadPoolExecutor執行狀態圖
addIfUnderCorePoolSize原始碼
private boolean addIfUnderCorePoolSize(Runnable firstTask)
{
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
}
finally
{
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
addThread方法首先建立Work物件,然後呼叫threadFactory建立新的執行緒,如果建立的執行緒不為null,將Work物件的 thread屬性設定為此創建出來的執行緒,並將此Work物件放入workers中,然後在增加當前執行緒池的中執行緒數,增加後回到 addIfUnderCorePoolSize方法 ,釋放mainLock,最後啟動這個新建立的執行緒來執行新傳入的任務。
可以發現,這段原始碼是如果發現小於corePoolSize就會建立一個新的執行緒,並且呼叫執行緒的start()方法將執行緒執行起來:這個addThread()方法,我們先不考慮細節,因為我們還要先看到前面是怎麼進去的,這裡可以發信啊,只有沒有建立成功Thread才會返回false,也就是噹噹前的poolSize > corePoolSize的時候,或執行緒池已經不是在running狀態的時候才會出現。
注意:這裡在外部判定一次poolSize和corePoolSize只是初步判定,內部是加鎖後判定的,以得到更為準確的結果,而外部初步判定如果是大於了,就沒有必要進入這段有鎖的程式碼了。
addThread原始碼
private Thread addThread(Runnable firstTask)
{
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
< span style = "color:#ff0000;" > < / span >
if (t != null)
{
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
ThreadFactory介面預設實現DefaultThreadFactory
public Thread newThread(Runnable r)
{
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
這裡建立了一個Work,其餘的操作,就是講poolSize疊加,然後將將其放入workers的執行佇列等操作;
我們主要關心Worker是幹什麼的,因為這個threadFactory對我們用途不大,只是做了Thread的命名處理;而Worker你會發現它的定義也是一個Runnable,外部開始在程式碼段中發現了呼叫哪個這個Worker的start()方法,也就是執行緒的啟動方法,其實也就是呼叫了Worker的run()方法,那麼我們重點要關心run方法是如何處理的。
Worker的run方法
public void run()
{
try
{
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null)
{
runTask(task);
task = null;
}
}
finally
{
workerDone(this);
}
}
從以上方法可以看出,Worker所在的執行緒啟動後,首先執行建立其時傳入的Runnable任務,執行完成後,迴圈呼叫getTask來獲取新的任務,在沒有任務的情況下,退出此執行緒。FirstTask其實就是開始在建立work的時候,由外部傳入的Runnable物件,也就是你自己的Thread,你會發現它如果發現task為空,就會呼叫getTask()方法再判定,直到兩者為空,並且是一個while迴圈體。
getTask原始碼
Runnable getTask()
{
for (;;)
{
try
{
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit())
{
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
}
catch (InterruptedException ie)
{
// On interruption, re-check runState
}
}
}
你會發現它是從workQueue佇列中,也就是等待佇列中獲取一個元素出來並返回!當前執行緒執行完後,在到workQueue中去獲取一個task出來,繼續執行,這樣就保證了執行緒池中有一定的執行緒一直在執行;此時若跳出了while循 環,只有workQueue佇列為空才會出現或出現了類似於shutdown的操作,自然執行佇列會減少1,當再有新的執行緒進來的時候,就又開始向 worker裡面放資料了,這樣以此類推,實現了執行緒池的功能。
execute方法部分實現
if (runState == RUNNING && workQueue.offer(command))
{
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
如果當前執行緒池數量大於corePoolSize或addIfUnderCorePoolSize方法執行失敗,則執行後續操作;如果執行緒池處於執行狀態 並且workQueue中成功加入任務,再次判斷如果執行緒池的狀態不為執行狀態或當前執行緒池數為0,則呼叫 ensureQueuedTaskHandled方法。
ensureQueuedTaskHandled原始碼
private void ensureQueuedTaskHandled(Runnable command)
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean reject = false;
Thread t = null;
try
{
int state = runState;
if (state != RUNNING && workQueue.remove(command))
reject = true;
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())
t = addThread(null);
}
finally
{
mainLock.unlock();
}
if (reject)
reject(command);
else if (t != null)
t.start();
}
第一個if,也就是噹噹前狀態為running的時候,就會去執行workQueue.offer(command),這個workQueue其實就是一 個BlockingQueue,offer()操作就是在佇列的尾部寫入一個物件,此時寫入的物件為執行緒的物件而已;所以你可以認為只有執行緒池在 RUNNING狀態,才會在佇列尾部插入資料,否則就執行else if,其實else if可以看出是要做一個是否大於MaximumPoolSize的判定,如果大於這個值,就會做reject的操作。ensureQueuedTaskHandled方法判斷執行緒池執行,如果狀態不為執行狀態,從workQueue中刪除,並呼叫reject做拒絕處理。
reject原始碼
void reject(Runnable command)
{
handler.rejectedExecution(command, this);
}
再次回到execute方法
if (runState == RUNNING && workQueue.offer(command))
{
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
如執行緒池workQueue offer失敗或不處於執行狀態,呼叫addIfUnderMaximumPoolSize, addIfUnderMaximumPoolSize方法基本和addIfUnderCorePoolSize實現類似,不同點在於根據最大執行緒數(maximumPoolSize)進行比較,如果超過最大執行緒數,返回false,呼叫reject方法。
addIfUnderMaximumPoolSize原始碼
private boolean addIfUnderMaximumPoolSize(Runnable firstTask)
{
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
}
finally
{
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
也就是如果執行緒池滿了,而且執行緒池呼叫了shutdown後,還在呼叫execute方法時,就會丟擲上面說明的異常:RejectedExecutionException。
workerDone原始碼
void workerDone(Worker w)
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
{
completedTaskCount += w.completedTasks;
workers.remove(w);
if (--poolSize == 0)
tryTerminate();
}
finally
{
mainLock.unlock();
}
}
注意這裡將workers.rem