Settle Down and Set Off Again: An Analysis of Thread Pools in Java
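1. Preface
In a multithreaded environment, if each thread runs only briefly but a very large number of threads are started, most of the running time is wasted on creating and destroying threads. Is there a way to let a thread be reused after it finishes its own task? The thread pool exists to solve exactly this problem. By now we know of many kinds of pools, such as the IP address pools used in NAT, and caching works on the same principle: reuse resources that have already been created so that fewer new ones are needed, which relieves pressure on memory and the CPU or speeds up execution.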
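2. Basic Understanding of Thread Pools
2.1 The Concept of a Thread Pool
Asynchronous execution on multiple threads can make full use of a multi-core machine, but without control it becomes a burden on the system. Each thread occupies memory, so a large number of threads consumes memory and may lead to an OutOfMemoryError. Even when that does not happen, reclaiming many threads puts heavy pressure on the GC. A thread pool avoids creating threads over and over by reusing them. Put simply, when work arrives it takes a thread from the pool, and when the work is done the thread is not shut down but returned to the pool for other tasks to use.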
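The reuse described above can be observed directly. The following is a minimal sketch, not taken from the original article: the class name ThreadReuseDemo and the method distinctWorkers are hypothetical, and the pool is built with the standard Executors.newFixedThreadPool factory. It runs several short tasks on a small pool and counts how many distinct worker threads actually executed them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: counts the worker threads that served a batch of tasks.
class ThreadReuseDemo {

    // Runs taskCount short tasks on a fixed pool of poolSize threads and
    // returns the number of distinct threads that executed at least one task.
    static int distinctWorkers(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // Each task only records the name of the thread it runs on.
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // no new tasks; already queued tasks still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names.size();
    }

    public static void main(String[] args) {
        // Six tasks, but only two threads are ever created.
        System.out.println("distinct worker threads: " + distinctWorkers(2, 6));
    }
}
```

With a pool of two threads and six tasks, the count is two: the tasks outnumber the threads, so each thread is handed further tasks after finishing its first one instead of being destroyed.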
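Executor is the top-level interface. It declares a single method, execute(Runnable), which returns void and takes a Runnable; as the name suggests, it runs the task passed in.
The ExecutorService interface extends Executor and declares further methods such as submit, invokeAll, invokeAny, and shutdown.
The abstract class AbstractExecutorService implements ExecutorService and provides default implementations of the task-submission methods (submit, invokeAll, and invokeAny); execute and the lifecycle methods such as shutdown are left to subclasses.
Finally, ThreadPoolExecutor extends AbstractExecutorService.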
ThreadPoolExecutor has several very important methods:
execute()
submit()
shutdown()
shutdownNow()
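execute() is declared in Executor and implemented in ThreadPoolExecutor. It is the core method of ThreadPoolExecutor: it hands a task to the thread pool for execution.
submit() is declared in ExecutorService and already implemented in AbstractExecutorService; ThreadPoolExecutor does not override it. It also submits a task to the pool, but unlike execute() it returns a Future through which the task's result can be obtained. Its implementation shows that it wraps the task in a RunnableFuture (a FutureTask by default) and then still calls execute().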
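To make the difference between the two submission methods concrete, here is a small sketch under stated assumptions: the class SubmitVsExecuteDemo and its method sumViaSubmit are hypothetical names, and the pool parameters are arbitrary. It passes a Runnable to execute(), which returns nothing, and a Callable to submit(), which returns a Future holding the result.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class contrasting execute() and submit().
class SubmitVsExecuteDemo {

    // Computes a + b on a pool thread and returns the result obtained via Future.
    static int sumViaSubmit(int a, int b) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(4));
        try {
            // execute(): fire-and-forget, the return type is void.
            pool.execute(() -> System.out.println("adding " + a + " and " + b));

            // submit(): the task is wrapped in a Future that carries the result.
            Callable<Integer> task = () -> a + b;
            Future<Integer> future = pool.submit(task);

            // get() blocks until the task has finished.
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // The task itself threw; the original exception is the cause.
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("result: " + sumViaSubmit(2, 3));
    }
}
```

An exception thrown inside a submitted task does not surface on the worker thread; it is stored in the Future and rethrown from get() as an ExecutionException, which is why the sketch unwraps the cause.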
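shutdown() and shutdownNow() shut the thread pool down: shutdown() stops accepting new tasks but lets queued tasks finish, whereas shutdownNow() also skips the queued tasks and interrupts the ones that are running.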
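The two shutdown methods behave quite differently toward tasks that are still waiting. The sketch below is illustrative only; ShutdownDemo and its helper methods are hypothetical names. It uses a single-thread pool so that every task after the first one has to wait in the queue.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class comparing shutdown() and shutdownNow().
class ShutdownDemo {

    private static ThreadPoolExecutor singleThreadPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    private static void awaitQuietly(ThreadPoolExecutor pool) {
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // shutdown(): tasks that were already submitted still run to completion.
    static int completedAfterShutdown(int taskCount) {
        ThreadPoolExecutor pool = singleThreadPool();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> done.incrementAndGet());
        }
        pool.shutdown();
        awaitQuietly(pool);
        return done.get();
    }

    // shutdownNow(): the running task is interrupted, queued tasks are handed back.
    static int pendingAfterShutdownNow(int queuedCount) {
        ThreadPoolExecutor pool = singleThreadPool();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                never.await(); // blocks until shutdownNow() interrupts this thread
            } catch (InterruptedException e) {
                // interrupted by shutdownNow(); simply let the task end
            }
        });
        for (int i = 0; i < queuedCount; i++) {
            pool.execute(() -> { }); // waits in the queue behind the blocked task
        }
        try {
            started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<Runnable> pending = pool.shutdownNow();
        awaitQuietly(pool);
        return pending.size();
    }

    public static void main(String[] args) {
        System.out.println("completed after shutdown(): " + completedAfterShutdown(4));
        System.out.println("pending after shutdownNow(): " + pendingAfterShutdownNow(3));
    }
}
```

Neither method waits for the pool to stop; awaitTermination() is what blocks the caller until the pool has actually terminated.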
There are many other methods as well, such as getQueue(), getPoolSize(), getActiveCount(), and getCompletedTaskCount(), which expose properties of the thread pool.
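These getters are mostly useful for monitoring. As a rough sketch (PoolStatsDemo, describe, and snapshots are hypothetical names chosen for illustration), the following takes one reading while all workers are deliberately blocked and another after the pool has terminated.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class reading the monitoring getters of a pool.
class PoolStatsDemo {

    static String describe(ThreadPoolExecutor pool) {
        return "pool=" + pool.getPoolSize()
                + " active=" + pool.getActiveCount()
                + " queued=" + pool.getQueue().size()
                + " completed=" + pool.getCompletedTaskCount();
    }

    // Returns two readings: one while both workers are blocked, one after termination.
    static String[] snapshots() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await(); // hold the worker until the first reading is taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            started.await();               // both workers are now inside a task
            String busy = describe(pool);  // 2 running, 3 still queued
            release.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
            return new String[] { busy, describe(pool) };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String line : snapshots()) {
            System.out.println(line);
        }
    }
}
```

The readings are exact here only because the tasks are held still while they are taken. On a live pool the values are snapshots that may already be stale by the time they are printed.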
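2.2 Source Code Analysis of the Thread Pool
java.util.concurrent.ThreadPoolExecutor is the core class of the thread pool framework, so a thorough understanding of thread pools in Java has to start with this class.
Let's look at an example: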
package com.threadpool.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
    public static void main(String[] args) {
        // corePoolSize = 5, maximumPoolSize = 10, keepAliveTime = 200 ms,
        // bounded work queue with capacity 5
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));

        for (int i = 0; i < 15; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            // Report the pool state after every submission
            System.out.println("Threads in pool: " + executor.getPoolSize()
                    + ", tasks waiting in queue: " + executor.getQueue().size()
                    + ", completed tasks: " + executor.getCompletedTaskCount());
        }
        // Stop accepting new tasks; tasks already submitted still run
        executor.shutdown();
    }
}


class MyTask implements Runnable {
    private final int taskNum;

    public MyTask(int num) {
        this.taskNum = num;
    }

    @Override
    public void run() {
        System.out.println("Running task " + taskNum);
        try {
            // sleep() is static, so call it on Thread directly
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task " + taskNum + " finished");
    }
}
Output:
Threads in pool: 1, tasks waiting in queue: 0, completed tasks: 0
Threads in pool: 2, tasks waiting in queue: 0, completed tasks: 0
Threads in pool: 3, tasks waiting in queue: 0, completed tasks: 0
Threads in pool: 4, tasks waiting in queue: 0, completed tasks: 0
Threads in pool: 5, tasks waiting in queue: 0, completed tasks: 0
Running task 4
Running task 3
Running task 2
Running task 1
Threads in pool: 5, tasks waiting in queue: 1, completed tasks: 0
Threads in pool: 5, tasks waiting in queue: 2, completed tasks: 0
Threads in pool: 5, tasks waiting in queue: 3, completed tasks: 0
Threads in pool: 5, tasks waiting in queue: 4, completed tasks: 0
Threads in pool: 5, tasks waiting in queue: 5, completed tasks: 0
Threads in pool: 6, tasks waiting in queue: 5, completed tasks: 0
Threads in pool: 7, tasks waiting in queue: 5, completed tasks: 0
Threads in pool: 8, tasks waiting in queue: 5, completed tasks: 0
Threads in pool: 9, tasks waiting in queue: 5, completed tasks: 0
Threads in pool: 10, tasks waiting in queue: 5, completed tasks: 0
Running task 0
Running task 10
Running task 11
Running task 12
Running task 13
Running task 14
task 2 finished
task 3 finished
Running task 5
Running task 6
task 4 finished
Running task 7
task 1 finished
Running task 8
task 0 finished
Running task 9
task 11 finished
task 10 finished
task 14 finished
task 13 finished
task 12 finished
task 6 finished
task 5 finished
task 8 finished
task 7 finished
task 9 finished
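The output shows the order in which the pool absorbs work: the first five tasks each start a core thread, the next five wait in the queue, and only once the queue is full does the pool grow toward its maximum of ten threads. With these settings a sixteenth task would find no room at all. The sketch below illustrates that limit; SaturationDemo and firstRejectedIndex are hypothetical names, and the tasks block on a latch instead of sleeping so that nothing completes while tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: finds the first submission a saturated pool rejects.
class SaturationDemo {

    // Submits blocking tasks one by one and returns the zero-based index of the
    // first task that is rejected, or -1 if every task was accepted.
    static int firstRejectedIndex(int core, int max, int queueCapacity, int submissions) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejectedAt = -1;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until released
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Default AbortPolicy: all threads busy and the queue is full.
                    rejectedAt = i;
                    break;
                }
            }
        } finally {
            release.countDown(); // let all accepted tasks finish
            pool.shutdown();
        }
        return rejectedAt;
    }

    public static void main(String[] args) {
        // Same sizing as the example above: 10 threads + 5 queue slots = 15 tasks.
        System.out.println("first rejected index: " + firstRejectedIndex(5, 10, 5, 16));
    }
}
```

The capacity of such a pool is maximumPoolSize plus the queue capacity, here 10 + 5 = 15, so the task at index 15 (the sixteenth) is the first to be rejected. The example in the article submits exactly 15 tasks, which is why it never hits this limit.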
Now let's look at ThreadPoolExecutor:
/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.*;

/**
 * An {@link ExecutorService} that executes each submitted task using
 * one of possibly several pooled threads, normally configured
 * using {@link Executors} factory methods.
 *
 * <p>Thread pools address two different problems: they usually
 * provide improved performance when executing large numbers of
 * asynchronous tasks, due to reduced per-task invocation overhead,
 * and they provide a means of bounding and managing the resources,
 * including threads, consumed when executing a collection of tasks.
 * Each {@code ThreadPoolExecutor} also maintains some basic
 * statistics, such as the number of completed tasks.
 *
 * <p>To be useful across a wide range of contexts, this class
 * provides many adjustable parameters and extensibility
 * hooks. However, programmers are urged to use the more convenient
 * {@link Executors} factory methods {@link
 * Executors#newCachedThreadPool} (unbounded thread pool, with
 * automatic thread reclamation), {@link Executors#newFixedThreadPool}
 * (fixed size thread pool) and {@link
 * Executors#newSingleThreadExecutor} (single background thread), that
 * preconfigure settings for the most common usage
 * scenarios.
 * Otherwise, use the following guide when manually
 * configuring and tuning this class:
 *
 * <dl>
 *
 * <dt>Core and maximum pool sizes</dt>
 *
 * <dd>A {@code ThreadPoolExecutor} will automatically adjust the
 * pool size (see {@link #getPoolSize})
 * according to the bounds set by
 * corePoolSize (see {@link #getCorePoolSize}) and
 * maximumPoolSize (see {@link #getMaximumPoolSize}).
 *
 * When a new task is submitted in method {@link #execute(Runnable)},
 * and fewer than corePoolSize threads are running, a new thread is
 * created to handle the request, even if other worker threads are
 * idle. If there are more than corePoolSize but less than
 * maximumPoolSize threads running, a new thread will be created only
 * if the queue is full. By setting corePoolSize and maximumPoolSize
 * the same, you create a fixed-size thread pool. By setting
 * maximumPoolSize to an essentially unbounded value such as {@code
 * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
 * number of concurrent tasks. Most typically, core and maximum pool
 * sizes are set only upon construction, but they may also be changed
 * dynamically using {@link #setCorePoolSize} and {@link
 * #setMaximumPoolSize}. </dd>
 *
 * <dt>On-demand construction</dt>
 *
 * <dd>By default, even core threads are initially created and
 * started only when new tasks arrive, but this can be overridden
 * dynamically using method {@link #prestartCoreThread} or {@link
 * #prestartAllCoreThreads}. You probably want to prestart threads if
 * you construct the pool with a non-empty queue. </dd>
 *
 * <dt>Creating new threads</dt>
 *
 * <dd>New threads are created using a {@link ThreadFactory}. If not
 * otherwise specified, a {@link Executors#defaultThreadFactory} is
 * used, that creates threads to all be in the same {@link
 * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
 * non-daemon status.
 * By supplying a different ThreadFactory, you can
 * alter the thread's name, thread group, priority, daemon status,
 * etc. If a {@code ThreadFactory} fails to create a thread when asked
 * by returning null from {@code newThread}, the executor will
 * continue, but might not be able to execute any tasks. Threads
 * should possess the "modifyThread" {@code RuntimePermission}. If
 * worker threads or other threads using the pool do not possess this
 * permission, service may be degraded: configuration changes may not
 * take effect in a timely manner, and a shutdown pool may remain in a
 * state in which termination is possible but not completed.</dd>
 *
 * <dt>Keep-alive times</dt>
 *
 * <dd>If the pool currently has more than corePoolSize threads,
 * excess threads will be terminated if they have been idle for more
 * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
 * This provides a means of reducing resource consumption when the
 * pool is not being actively used. If the pool becomes more active
 * later, new threads will be constructed. This parameter can also be
 * changed dynamically using method {@link #setKeepAliveTime(long,
 * TimeUnit)}. Using a value of {@code Long.MAX_VALUE} {@link
 * TimeUnit#NANOSECONDS} effectively disables idle threads from ever
 * terminating prior to shut down. By default, the keep-alive policy
 * applies only when there are more than corePoolSize threads. But
 * method {@link #allowCoreThreadTimeOut(boolean)} can be used to
 * apply this time-out policy to core threads as well, so long as the
 * keepAliveTime value is non-zero. </dd>
 *
 * <dt>Queuing</dt>
 *
 * <dd>Any {@link BlockingQueue} may be used to transfer and hold
 * submitted tasks.
 * The use of this queue interacts with pool sizing:
 *
 * <ul>
 *
 * <li> If fewer than corePoolSize threads are running, the Executor
 * always prefers adding a new thread
 * rather than queuing.</li>
 *
 * <li> If corePoolSize or more threads are running, the Executor
 * always prefers queuing a request rather than adding a new
 * thread.</li>
 *
 * <li> If a request cannot be queued, a new thread is created unless
 * this would exceed maximumPoolSize, in which case, the task will be
 * rejected.</li>
 *
 * </ul>
 *
 * There are three general strategies for queuing:
 * <ol>
 *
 * <li> <em> Direct handoffs.</em> A good default choice for a work
 * queue is a {@link SynchronousQueue} that hands off tasks to threads
 * without otherwise holding them. Here, an attempt to queue a task
 * will fail if no threads are immediately available to run it, so a
 * new thread will be constructed. This policy avoids lockups when
 * handling sets of requests that might have internal dependencies.
 * Direct handoffs generally require unbounded maximumPoolSizes to
 * avoid rejection of new submitted tasks. This in turn admits the
 * possibility of unbounded thread growth when commands continue to
 * arrive on average faster than they can be processed. </li>
 *
 * <li><em> Unbounded queues.</em> Using an unbounded queue (for
 * example a {@link LinkedBlockingQueue} without a predefined
 * capacity) will cause new tasks to wait in the queue when all
 * corePoolSize threads are busy. Thus, no more than corePoolSize
 * threads will ever be created. (And the value of the maximumPoolSize
 * therefore doesn't have any effect.) This may be appropriate when
 * each task is completely independent of others, so tasks cannot
 * affect each others execution; for example, in a web page server.
 * While this style of queuing can be useful in smoothing out
 * transient bursts of requests, it admits the possibility of
 * unbounded work queue growth when commands continue to arrive on
 * average faster than they can be processed. </li>
 *
 * <li><em>Bounded queues.</em> A bounded queue (for example, an
 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
 * used with finite maximumPoolSizes, but can be more difficult to
 * tune and control. Queue sizes and maximum pool sizes may be traded
 * off for each other: Using large queues and small pools minimizes
 * CPU usage, OS resources, and context-switching overhead, but can
 * lead to artificially low throughput. If tasks frequently block (for
 * example if they are I/O bound), a system may be able to schedule
 * time for more threads than you otherwise allow. Use of small queues
 * generally requires larger pool sizes, which keeps CPUs busier but
 * may encounter unacceptable scheduling overhead, which also
 * decreases throughput. </li>
 *
 * </ol>
 *
 * </dd>
 *
 * <dt>Rejected tasks</dt>
 *
 * <dd>New tasks submitted in method {@link #execute(Runnable)} will be
 * <em>rejected</em> when the Executor has been shut down, and also when
 * the Executor uses finite bounds for both maximum threads and work queue
 * capacity, and is saturated. In either case, the {@code execute} method
 * invokes the {@link
 * RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
 * method of its {@link RejectedExecutionHandler}. Four predefined handler
 * policies are provided:
 *
 * <ol>
 *
 * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
 * handler throws a runtime {@link RejectedExecutionException} upon
 * rejection. </li>
 *
 * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
 * that invokes {@code execute} itself runs the task.
 * This provides a
 * simple feedback control mechanism that will slow down the rate that
 * new tasks are submitted. </li>
 *
 * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
 * cannot be executed is simply dropped. </li>
 *
 * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
 * executor is not shut down, the task at the head of the work queue
 * is dropped, and then execution is retried (which can fail again,
 * causing this to be repeated.) </li>
 *
 * </ol>
 *
 * It is possible to define and use other kinds of {@link
 * RejectedExecutionHandler} classes. Doing so requires some care
 * especially when policies are designed to work only under particular
 * capacity or queuing policies. </dd>
 *
 * <dt>Hook methods</dt>
 *
 * <dd>This class provides {@code protected} overridable
 * {@link #beforeExecute(Thread, Runnable)} and
 * {@link #afterExecute(Runnable, Throwable)} methods that are called
 * before and after execution of each task. These can be used to
 * manipulate the execution environment; for example, reinitializing
 * ThreadLocals, gathering statistics, or adding log entries.
 * Additionally, method {@link #terminated} can be overridden to perform
 * any special processing that needs to be done once the Executor has
 * fully terminated.
 *
 * <p>If hook or callback methods throw exceptions, internal worker
 * threads may in turn fail and abruptly terminate.</dd>
 *
 * <dt>Queue maintenance</dt>
 *
 * <dd>Method {@link #getQueue()} allows access to the work queue
 * for purposes of monitoring and debugging. Use of this method for
 * any other purpose is strongly discouraged.
 * Two supplied methods,
 * {@link #remove(Runnable)} and {@link #purge} are available to
 * assist in storage reclamation when large numbers of queued tasks
 * become cancelled.</dd>
 *
 * <dt>Finalization</dt>
 *
 * <dd>A pool that is no longer referenced in a program <em>AND</em>
 * has no remaining threads will be {@code shutdown} automatically. If
 * you would like to ensure that unreferenced pools are reclaimed even
 * if users forget to call {@link #shutdown}, then you must arrange
 * that unused threads eventually die, by setting appropriate
 * keep-alive times, using a lower bound of zero core threads and/or
 * setting {@link #allowCoreThreadTimeOut(boolean)}. </dd>
 *
 * </dl>
 *
 * <p><b>Extension example</b>. Most extensions of this class
 * override one or more of the protected hook methods. For example,
 * here is a subclass that adds a simple pause/resume feature:
 *
 * <pre> {@code
 * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 *   private boolean isPaused;
 *   private ReentrantLock pauseLock = new ReentrantLock();
 *   private Condition unpaused = pauseLock.newCondition();
 *
 *   public PausableThreadPoolExecutor(...)
 *   { super(...); }
 *
 *   protected void beforeExecute(Thread t, Runnable r) {
 *     super.beforeExecute(t, r);
 *     pauseLock.lock();
 *     try {
 *       while (isPaused) unpaused.await();
 *     } catch (InterruptedException ie) {
 *       t.interrupt();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void pause() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = true;
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void resume() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = false;
 *       unpaused.signalAll();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState, indicating whether running, shutting down etc
     *
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop. The value may be
     * transiently different from the actual number of live threads,
     * for example when a ThreadFactory fails to create a thread when
     * asked, and when exiting threads are still performing
     * bookkeeping before terminating. The user-visible pool size is
     * reported as the current size of the workers set.
     *
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Attempts to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempts to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (!
                     compareAndDecrementWorkerCount(ctl.get()));
    }

    /**
     * The queue used for holding tasks and handing off to worker
     * threads. We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING). This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads.
     * Accessed only under mainLock.
     */
    private long completedTaskCount;

    /*
     * All user control parameters are declared as volatiles so that
     * ongoing actions are based on freshest values, but without need
     * for locking, since no internal invariants depend on them
     * changing synchronously with respect to other actions.
     */

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker). All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads. Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue.
     *
     * We go further and preserve pool invariants even in the face of
     * errors such as OutOfMemoryError, that might be thrown while
     * trying to create threads. Such errors are rather common due to
     * the need to allocate a native stack in Thread.start, and users
     * will want to perform clean pool shutdown to clean up. There
     * will likely be enough memory available for the cleanup code to
     * complete without encountering yet another OutOfMemoryError.
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    /**
     * Permission required for callers of shutdown and shutdownNow.
     * We additionally require (see checkShutdownAccess) that callers
     * have permission to actually interrupt threads in the worker set
     * (as governed by Thread.interrupt, which relies on
     * ThreadGroup.checkAccess, which in turn relies on
     * SecurityManager.checkAccess). Shutdowns are attempted only if
     * these checks pass.
     *
     * All actual invocations of Thread.interrupt (see
     * interruptIdleWorkers and interruptWorkers) ignore
     * SecurityExceptions, meaning that the attempted interrupts
     * silently fail. In the case of shutdown, they should not fail
     * unless the SecurityManager has inconsistent policies, sometimes
     * allowing access to a thread and sometimes not. In such cases,
     * failure to actually interrupt threads may disable or delay full
     * termination. Other uses of interruptIdleWorkers are advisory,
     * and failure to actually interrupt will merely delay response to
     * configuration changes so is not handled exceptionally.
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution. This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run. We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize. Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

    /*
     * Methods for setting control state
     */

    /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
     *
     * @param targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty). If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))