Android執行緒池(四)ThreadPoolExecutor類原始碼解析
使用ThreadPoolExecutor
private final int CORE_POOL_SIZE = 4;//核心執行緒數
private final int MAX_POOL_SIZE = 5;//最大執行緒數
private final long KEEP_ALIVE_TIME = 10;//空閒執行緒超時時間
private final int BLOCK_SIZE = 2;//阻塞佇列大小
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE, KEEP_ALIVE_TIME,TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(BLOCK_SIZE),Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
executorPool.allowCoreThreadTimeOut(true);
executorPool.execute(new WorkerThread(""));
官方文件說明
一個 ExecutorService,它使用可能的幾個池執行緒之一執行每個提交的任務,通常使用 Executors 工廠方法配置。
執行緒池可以解決兩個不同問題:
由於減少了每個任務呼叫的開銷,它們通常可以在執行大量非同步任務時提供增強的效能。
提供繫結和管理資源(包括執行任務集時使用的執行緒)的方法。每個 ThreadPoolExecutor 還維護著一些基本的統計資料,如完成的任務數。
強烈建議程式設計師使用較為方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界執行緒池,可以進行自動執行緒回收)、Executors.newFixedThreadPool(int)(固定大小執行緒池)和 Executors.newSingleThreadExecutor()(單個後臺執行緒),它們均為大多數使用場景預定義了設定
也可以可自定義:
核心和最大池大小
ThreadPoolExecutor 將根據 corePoolSize(參見 getCorePoolSize())和 maximumPoolSize(參見 getMaximumPoolSize())設定的邊界自動調整池大小。
當新任務在方法 execute(java.lang.Runnable) 中提交時,如果執行的執行緒少於 corePoolSize,則建立新執行緒來處理請求,即使其他輔助執行緒是空閒的。
如果執行的執行緒多於 corePoolSize 而少於 maximumPoolSize,則僅當佇列滿時才建立新執行緒。
如果設定的 corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的執行緒池。
如果將 maximumPoolSize 設定為基本的無界值(如 Integer.MAX_VALUE),則允許池適應任意數量的併發任務。
在大多數情況下,核心和最大池大小僅基於構造來設定,不過也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。
按需構造
預設情況下,即使核心執行緒最初只是在新任務到達時才建立和啟動的,也可以使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 對其進行動態重寫。如果構造帶有非空佇列的池,則可能希望預先啟動執行緒。
建立新執行緒
使用 ThreadFactory 建立新執行緒。如果沒有另外說明,則在同一個 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 建立執行緒,並且這些執行緒具有相同的 NORM_PRIORITY 優先順序和非守護程序狀態。
通過提供不同的 ThreadFactory,可以改變執行緒的名稱、執行緒組、優先順序、守護程序狀態,等等。
如果從 newThread 返回 null 時 ThreadFactory 未能建立執行緒,則執行程式將繼續執行,但不能執行任何任務。
保持活動時間
如果池中當前有多於 corePoolSize 的執行緒,則這些多出的執行緒在空閒時間超過 keepAliveTime 時將會終止(參見 getKeepAliveTime(java.util.concurrent.TimeUnit))。
這提供了當池處於非活動狀態時減少資源消耗的方法。如果池後來變得更為活動,則可以建立新的執行緒。
也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 動態地更改此引數。
使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在關閉前有效地從以前的終止狀態禁用空閒執行緒。
預設情況下,保持活動策略只在有多於 corePoolSizeThreads 的執行緒時應用。
但是隻要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可將此超時策略應用於核心執行緒。
一個小例子:
要做非同步任務執行佇列,具體需求如下:
有個執行緒池用於執行任務
有個有界佇列,用於快取未執行的任務
沒有任務執行時,我希望執行緒池中的執行緒停掉
這看似是個很正常的需求,但是用JDK1.5(我的工作地方JDK還是1.5的)實現,真得很困難的。
ThreadPoolExecutor中執行緒池有corePoolSize 和 maximumPoolSize兩個引數。JDK1.5中執行緒池至少保持corePoolSize的執行緒,所以為了滿足上面的需求,corePoolSize必須被設定為0。但是,JDK1.5中佇列不滿的話,是不會建立大於corePoolSize大小的執行緒數的。也就是,corePoolSize為0時,佇列滿了,才會建立新的執行緒,這顯然不滿足我的需求。
今天看JDK1.6的文件時,發現ThreadPoolExecutor多了一個allowCoreThreadTimeOut方法。這個方法是允許執行緒數低於corePoolSize時,執行緒也因為空閒而終止。有了這個方法,實現上面的需求就非常簡單了。將corePoolSize 和 maximumPoolSize設定為相同的大小,allowCoreThreadTimeOut設定為true,加上一個有界佇列,OK了。
排隊
所有 BlockingQueue 都可用於傳輸和保持提交的任務。可以使用此佇列與池大小進行互動:
如果執行的執行緒少於 corePoolSize,則 Executor 始終首選新增新的執行緒,而不進行排隊。
如果執行的執行緒等於或多於 corePoolSize,則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。
比如說,corePoolSize為4,maximumPoolSize為5,使用了new LinkedBlockingDeque(),那麼就一直不會建立第5個執行緒。但是將new LinkedBlockingDeque()換成new LinkedBlockingDeque(1),當任務小於等於5時,不會建立第5個執行緒;當任務為大於或等於6個時,會建立第5個執行緒。
如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出 maximumPoolSize,在這種情況下,任務將被拒絕。
排隊有三種通用策略:
1、直接提交。工作佇列的預設選項是 SynchronousQueue,它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。
2、無界佇列。使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web 頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。
3、有界佇列。當使用有限的 maximumPoolSizes 時,有界佇列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。
被拒絕的任務
當 Executor 已經關閉,並且 Executor 將有限邊界用於最大執行緒和工作佇列容量,且已經飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務將被拒絕。
在以上兩種情況下,execute 方法都將呼叫其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。
下面提供了四種預定義的處理程式策略:
1、在預設的 ThreadPoolExecutor.AbortPolicy 中,處理程式遭到拒絕將丟擲執行時 RejectedExecutionException。
在 ThreadPoolExecutor.CallerRunsPolicy 中,執行緒呼叫執行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
2、在 ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。
3、在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)。
4、定義和使用其他種類的 RejectedExecutionHandler 類也是可能的,但這樣做需要非常小心,尤其是當策略僅用於特定容量或排隊策略時。
鉤子 (hook) 方法
此類提供 protected 可重寫的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,這兩種方法分別在執行每個任務之前和之後呼叫。它們可用於操縱執行環境;例如,重新初始化 ThreadLocal、蒐集統計資訊或新增日誌條目。此外,還可以重寫方法 terminated() 來執行 Executor 完全終止後需要完成的所有特殊處理。
如果鉤子 (hook) 或回撥方法丟擲異常,則內部輔助執行緒將依次失敗並突然終止。
佇列維護
方法 getQueue() 允許出於監控和除錯目的而訪問工作佇列。強烈反對出於其他任何目的而使用此方法。remove(java.lang.Runnable) 和 purge() 這兩種方法可用於在取消大量已排隊任務時幫助進行儲存回收。
終止
程式 AND 不再引用的池沒有剩餘執行緒會自動 shutdown。如果希望確保回收取消引用的池(即使使用者忘記呼叫 shutdown()),則必須安排未使用的執行緒最終終止:設定適當保持活動時間,使用 0 核心執行緒的下邊界和/或設定 allowCoreThreadTimeOut(boolean)。
擴充套件示例。此類的大多數擴充套件可以重寫一個或多個受保護的鉤子 (hook) 方法。例如,下面是一個添加了簡單的暫停/恢復功能的子類:
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch(InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
原始碼
package java.util.concurrent;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
主池控制狀態ctl是一個原子整數包裝兩個概念領域
workerCount,指示執行緒的有效數量
runState, 指示是否執行,關閉等
為了將它們打包成一個int,我們將workerCount限制為(2 ^ 29)-1(約5億)個執行緒,而不是(2 ^ 31)-1(20億),否則可以表示。
如果將來這是一個問題,變數可以改為AtomicLong,並且shift / mask常數下面被調整。
但直到需要出現,這個程式碼使用一個int來更快更簡單。
workerCount是允許啟動並且不允許停止的工作人員的數量。
該值可能與實際執行緒的實際數量暫時不同,例如當ThreadFactory在被詢問時無法建立執行緒,並且退出執行緒在終止之前仍然執行記錄時。
使用者可見的池大小被報告為工作集的當前大小。
runState提供主要的生命週期控制,取值:
RUNNING: 接受新任務並處理排隊的任務
SHUTDOWN: 不接受新任務,而是處理排隊的任務
STOP: 不接受新任務,不處理排隊任務,
並中斷進行中的任務
TIDYING: 所有任務已經終止,workerCount為零,執行緒轉換到狀態TIDYING將執行terminate()hook方法
TERMINATED: terminated()已完成
這些值之間的數字順序很重要,以便進行有序的比較。 runState隨著時間的推移單調增加,但不需要擊中每個狀態。 過渡是:
*
* RUNNING -> SHUTDOWN
* 關於shutdown()的呼叫,可能隱含在finalize()
* (RUNNING or SHUTDOWN) -> STOP
* 呼叫shutdownNow()
* SHUTDOWN -> TIDYING
* 佇列和池都為空時
* STOP -> TIDYING
* 當池是空的
* TIDYING -> TERMINATED
* 當terminate()hook 方法完成時
*
在等待Termination()等待的執行緒將在狀態達到TERMINATED時返回。
*
檢測從SHUTDOWN到TIDYING的轉換比您想要的不那麼簡單,因為在SHUTDOWN狀態下,佇列可能在非空之後變為空,反之亦然,
但是我們只能在看到它為空之後終止,我們看到workerCount 是0(有時需要重新檢查 - 見下文)。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState儲存在高位中
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 包裝和包裝ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
不需要打包ctl的位欄位訪問器。 這些取決於位佈局,而workerCount從不負面
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 嘗試CAS增加ctl的workerCount欄位。
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 嘗試CAS減去ctl的workerCount欄位。
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
減去ctl的workerCount欄位。 這僅線上程突然終止時才呼叫(請參閱processWorkerExit)。 在getTask中執行其他遞減。
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
/**
用於儲存任務並切換到工作執行緒的佇列。
我們不要求這個工作Queue.poll()返回null必然意味著workQueue.isEmpty(),
所以僅僅依靠isEmpty來檢視佇列是否為空(例如,當決定是否從SHUTDOWN轉換為TIDYING時,我們必須做) 。
這適用於特殊用途的佇列,例如DelayQueues,即使在延遲到期時可以稍後返回非空值,poll()被允許返回null。
*/
private final BlockingQueue<Runnable> workQueue;
/**
鎖定在訪問工人集合和相關的簿記時。
儘管我們可以使用某種併發的集合,但是通常最好使用鎖。
原因在於這個序列化了interruptIdleWorkers,這樣可以避免不必要的中斷風暴,尤其是在關機期間。
否則退出執行緒將同時中斷尚未中斷的執行緒。
它還簡化了一些關於maximumPoolSize的相關統計資訊的記錄。
我們還在shutdown和shutdownNow上儲存了mainLock,以確保工作者設定穩定,同時單獨檢查中斷和實際中斷的許可權。
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
集合包含池中的所有工作執行緒。 只能在拿著mainLock時才能訪問。
*/
private final HashSet<Worker> workers = new HashSet<>();
/**
* 等待條件支援等待終止。
*/
private final Condition termination = mainLock.newCondition();
/**
* 達到最大池大小。 只能在mainLock下使用。
*/
private int largestPoolSize;
/**
計數器完成任務。 僅在終止工作執行緒時更新。 只能在mainLock下使用。
*/
private long completedTaskCount;
/*
所有使用者控制引數被宣告為揮發性,以便持續的操作基於最新值,
但不需要鎖定,因為沒有內部不變數取決於它們與其他操作同步地改變。
*/
/**
工廠新執行緒。 所有執行緒都使用此工廠(通過方法addWorker)建立。
所有呼叫者必須準備好使addWorker失敗,這可能反映了限制執行緒數量的系統或使用者的策略。
即使不將其視為錯誤,建立執行緒的失敗可能會導致新任務被拒絕或現有的任務仍然停留在佇列中。
*
我們進一步保留池不變數,即使面臨諸如OutOfMemoryError這樣的錯誤,可能在嘗試建立執行緒時丟擲。
由於需要在Thread.start中分配本機堆疊,所以這些錯誤是相當普遍的,使用者需要執行清理池關閉來清理。
可能有足夠的記憶體可用於清理程式碼完成,而不會遇到另一個OutOfMemoryError。
*/
private volatile ThreadFactory threadFactory;
/**
* 處理程式在執行飽和或關閉時呼叫。
*/
private volatile RejectedExecutionHandler handler;
/**
等待工作的空閒執行緒以超時為單位。
當存在超過corePoolSize或allowCoreThreadTimeOut時,執行緒使用此超時。
否則,他們會永遠等待新的工作。
*/
private volatile long keepAliveTime;
/**
如果為false(預設),即使空閒時,核心執行緒仍然保持活動狀態。 如果是真的,核心執行緒使用keepAliveTime超時等待工作。
*/
private volatile boolean allowCoreThreadTimeOut;
/**
核心池大小是保留活動的最小工作數(並且不允許超時等),除非設定allowCoreThreadTimeOut,在這種情況下,最小值為零。
*/
private volatile int corePoolSize;
/**
最大池大小。
請注意,實際最大值在內部由CAPACITY限制。
*/
private volatile int maximumPoolSize;
/**
* 預設拒絕的執行處理程式。
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
/**
呼叫者關閉和關閉所需的許可權。
我們另外要求(參見checkShutdownAccess),呼叫者有權實際中斷工作集中的執行緒(由Thread.interrupt管理,Thread依賴於ThreadGroup.checkAccess,後者依賴於SecurityManager.checkAccess)。
只有通過這些檢查,才會嘗試關閉。
*
Thread.interrupt的所有實際呼叫(請參閱interruptIdleWorkers和interruptWorkers)忽略SecurityExceptions,
這意味著嘗試的中斷默默地失敗 在關閉的情況下,除非SecurityManager具有不一致的策略,
否則它們不應該失敗,有時允許訪問執行緒,有時不允許訪問執行緒。
在這種情況下,無法實際中斷執行緒可能會禁用或延遲完全終止。
interruptIdleWorkers的其他用途是建議,實際中斷的失敗只會延遲對配置更改的響應,因此不會被特別處理。
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/**
類工作者主要維護執行緒執行任務的中斷控制狀態,以及其他較小的簿記。
該類機會地擴充套件了AbstractQueuedSynchronizer,以簡化獲取和釋放圍繞每個任務執行的鎖。
這樣可以防止意圖喚醒等待任務的工作執行緒的中斷,從而中斷正在執行的任務。
我們實現一個簡單的非可重入互斥鎖而不是使用ReentrantLock,因為我們不希望工作任務在呼叫像setCorePoolSize這樣的池控制方法時重新獲取鎖。
另外,為了線上程實際開始執行任務之前抑制中斷,我們將鎖定狀態初始化為負值,並在啟動時(在runWorker中)清除它。
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
這個類永遠不會被序列化,但是我們提供了一個serialVersionUID來抑制javac警告。
*/
private static final long serialVersionUID = 6138294804551838833L;
/** 執行緒這個工作正在執行。如果工廠出現故障,則為空 */
final Thread thread;
/** 執行的初始任務。 可能為空 */
Runnable firstTask;
/** 每執行緒任務計數器 */
volatile long completedTasks;
/**
從ThreadFactory建立給定的第一個任務和執行緒。
引數:firstTask 第一個任務(如果沒有,則為null)
*/
Worker(Runnable firstTask) {
setState(-1); // 禁止中斷直到runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 將主執行迴圈委託給外部的runWorker。 */
public void run() {
runWorker(this);
}
//鎖定方法
//
//值0表示解鎖狀態。
//值為1表示鎖定狀態。
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
/*
* 設定控制狀態的方法
*/
/**
將runState轉換為給定目標,或者如果已經至少給定目標,則將其保留。
*
* 引數: targetState
所需的狀態,SHUTDOWN或STOP(但不是TIDYING或TERMINATED) - 使用tryTerminate)
*/
private void advanceRunState(int targetState) {
// assert targetState == SHUTDOWN || targetState == STOP;
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
/**
*如果(SHUTDOWN和池和佇列為空)或(STOP和池為空),則轉換到TERMINATED狀態。
* 如果否則可以終止,但是workerCount不為零,則會中斷一個空閒的工作人員以確保關閉訊號傳播。
* 必須在可能終止可能的任何操作之後呼叫此方法 - 在關閉期間減少工作人員計數或從佇列中刪除任務。
* 該方法是非私有的,允許從ScheduledThreadPoolExecutor訪問。
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else 重試失敗的CAS
}
}
/*
* 控制工作執行緒中斷的方法
*/
/**
如果有安全管理器,請確保呼叫者有權關閉執行緒(請參閱shutdownPerm)。
如果這樣通過,另外確保呼叫者被允許中斷每個工作執行緒。
即使第一次檢查通過,如果SecurityManager特別處理某些執行緒,這可能不是真的。
*/
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
/**
中斷所有執行緒,即使活動。 忽略SecurityExceptions(在這種情況下,一些執行緒可能保持不間斷)。
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
/**
中斷可能正在等待任務的執行緒(由未被鎖定指示),以便它們可以檢查終止或配置更改。
忽略SecurityExceptions(在這種情況下,一些執行緒可能保持不間斷)。
*
* 引數 onlyOne
如果是ture,最多中斷一個工人。 這僅在嘗試終止時才從tryTerminate呼叫,但是還有其他工作人員。
在這種情況下,大多數等待工作人員被中斷以傳播關閉訊號,以防所有執行緒正在等待。
中斷任意任意執行緒可確保從關機開始以來新到達的工作人員也將最終退出。
為了確保最終終止,只需要中斷只有一個空閒的工作人員就足夠了,但是shutdown()會中斷所有閒置的工作人員,以便冗餘的工作人員立即退出,而不是
等待一個分散的任務完成。
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
/**
通常形式的interruptIdleWorkers,以避免不必要記住布林引數的含義。
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
/*
其他實用程式,其中大部分也匯出到ScheduledThreadPoolExecutor
*/
/**
呼叫給定命令的被拒絕的執行處理程式。
包保護供ScheduledThreadPoolExecutor使用。
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
/**
執行關閉呼叫後的執行狀態轉換後進一步清理。
這裡沒有操作,但由ScheduledThreadPoolExecutor用於取消延遲的任務。
*/
void onShutdown() {
}
/**
*由ScheduledThreadPoolExecutor需要進行狀態檢查,以便在關機期間啟用執行任務。
*
* 引數shutdownOK true
如果SHUTDOWN應該返回true
*/
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
/**
將任務佇列排入新列表,通常使用drainTo。
但是,如果佇列是DelayQueue或任何其他型別的佇列,poll或drainTo可能無法刪除某些元素,則會逐個刪除它們。
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
/*
*工作人員建立,執行和清理的方法
*/
/**
檢查是否可以針對當前池狀態和給定繫結(核心或最大值)新增新的工作者。
如果是這樣,則相應地調整工作人員數量,並且如果可能,則建立並啟動新的工作人員,執行firstTask作為其第一個任務。
如果池停止或有資格關閉,此方法返回false。
如果執行緒工廠在詢問時無法建立執行緒,那麼它也返回false。 如果執行緒建立失敗,
由於執行緒工廠返回null或由於異常(通常在Thread.start()中通常為OutOfMemoryError),因此我們回滾乾淨。
*
* 引數 firstTask
新執行緒應該首先執行的任務(如果沒有,則為null)。
當有少於corePoolSize執行緒(在這種情況下我們總是啟動一個),或當佇列已滿(在這種情況下,我們必須繞過佇列)時,工作者將建立一個初始的第一個任務(方法execute())來繞過排隊, 。
最初空閒執行緒通常通過prestartCoreThread建立或替換其他垂死的工作人員。
*
* 引數 core
如果真的使用corePoolSize作為繫結,否則maximumPoolSize。
(這裡使用一個布林指示器,而不是一個值,以確保在檢查其他池狀態後讀取新值)。
* 返回 true 成功的時候
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 檢查佇列是否只在必要時為空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // 重讀 ctl
if (runStateOf(c) != rs)
continue retry;
// 其他CAS由於workerCount更改而失敗; 重試內迴圈
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新檢查,同時保持鎖定。
// ThreadFactory故障或如果在鎖獲取之前關閉。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 預先檢查t是否可以啟動
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
/**
回滾工作執行緒建立。 如果工人退出工作人員,如果工作人員退出工作人員,重新檢查終止,以防該工人的工作終止
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
/**
為垂死的工人執行清理和簿記。
僅從工作執行緒呼叫。
除非已完成設定,否則假設workerCount已經被調整以解除退出。
這個方法從工作集中刪除執行緒如果由於使用者任務異常退出,或者如果少於corePoolSize工作正在執行或佇列不為空,
但沒有工作人員,則可能會終止池或替換該工作程式。
*
* 引數 w
* the worker
* 引數completedAbruptly
* 如果工作人員因使用者異常而死亡
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果突然,那麼workerCount沒有被調整
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
/**
根據當前配置設定執行阻止或定時等待任務,或者如果此工作方必須退出,則返回null:
* 1. 有超過maximumPoolSize的工作人員(由於
呼叫setMaximumPoolSize)。
* 2. 執行緒池停止.
* 3. 池關閉,佇列為空
* 4. 該工作人員超時等待任務,
* 超時工作人員在定時等待之前和之後都會終止(即{@code allowCoreThreadTimeOut || workerCount> corePoolSize}),
* 如果佇列不為空, 這個工作人員不是池中的最後一個執行緒。
*
* 返回 任務,如果工作人員必須退出,則為null,在這種情況下,workerCount將被遞減
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 檢查佇列是否只在必要時為空。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//工人是否被淘汰?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
/**
主要工人執行迴圈。 重複從佇列中獲取任務並執行它們,同時應對若干問題:
1. 我們可以從初始任務開始,在這種情況下,我們不需要得到第一個任務。
否則,只要池正在執行,我們從getTask獲取任務。
如果它返回null,那麼由於更改池狀態或配置引數,該工作器將退出。
其他退出會導致外部程式碼中的異常丟擲,在這種情況下,已完成持有,通常會導致processWorkerExit替換此執行緒。
2.在執行任何任務之前,在執行任務時獲取鎖以防止其他池中斷,然後我們確保除非池停止,否則該執行緒不會設定其中斷。
3.每個任務執行之前都是呼叫beforeExecute,這可能會引發一個異常,在這種情況下,
我們會導致執行緒宕機(斷開迴圈,並且已完成),而不處理任務。
4. 假設beforeExecute正常完成,我們執行該任務,收集其丟擲的任何異常以傳送到afterExecute。
我們分別處理RuntimeException,Error(這兩個規範保證我們陷阱)和任意Throwables。
因為我們不能在Runnable.run內重新丟擲Throwables,
所以我們把它們包裹在錯誤的路上(到執行緒的UncaughtExceptionHandler)。
任何丟擲的異常也保守地導致執行緒死亡。
5. 在task.run完成後,我們呼叫afterExecute,這也可能會引發異常,這也會導致執行緒宕機。 根據JLS Sec 14.20,這個異常是即使task.run丟擲也將生效的例外。
異常機制的淨效果是afterExecute和執行緒的UncaughtExceptionHandler具有我們可以提供使用者程式碼遇到的任何問題的準確資訊。
*
* 引數 w
* the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允許中斷
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果池停止,請確保執行緒中斷; 如果沒有,確保執行緒不中斷。
// 這需要在第二種情況下重新檢查以在清除中斷時處理shutdownNow比賽
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// 公共建構函式和方法
/**
用給定的初始引數和預設的執行緒工廠及被拒絕的執行處理程式建立新的 ThreadPoolExecutor。使用 Executors 工廠方法之一比使用此通用構造方法方便得多。
引數:
corePoolSize - 池中所儲存的執行緒數,包括空閒執行緒。
maximumPoolSize - 池中允許的最大執行緒數。
keepAliveTime - 當執行緒數大於核心時,此為終止前多餘的空閒執行緒等待新任務的最長時間。
unit - keepAliveTime 引數的時間單位。
workQueue - 執行前用於保持任務的佇列。此佇列僅保持由 execute 方法提交的 Runnable 任務。
丟擲:
IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小於 0,或者 maximumPoolSize 小於等於 0,或者 corePoolSize 大於 maximumPoolSize。
NullPointerException - 如果 workQueue 為 null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
用給定的初始引數和預設被拒絕的執行處理程式建立新的 ThreadPoolExecutor。
引數:
corePoolSize - 池中所儲存的執行緒數,包括空閒執行緒。
maximumPoolSize - 池中允許的最大執行緒數。
keepAliveTime - 當執行緒數大於核心時,此為終止前多餘的空閒執行緒等待新任務的最長時間。
unit - keepAliveTime 引數的時間單位。
workQueue - 執行前用於保持任務的佇列。此佇列僅保持由 execute 方法提交的 Runnable 任務。
threadFactory - 執行程式建立新執行緒時使用的工廠。
丟擲:
IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小於