java.util.concurrent.AbstractExecutorService 原始碼
執行緒池相關
原始碼:
package java.util.concurrent; import java.security.AccessControlContext; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; } private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } private void decrementWorkerCount() { do { } while (!compareAndDecrementWorkerCount(ctl.get())); } private final BlockingQueue<Runnable> workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet<Worker> workers = new HashSet<Worker>(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); private final AccessControlContext acc; private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } private void advanceRunState(int targetState) { for (; ; ) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } final void tryTerminate() { for (; ; ) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } } } private void checkShutdownAccess() { SecurityManager security = System.getSecurityManager(); if (security != null) { security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) security.checkAccess(w.thread); } finally { mainLock.unlock(); } } } private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } private void interruptIdleWorkers() { interruptIdleWorkers(false); } private static final boolean ONLY_ONE = true; final void reject(Runnable command) { handler.rejectedExecution(command, this); } void onShutdown() { } final boolean isRunningOrShutdown(boolean shutdownOK) { int rs = runStateOf(ctl.get()); return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); } private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; } private boolean addWorker(Runnable firstTask, boolean core) { retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; for (; ; ) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; } private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } } private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && !workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } addWorker(null, false); } } private Runnable getTask() { boolean timedOut = false; for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); } public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } public boolean isShutdown() { return !isRunning(ctl.get()); } public boolean isTerminating() { int c = ctl.get(); return !isRunning(c) && runStateLessThan(c, TERMINATED); } public boolean isTerminated() { return runStateAtLeast(ctl.get(), TERMINATED); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (; ; ) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } } protected void finalize() { SecurityManager sm = System.getSecurityManager(); if (sm == null || acc == null) { shutdown(); } else { PrivilegedAction<Void> pa = () -> { shutdown(); return null; }; AccessController.doPrivileged(pa, acc); } } public void setThreadFactory(ThreadFactory threadFactory) { if (threadFactory == null) throw new NullPointerException(); this.threadFactory = threadFactory; } public ThreadFactory getThreadFactory() { return threadFactory; } public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { if (handler == null) throw new NullPointerException(); this.handler = handler; } public RejectedExecutionHandler getRejectedExecutionHandler() { return handler; } public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); int delta = corePoolSize - this.corePoolSize; this.corePoolSize = corePoolSize; if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers(); else if (delta > 0) { int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } } public int getCorePoolSize() { return corePoolSize; } public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; } public boolean allowsCoreThreadTimeOut() { return allowCoreThreadTimeOut; } public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); if (value != allowCoreThreadTimeOut) { allowCoreThreadTimeOut = value; if (value) interruptIdleWorkers(); } } public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); this.maximumPoolSize = maximumPoolSize; if (workerCountOf(ctl.get()) > maximumPoolSize) interruptIdleWorkers(); } public int getMaximumPoolSize() { return maximumPoolSize; } public void setKeepAliveTime(long time, TimeUnit unit) { if (time < 0) throw new IllegalArgumentException(); if (time == 0 && allowsCoreThreadTimeOut()) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); long keepAliveTime = unit.toNanos(time); long delta = keepAliveTime - this.keepAliveTime; this.keepAliveTime = keepAliveTime; if (delta < 0) interruptIdleWorkers(); } public long getKeepAliveTime(TimeUnit unit) { return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); } public BlockingQueue<Runnable> getQueue() { return workQueue; } public boolean remove(Runnable task) { boolean removed = workQueue.remove(task); tryTerminate(); return removed; } public void purge() { final BlockingQueue<Runnable> q = workQueue; try { Iterator<Runnable> it = q.iterator(); while (it.hasNext()) { Runnable r = it.next(); if (r instanceof Future<?> && ((Future<?>) r).isCancelled()) it.remove(); } } catch (ConcurrentModificationException fallThrough) { for (Object r : q.toArray()) if (r instanceof Future<?> && ((Future<?>) r).isCancelled()) q.remove(r); } tryTerminate(); } public int getPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size(); } finally { mainLock.unlock(); } } public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; for (Worker w : workers) if (w.isLocked()) ++n; return n; } finally { mainLock.unlock(); } } public int getLargestPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return largestPoolSize; } finally { mainLock.unlock(); } } public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) { n += w.completedTasks; if (w.isLocked()) ++n; } return n + workQueue.size(); } finally { mainLock.unlock(); } } public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) n += w.completedTasks; return n; } finally { mainLock.unlock(); } } public String toString() { long ncompleted; int nworkers, nactive; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { ncompleted = completedTaskCount; nactive = 0; nworkers = workers.size(); for (Worker w : workers) { ncompleted += w.completedTasks; if (w.isLocked()) ++nactive; } } finally { mainLock.unlock(); } int c = ctl.get(); String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" : (runStateAtLeast(c, TERMINATED) ? "Terminated" : "Shutting down")); return super.toString() + "[" + rs + ", pool size = " + nworkers + ", active threads = " + nactive + ", queued tasks = " + workQueue.size() + ", completed tasks = " + ncompleted + "]"; } protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { } public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } } }
類 ThreadPoolExecutor
所有已實現的介面:
直接已知子類:
一個 ExecutorService
Executors
工廠方法配置。
執行緒池可以解決兩個不同問題:由於減少了每個任務呼叫的開銷,它們通常可以在執行大量非同步任務時提供增強的效能,並且還可以提供繫結和管理資源(包括執行任務集時使用的執行緒)的方法。每個 ThreadPoolExecutor 還維護著一些基本的統計資料,如完成的任務數。
為了便於跨大量上下文使用,此類提供了很多可調整的引數和擴充套件鉤子 (hook)。但是,強烈建議程式設計師使用較為方便的
Executors
工廠方法 Executors.newCachedThreadPool()
(無界執行緒池,可以進行自動執行緒回收)、Executors.newFixedThreadPool(int)
(固定大小執行緒池)和 Executors.newSingleThreadExecutor()
(單個後臺執行緒),它們均為大多數使用場景預定義了設定。否則,在手動配置和調整此類時,使用以下指導:
核心和最大池大小
ThreadPoolExecutor 將根據 corePoolSize(參見 getCorePoolSize()
)和 maximumPoolSize(參見 getMaximumPoolSize()
)設定的邊界自動調整池大小。當新任務在方法 execute(java.lang.Runnable)
中提交時,如果執行的執行緒少於 corePoolSize,則建立新執行緒來處理請求,即使其他輔助執行緒是空閒的。如果執行的執行緒多於 corePoolSize 而少於 maximumPoolSize,則僅當佇列滿時才建立新執行緒。如果設定的 corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的執行緒池。如果將 maximumPoolSize 設定為基本的無界值(如 Integer.MAX_VALUE),則允許池適應任意數量的併發任務。在大多數情況下,核心和最大池大小僅基於構造來設定,不過也可以使用 setCorePoolSize(int)
和 setMaximumPoolSize(int)
進行動態更改。
按需構造
預設情況下,即使核心執行緒最初只是在新任務到達時才建立和啟動的,也可以使用方法 prestartCoreThread()
或 prestartAllCoreThreads()
對其進行動態重寫。如果構造帶有非空佇列的池,則可能希望預先啟動執行緒。
建立新執行緒
使用 ThreadFactory
建立新執行緒。如果沒有另外說明,則在同一個 ThreadGroup
中一律使用 Executors.defaultThreadFactory()
建立執行緒,並且這些執行緒具有相同的 NORM_PRIORITY 優先順序和非守護程序狀態。通過提供不同的 ThreadFactory,可以改變執行緒的名稱、執行緒組、優先順序、守護程序狀態,等等。如果從 newThread 返回 null 時 ThreadFactory 未能建立執行緒,則執行程式將繼續執行,但不能執行任何任務。
保持活動時間
如果池中當前有多於 corePoolSize 的執行緒,則這些多出的執行緒在空閒時間超過 keepAliveTime 時將會終止(參見 getKeepAliveTime(java.util.concurrent.TimeUnit)
)。這提供了當池處於非活動狀態時減少資源消耗的方法。如果池後來變得更為活動,則可以建立新的執行緒。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit)
動態地更改此引數。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS
的值在關閉前有效地從以前的終止狀態禁用空閒執行緒。預設情況下,保持活動策略只在有多於 corePoolSizeThreads 的執行緒時應用。但是隻要 keepAliveTime 值非 0, allowCoreThreadTimeOut(boolean)
方法也可將此超時策略應用於核心執行緒。
排隊
所有 BlockingQueue
都可用於傳輸和保持提交的任務。可以使用此佇列與池大小進行互動:
- 如果執行的執行緒少於 corePoolSize,則 Executor 始終首選新增新的執行緒,而不進行排隊。
- 如果執行的執行緒等於或多於 corePoolSize,則 Executor 始終首選將請求加入佇列,而不新增新的執行緒。
- 如果無法將請求加入佇列,則建立新的執行緒,除非建立此執行緒超出 maximumPoolSize,在這種情況下,任務將被拒絕。
排隊有三種通用策略:
- 直接提交。工作佇列的預設選項是
SynchronousQueue
,它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。 - 無界佇列。使用無界佇列(例如,不具有預定義容量的
LinkedBlockingQueue
)將導致在所有 corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web 頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。 - 有界佇列。當使用有限的 maximumPoolSizes 時,有界佇列(如
ArrayBlockingQueue
)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。
被拒絕的任務
當 Executor 已經關閉,並且 Executor 將有限邊界用於最大執行緒和工作佇列容量,且已經飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務將被拒絕。
在以上兩種情況下, execute 方法都將呼叫其RejectedExecutionHandler 的 rejectedExecution(Runnable,ThreadPoolExecutor) 方法。
下面提供了四種預定義的處理程式策略:
- 在預設的
ThreadPoolExecutor.AbortPolicy
中,處理程式遭到拒絕將丟擲執行時RejectedExecutionException
。 - 在
ThreadPoolExecutor.CallerRunsPolicy
中,執行緒呼叫執行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。 - 在
ThreadPoolExecutor.DiscardPolicy
中,不能執行的任務將被刪除。 - 在
ThreadPoolExecutor.DiscardOldestPolicy
中,如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)。
定義和使用其他種類的 RejectedExecutionHandler
類也是可能的,但這樣做需要非常小心,尤其是當策略僅用於特定容量或排隊策略時。
鉤子 (hook) 方法
此類提供 protected 可重寫的 beforeExecute(java.lang.Thread, java.lang.Runnable)
和 afterExecute(java.lang.Runnable, java.lang.Throwable)
方法,這兩種方法分別在執行每個任務之前和之後呼叫。它們可用於操縱執行環境;例如,重新初始化 ThreadLocal、蒐集統計資訊或新增日誌條目。此外,還可以重寫方法 terminated()
來執行 Executor 完全終止後需要完成的所有特殊處理。
如果鉤子 (hook) 或回撥方法丟擲異常,則內部輔助執行緒將依次失敗並突然終止。
佇列維護
方法 getQueue()
允許出於監控和除錯目的而訪問工作佇列。強烈反對出於其他任何目的而使用此方法。 remove(java.lang.Runnable)
和 purge()
這兩種方法可用於在取消大量已排隊任務時幫助進行儲存回收。
終止
程式中不再引用的池沒有剩餘執行緒會自動 shutdown。如果希望確保回收取消引用的池(即使使用者忘記呼叫 shutdown()
),則必須安排未使用的執行緒最終終止:設定適當保持活動時間,使用 0 核心執行緒的下邊界和/或設定 allowCoreThreadTimeOut(boolean)
。
擴充套件示例。此類的大多數擴充套件可以重寫一個或多個受保護的鉤子 (hook) 方法。例如,下面是一個添加了簡單的暫停/恢復功能的子類:
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch(InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
巢狀類摘要
static class |
ThreadPoolExecutor.AbortPolicy 用於被拒絕任務的處理程式,它將丟擲 RejectedExecutionException. |
static class |
ThreadPoolExecutor.CallerRunsPolicy 用於被拒絕任務的處理程式,它直接在 execute 方法的呼叫執行緒中執行被拒絕的任務;如果執行程式已關閉,則會丟棄該任務。 |
static class |
ThreadPoolExecutor.DiscardOldestPolicy 用於被拒絕任務的處理程式,它放棄最舊的未處理請求,然後重試 execute;如果執行程式已關閉,則會丟棄該任務。 |
static class |
ThreadPoolExecutor.DiscardPolicy 用於被拒絕任務的處理程式,預設情況下它將丟棄被拒絕的任務。 |
構造方法摘要
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 用給定的初始引數和預設的執行緒工廠及被拒絕的執行處理程式建立新的 ThreadPoolExecutor。 |
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 用給定的初始引數和預設的執行緒工廠建立新的 ThreadPoolExecutor。 |
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) 用給定的初始引數和預設被拒絕的執行處理程式建立新的 ThreadPoolExecutor。 |
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 用給定的初始引數建立新的 ThreadPoolExecutor。 |
方法摘要
protected void |
afterExecute(Runnable r, Throwable t) 基於完成執行給定 Runnable 所呼叫的方法。 |
void |
allowCoreThreadTimeOut(boolean value) 如果在保持活動時間內沒有任務到達,新任務到達時正在替換(如果需要),則設定控制核心執行緒是超時還是終止的策略。 |
boolean |
allowsCoreThreadTimeOut() 如果此池允許核心執行緒超時和終止,如果在 keepAlive 時間內沒有任務到達,新任務到達時正在替換(如果需要),則返回 true。 |
boolean |
awaitTermination(long timeout, TimeUnit unit) 請求關閉、發生超時或者當前執行緒中斷,無論哪一個首先發生之後,都將導致阻塞,直到所有任務完成執行。 |
protected void |
beforeExecute(Thread t, Runnable r) 在執行給定執行緒中的給定 Runnable 之前呼叫的方法。 |
void |
execute(Runnable command) 在將來某個時間執行給定任務。 |
protected void |
finalize() 當不再引用此執行程式時,呼叫 shutdown。 |
int |
getActiveCount() 返回主動執行任務的近似執行緒數。 |
long |
getCompletedTaskCount() 返回已完成執行的近似任務總數。 |
int |
getCorePoolSize() 返回核心執行緒數。 |
long |
getKeepAliveTime(TimeUnit unit) 返回執行緒保持活動的時間,該時間就是超過核心池大小的執行緒可以在終止前保持空閒的時間值。 |
int |
getLargestPoolSize() 返回曾經同時位於池中的最大執行緒數。 |
int |
getMaximumPoolSize() 返回允許的最大執行緒數。 |
int |
getPoolSize() 返回池中的當前執行緒數。 |
BlockingQueue<Runnable> |
getQueue() 返回此執行程式使用的任務佇列。 |
RejectedExecutionHandler |
getRejectedExecutionHandler() 返回用於未執行任務的當前處理程式。 |
long |
getTaskCount() 返回曾計劃執行的近似任務總數。 |
ThreadFactory |
getThreadFactory() 返回用於建立新執行緒的執行緒工廠。 |
boolean |
isShutdown() 如果此執行程式已關閉,則返回 true。 |
boolean |
isTerminated() &n |