第十二章、執行緒池原理
一、為什麼要使用執行緒池
使用執行緒池主要有以下三個原因:
-
建立/銷燬執行緒需要消耗系統資源,執行緒池可以複用已建立的執行緒。
-
控制併發的數量。併發數量過多,可能會導致資源消耗過多,從而造成伺服器崩潰。(主要原因)
-
可以對執行緒做統一管理。
-
二、執行緒池的原理
Java中的執行緒池頂層介面是Executor
介面,ThreadPoolExecutor
是這個介面的實現類。
2.1、ThreadPoolExecutor提供的構造方法
一共有四個構造方法:
// 五個引數的建構函式 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) // 六個引數的建構函式-1 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) // 六個引數的建構函式-2 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) // 七個引數的建構函式 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
涉及到5~7個引數,我們先看看必須的5個引數是什麼意思:
-
int corePoolSize:該執行緒池中核心執行緒數最大值
核心執行緒:執行緒池中有兩類執行緒,核心執行緒和非核心執行緒。核心執行緒預設情況下會一直存在於執行緒池中,即使這個核心執行緒什麼都不幹(鐵飯碗),而非核心執行緒如果長時間的閒置,就會被銷燬(臨時工)。
-
int maximumPoolSize:該執行緒池中執行緒總數最大值 。
該值等於核心執行緒數量 + 非核心執行緒數量。
-
long keepAliveTime:非核心執行緒閒置超時時長。
非核心執行緒如果處於閒置狀態超過該值,就會被銷燬。如果設定allowCoreThreadTimeOut(true),則會也作用於核心執行緒。
-
TimeUnit unit:keepAliveTime的單位。
TimeUnit是一個列舉型別 ,包括以下屬性:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 = 1毫秒 / 1000 MILLISECONDS : 1毫秒 = 1秒 /1000 SECONDS : 秒 MINUTES : 分 HOURS : 小時 DAYS : 天
-
BlockingQueue workQueue:阻塞佇列,維護著等待執行的Runnable任務物件。
常用的幾個阻塞佇列:
-
LinkedBlockingQueue
鏈式阻塞佇列,底層資料結構是連結串列,預設大小是
Integer.MAX_VALUE
,也可以指定大小。 -
ArrayBlockingQueue
陣列阻塞佇列,底層資料結構是陣列,需要指定佇列的大小。
-
SynchronousQueue
同步佇列,內部容量為0,每個put操作必須等待一個take操作,反之亦然。
-
DelayQueue
延遲佇列,該佇列中的元素只有當其指定的延遲時間到了,才能夠從佇列中獲取到該元素 。
-
好了,介紹完5個必須的引數之後,還有兩個非必須的引數。
-
ThreadFactory threadFactory
建立執行緒的工廠 ,用於批量建立執行緒,統一在建立執行緒時設定一些引數,如是否守護執行緒、執行緒的優先順序等。如果不指定,會新建一個預設的執行緒工廠。
static class DefaultThreadFactory implements ThreadFactory { // 省略屬性 // 建構函式 DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } // 省略 }
-
RejectedExecutionHandler handler
拒絕處理策略,執行緒數量大於最大執行緒數就會採用拒絕處理策略,四種拒絕處理的策略為 :
-
ThreadPoolExecutor.AbortPolicy:預設拒絕處理策略,丟棄任務並丟擲RejectedExecutionException異常。
-
ThreadPoolExecutor.DiscardPolicy:丟棄新來的任務,但是不丟擲異常。
-
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列頭部(最舊的)的任務,然後重新嘗試執行程式(如果再次失敗,重複此過程)。
-
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務。
-
2.2、ThreadPoolExecutor的策略
執行緒池本身有一個排程執行緒,這個執行緒就是用於管理布控整個執行緒池裡的各種任務和事務,例如建立執行緒、銷燬執行緒、任務佇列管理、執行緒佇列管理等等。
故執行緒池也有自己的狀態。ThreadPoolExecutor
類中定義了一個volatile int
變數runState來表示執行緒池的狀態 ,分別為RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。
-
-
執行緒池建立後處於RUNNING狀態。
-
呼叫shutdown()方法後處於SHUTDOWN狀態,執行緒池不能接受新的任務,清除一些空閒worker,會等待阻塞佇列的任務完成。
-
呼叫shutdownNow()方法後處於STOP狀態,執行緒池不能接受新的任務,中斷所有執行緒,阻塞佇列中沒有被執行的任務全部丟棄。此時,poolsize=0,阻塞佇列的size也為0。
-
當所有的任務已終止,ctl記錄的”任務數量”為0,執行緒池會變為TIDYING狀態。接著會執行terminated()函式。
ThreadPoolExecutor中有一個控制狀態的屬性叫ctl,它是一個AtomicInteger型別的變數。
-
執行緒池處在TIDYING狀態時,執行完terminated()方法之後,就會由 TIDYING -> TERMINATED, 執行緒池被設定為TERMINATED狀態。
-
2.3、執行緒池主要的任務處理流程
處理任務的核心方法是execute
,我們看看 JDK 1.8 原始碼中ThreadPoolExecutor
是如何處理執行緒任務的:
// JDK 1.8 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 1.當前執行緒數小於corePoolSize,則呼叫addWorker建立核心執行緒執行任務 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.如果不小於corePoolSize,則將任務新增到workQueue佇列。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 2.1 如果isRunning返回false(狀態檢查),則remove這個任務,然後執行拒絕策略。 if (! isRunning(recheck) && remove(command)) reject(command); // 2.2 執行緒池處於running狀態,但是沒有執行緒,則建立執行緒 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3.如果放入workQueue失敗,則建立非核心執行緒執行任務, // 如果這時建立非核心執行緒失敗(當前執行緒總數不小於maximumPoolSize時),就會執行拒絕策略。 else if (!addWorker(command, false)) reject(command); }
為什麼要二次檢查執行緒池的狀態?
在多執行緒的環境下,執行緒池的狀態是時刻發生變化的。很有可能剛獲取執行緒池狀態後執行緒池狀態就改變了。判斷是否將command
加入workqueue
是執行緒池之前的狀態。倘若沒有二次檢查,萬一執行緒池處於非RUNNING狀態(在多執行緒環境下很有可能發生),那麼command
永遠不會執行。
總結一下處理流程
-
執行緒總數量 < corePoolSize,無論執行緒是否空閒,都會新建一個核心執行緒執行任務(讓核心執行緒數量快速達到corePoolSize,在核心執行緒數量 < corePoolSize時)。注意,這一步需要獲得全域性鎖。
-
執行緒總數量 >= corePoolSize時,新來的執行緒任務會進入任務佇列中等待,然後空閒的核心執行緒會依次去快取佇列中取任務來執行(體現了執行緒複用)。
-
當快取佇列滿了,說明這個時候任務已經多到爆棚,需要一些“臨時工”來執行這些任務了。於是會建立非核心執行緒去執行這個任務。注意,這一步需要獲得全域性鎖。
-
快取佇列滿了, 且匯流排程數達到了maximumPoolSize,則會採取上面提到的拒絕策略進行處理。
整個過程如圖所示:
2.4、ThreadPoolExecutor如何做到執行緒複用的?
我們知道,一個執行緒在建立的時候會指定一個執行緒任務,當執行完這個執行緒任務之後,執行緒自動銷燬。但是執行緒池卻可以複用執行緒,即一個執行緒執行完執行緒任務後不銷燬,繼續執行另外的執行緒任務。那麼,執行緒池如何做到執行緒複用呢?
原來,ThreadPoolExecutor在建立執行緒時,會將執行緒封裝成工作執行緒worker,並放入工作執行緒組中,然後這個worker反覆從阻塞佇列中拿任務去執行。話不多說,我們繼續看看原始碼(一定要仔細看,前後有聯絡)
這裡的addWorker
方法是在上面提到的execute
方法裡面呼叫的,先看看上半部分:
// ThreadPoolExecutor.addWorker方法原始碼上半部分 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || // 1.如果core是ture,證明需要建立的執行緒為核心執行緒,則先判斷當前執行緒是否大於核心執行緒 // 如果core是false,證明需要建立的是非核心執行緒,則先判斷當前執行緒數是否大於匯流排程數 // 如果不小於,則返回false wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
上半部分主要是判斷執行緒數量是否超出閾值,超過了就返回false。我們繼續看下半部分:
// ThreadPoolExecutor.addWorker方法原始碼下半部分 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 1.建立一個worker物件 w = new Worker(firstTask); // 2.例項化一個Thread物件 final Thread t = w.thread; if (t != null) { // 3.執行緒池全域性鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 4.啟動這個執行緒 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
建立worker
物件,並初始化一個Thread
物件,然後啟動這個執行緒物件。
我們接著看看Worker
類,僅展示部分原始碼:
// Worker類部分原始碼 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } //其餘程式碼略... }
Worker
類實現了Runnable
介面,所以Worker
也是一個執行緒任務。在構造方法中,建立了一個執行緒,執行緒的任務就是自己。故addWorker
方法呼叫addWorker方法原始碼下半部分中的第4步t.start
,會觸發Worker
類的run
方法被JVM呼叫。
我們再看看runWorker
的邏輯:
// Worker.runWorker方法原始碼 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 1.執行緒啟動之後,通過unlock方法釋放鎖 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 2.Worker執行firstTask或從workQueue中獲取任務,如果getTask方法不返回null,迴圈不退出 while (task != null || (task = getTask()) != null) { // 2.1進行加鎖操作,保證thread不被其他執行緒中斷(除非執行緒池被中斷) w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 2.2檢查執行緒池狀態,倘若執行緒池處於中斷狀態,當前執行緒將中斷。 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 2.3執行beforeExecute beforeExecute(wt, task); Throwable thrown = null; try { // 2.4執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 2.5執行afterExecute方法 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; // 2.6解鎖操作 w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
首先去執行建立這個worker時就有的任務,當執行完這個任務後,worker的生命週期並沒有結束,在while
迴圈中,worker會不斷地呼叫getTask
方法從阻塞佇列中獲取任務然後呼叫task.run()
執行任務,從而達到複用執行緒的目的。只要getTask
方法不返回null
,此執行緒就不會退出。
當然,核心執行緒池中建立的執行緒想要拿到阻塞佇列中的任務,先要判斷執行緒池的狀態,如果STOP或者TERMINATED,返回null
。
最後看看getTask
方法的實現:
// Worker.getTask方法原始碼 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // 1.allowCoreThreadTimeOut變數預設是false,核心執行緒即使空閒也不會被銷燬 // 如果為true,核心執行緒在keepAliveTime內仍空閒則會被銷燬。 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 2.如果執行執行緒數超過了最大執行緒數,但是快取佇列已經空了,這時遞減worker數量。 // 如果有設定允許執行緒超時或者執行緒數量超過了核心執行緒數量, // 並且執行緒在規定時間內均未poll到任務且佇列為空則遞減worker數量 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 3.如果timed為true(想想哪些情況下timed為true),則會呼叫workQueue的poll方法獲取任務. // 超時時間是keepAliveTime。如果超過keepAliveTime時長, // poll返回了null,上邊提到的while循序就會退出,執行緒也就執行完了。 // 如果timed為false(allowCoreThreadTimeOut為falsefalse // 且wc > corePoolSize為false),則會呼叫workQueue的take方法阻塞在當前。 // 佇列中有任務加入時,執行緒被喚醒,take方法返回任務,並執行。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
核心執行緒的會一直卡在workQueue.take
方法,被阻塞並掛起,不會佔用CPU資源,直到拿到Runnable
然後返回(當然如果allowCoreThreadTimeOut設定為true
,那麼核心執行緒就會去呼叫poll
方法,因為poll
可能會返回null
,所以這時候核心執行緒滿足超時條件也會被銷燬)。
非核心執行緒會workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超時還沒有拿到,下一次迴圈判斷compareAndDecrementWorkerCount就會返回null
,Worker物件的run()
方法迴圈體的判斷為null
,任務結束,然後執行緒被系統回收 。
原始碼解析完畢,你理解的原始碼是否和圖中的處理流程一致?如果不一致,那麼就多看兩遍吧,加油。
三、四種常見的執行緒池
Executors
類中提供的幾個靜態方法來建立執行緒池。大家到了這一步,如果看懂了前面講的ThreadPoolExecutor
構造方法中各種引數的意義,那麼一看到Executors
類中提供的執行緒池的原始碼就應該知道這個執行緒池是幹嘛的。
3.1、newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
CacheThreadPool
的執行流程如下:
-
提交任務進執行緒池。
-
因為corePoolSize為0的關係,不建立核心執行緒,執行緒池最大為Integer.MAX_VALUE。
-
嘗試將任務新增到SynchronousQueue佇列。
-
如果SynchronousQueue入列成功,等待被當前執行的執行緒空閒後拉取執行。如果當前沒有空閒執行緒,那麼就建立一個非核心執行緒,然後從SynchronousQueue拉取任務並在當前執行緒執行。
-
如果SynchronousQueue已有任務在等待,入列操作將會阻塞。
當需要執行很多短時間的任務時,CacheThreadPool的執行緒複用率比較高, 會顯著的提高效能。而且執行緒60s後會回收,意味著即使沒有任務進來,CacheThreadPool並不會佔用很多資源。
3.2、newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
核心執行緒數量和匯流排程數量相等,都是傳入的引數nThreads,所以只能建立核心執行緒,不能建立非核心執行緒。因為LinkedBlockingQueue的預設大小是Integer.MAX_VALUE,故如果核心執行緒空閒,則交給核心執行緒處理;如果核心執行緒不空閒,則入列等待,直到核心執行緒空閒。
與CachedThreadPool的區別:
-
因為 corePoolSize == maximumPoolSize ,所以FixedThreadPool只會建立核心執行緒。 而CachedThreadPool因為corePoolSize=0,所以只會建立非核心執行緒。
-
在 getTask() 方法,如果佇列裡沒有任務可取,執行緒會一直阻塞在 LinkedBlockingQueue.take() ,執行緒不會被回收。 CachedThreadPool會在60s後收回。
-
由於執行緒不會被回收,會一直卡在阻塞,所以沒有任務的情況下, FixedThreadPool佔用資源更多。
-
都幾乎不會觸發拒絕策略,但是原理不同。FixedThreadPool是因為阻塞佇列可以很大(最大為Integer最大值),故幾乎不會觸發拒絕策略;CachedThreadPool是因為執行緒池很大(最大為Integer最大值),幾乎不會導致執行緒數量大於最大執行緒數,故幾乎不會觸發拒絕策略。
3.3、newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
有且僅有一個核心執行緒( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不會建立非核心執行緒。所有任務按照先來先執行的順序執行。如果這個唯一的執行緒不空閒,那麼新來的任務會儲存在任務佇列裡等待執行。
3.4、newScheduledThreadPool
建立一個定長執行緒池,支援定時及週期性任務執行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //ScheduledThreadPoolExecutor(): public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
四種常見的執行緒池基本夠我們使用了,但是《阿里把把開發手冊》不建議我們直接使用Executors類中的執行緒池,而是通過ThreadPoolExecutor
的方式,這樣的處理方式讓寫的同學需要更加明確執行緒池的執行規則,規避資源耗盡的風險。
但如果你及團隊本身對執行緒池非常熟悉,又確定業務規模不會大到資源耗盡的程度(比如執行緒數量或任務佇列長度可能達到Integer.MAX_VALUE)時,其實是可以使用JDK提供的這幾個介面的,它能讓我們的程式碼具有更強的可讀性。