線程池:對ThreadPoolExecutor的理解和源碼探索
對線程池的理解
在沒有引入線程池之前,如果去創建多線程,就會出現這幾種情況:第一,創建現場本身就占用CPU資源,給CPU帶來壓力;第二,線程本身也要占用內存空間,大量的線程會占用內存資源並且可能會導致Out of Memory。第三,線程調用結束後,大量的線程回收也會給GC帶來很大的壓力。第四,頻繁的創建和銷毀線程會降低系統的效率。
這個時候,線程池就應運而生,為了避免重復的創建線程,線程池的出現可以讓線程進行復用。
ThreadPoolExecutor的核心字段
private final BlockingQueue<Runnable> workQueue:一個阻塞隊列,用來存儲等待執行的任務,當線程池中的線程數超過它的corePoolSize的時候,線程會進入阻塞隊列進行阻塞等待。通過workQueue,線程池實現了阻塞功能。
private final HashSet<Worker> workers:用來存儲在線程池中的所有Worker的集合。
private int largestPoolSize:線程池最大可實現的線程數量。
private volatile RejectedExecutionHandler handler:表示當拒絕處理任務時的策略,saturated 或者 shutdown被執行時被調用。
private volatile long keepAliveTime:表示線程沒有任務時最多保持多久然後停止。默認情況下,只有線程池中線程數大於corePoolSize 時,keepAliveTime 才會起作用。換句話說,當線程池中的線程數大於corePoolSize,並且一個線程空閑時間達到了keepAliveTime,那麽就是shutdown。
private volatile int corePoolSize:線程池核心線程數量。可以理解為允許線程池中允許同時運行的最大線程數。
private volatile int maximumPoolSize:線程池最大可容線程數量。
private final ReentrantLock mainLock:線程池可重入鎖。
private static final RejectedExecutionHandler defaultHandler:線程池拒絕策略。
線程池的狀態以及Bit值
private static final int RUNNING = -1 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
最核心的execute()方法
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn‘t, by returning false. *
如果線程池中的線程數量小於corePollSize,則線程池會嘗試去創建一個新的線程,並把這個command作為第一個任務來執行。
在調用addWorker方法時,會對runState和workerCount做原子性檢查。如果檢查失敗,addWorker方法會阻止添加線程,並返回false。
* 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. *
如果一個任務成功進入阻塞隊列,那麽我們需要進行一個雙重檢查來確保是我們已經添加一個線程(因為存在著一些線程在上次檢查後他已經死亡)
或者當我們進入該方法時,該線程池已經關閉。
所以,我們將重新檢查狀態,線程池關閉的情況下則回滾入隊列,線程池沒有線程的情況則創建一個新的線程。
* 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task.
如果任務無法入隊列(隊列滿了),那麽我們將嘗試新開啟一個線程(從corepoolsize到擴充到maximum),如果失敗了,那麽可以確定原因,要麽是
線程池關閉了或者飽和了(達到maximum),所以我們執行拒絕策略
*/
//AtomicInteger ctl,為原子操作類,在多線程下實現安全的自增自劍,排除了多線程下的可見性和指令重排序問題。 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true))
//執行成功,則返回 return; c = ctl.get(); }
//檢查線程池狀態,如果是運行狀態,則進入阻塞隊列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
//進行到這裏有兩種情況,1,線程池狀態不是RUNNING,2,線程池workerCount >= corePoolSize並且workQueue已滿,需要擴容corePoolSize到maximumPoolSize else if (!addWorker(command, false)) reject(command); }
excute()方法執行的邏輯入下圖
現在看看addWorker方法的具體實現
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread#start), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { retry://外層循環,判斷線程池狀態 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //內層循環,主要是對worker數量加一 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //主要是把Runnable封裝到Worker中,並添加到WorkerSet集合中。 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) {
//啟動線程 t.start(); workerStarted = true; } } } finally {
//添加失敗處理 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
線程池:對ThreadPoolExecutor的理解和源碼探索