執行緒池的工作原理與原始碼解讀及各常用執行緒池的執行流程圖
有時候花了大把時間去看一些東西卻看不懂,是很 “ 藍瘦 ” 的,花時間也是投資。
本文適合:
- 曾瞭解過執行緒池卻一直模模糊糊的人
- 瞭解得差不多卻對某些點依然疑惑的
隨著cpu核數越來越多,不可避免的利用多執行緒技術以充分利用其計算能力。所以,多執行緒技術是服務端開發人員必須掌握的技術。
執行緒的建立和銷燬,都涉及到系統呼叫,比較消耗系統資源,所以就引入了執行緒池技術,避免頻繁的執行緒建立和銷燬。
在Java用有一個Executors工具類,可以為我們建立一個執行緒池,其本質就是new了一個ThreadPoolExecutor物件。執行緒池幾乎也是面試必考問題。本文結合原始碼,說說ThreadExecutor的工作原理。
一、執行緒池建立
先看一下ThreadPoolExecutor引數最全的構造方法:
①corePoolSize:執行緒池的核心執行緒數,說白了就是,即便是執行緒池裡沒有任何任務,也會有corePoolSize個執行緒在候著等任務。
②maximumPoolSize:最大執行緒數,不管你提交多少任務,執行緒池裡最多工作執行緒數就是maximumPoolSize。
③keepAliveTime:執行緒的存活時間。當執行緒池裡的執行緒數大於corePoolSize時,如果等了keepAliveTime時長還沒有任務可執行,則執行緒退出。
⑤unit:這個用來指定keepAliveTime的單位,比如秒:TimeUnit.SECONDS。
⑥workQueue:一個阻塞佇列,提交的任務將會被放到這個佇列裡。
⑦threadFactory:執行緒工廠,用來建立執行緒,主要是為了給執行緒起名字,預設工廠的執行緒名字:pool-1-thread-3。
⑧handler:拒絕策略,當執行緒池裡執行緒被耗盡,且佇列也滿了的時候會呼叫。
以上就是建立執行緒池時用到的引數,面試中經常會有面試官問道這個問題。
二、執行緒池執行流程
這裡用一個圖來說明執行緒池的執行流程
任務被提交到執行緒池,會先判斷當前執行緒數量是否小於corePoolSize,如果小於則建立執行緒來執行提交的任務,否則將任務放入workQueue佇列,如果workQueue滿了,則判斷當前執行緒數量是否小於maximumPoolSize,如果小於則建立執行緒執行任務,否則就會呼叫handler,以表示執行緒池拒絕接收任務。
這裡以jdk1.8.0_111的原始碼為例,看一下具體實現。
1、先看一下執行緒池的executor方法
①:判斷當前活躍執行緒數是否小於corePoolSize,如果小於,則呼叫addWorker建立執行緒執行任務
②:如果不小於corePoolSize,則將任務新增到workQueue佇列。
③:如果放入workQueue失敗,則建立執行緒執行任務,如果這時建立執行緒失敗(當前執行緒數不小於maximumPoolSize時),就會呼叫reject(內部呼叫handler)拒絕接受任務。
2、再看下addWorker的方法實現
這塊程式碼是在建立非核心執行緒時,即core等於false。判斷當前執行緒數是否大於等於maximumPoolSize,如果大於等於則返回false,即上邊說到的③中建立執行緒失敗的情況。
addWorker方法的下半部分:
①建立Worker物件,同時也會例項化一個Thread物件。
②啟動啟動這個執行緒
3、再到Worker裡看看其實現
可以看到在建立Worker時會呼叫threadFactory來建立一個執行緒。上邊的②中啟動一個執行緒就會觸發Worker的run方法被執行緒呼叫。
4、接下來咱們看看runWorker方法的邏輯
執行緒呼叫runWoker,會while迴圈呼叫getTask方法從workerQueue裡讀取任務,然後執行任務。只要getTask方法不返回null,此執行緒就不會退出。
5、最後在看看getTask方法實現
①咱們先不管allowCoreThreadTimeOut,這個變數預設值是false。wc>corePoolSize則是判斷當前執行緒數是否大於corePoolSize。
②如果當前執行緒數大於corePoolSize,則會呼叫workQueue的poll方法獲取任務,超時時間是keepAliveTime。如果超過keepAliveTime時長,poll返回了null,上邊提到的while循序就會退出,執行緒也就執行完了。
如果當前執行緒數小於corePoolSize,則會呼叫workQueue的take方法阻塞在當前。
補充:
下圖,皆可自行儲存,常常閱之。日久,根深蒂固
預設建構函式
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler ) { .... }
絕對易懂的構造方法引數講解
引數名 |
作用 |
---|---|
corePoolSize |
佇列沒滿時,執行緒最大併發數 |
maximumPoolSizes |
佇列滿後執行緒能夠達到的最大併發數 |
keepAliveTime |
空閒執行緒過多久被回收的時間限制 |
unit |
keepAliveTime 的時間單位 |
workQueue |
阻塞的佇列型別 |
RejectedExecutionHandler |
超出 maximumPoolSizes + workQueue 時,任務會交給RejectedExecutionHandler來處理 |
文字描述
corePoolSize,maximumPoolSize,workQueue之間關係。
- 當執行緒池中執行緒數小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒。
- 當執行緒池中執行緒數達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行 。
- 當workQueue已滿,且maximumPoolSize > corePoolSize時,新提交任務會建立新執行緒執行任務。
- 當workQueue已滿,且提交任務數超過maximumPoolSize,任務由RejectedExecutionHandler處理。
- 當執行緒池中執行緒數超過corePoolSize,且超過這部分的空閒時間達到keepAliveTime時,回收這些執行緒。
- 當設定allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize範圍內的執行緒空閒時間達到keepAliveTime也將回收。
一般流程圖
newFixedThreadPool 流程圖
public static ExecutorService newFixedThreadPool(int nThreads){ return new ThreadPoolExecutor( nThreads, // corePoolSize nThreads, // maximumPoolSize == corePoolSize 0L, // 空閒時間限制是 0 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() // 無界阻塞佇列 ); }
newCacheThreadPool 流程圖
public static ExecutorService newCachedThreadPool(){ return new ThreadPoolExecutor( 0, // corePoolSoze == 0 Integer.MAX_VALUE, // maximumPoolSize 非常大 60L, // 空閒判定是60 秒 TimeUnit.SECONDS, // 神奇的無儲存空間阻塞佇列,每個 put 必須要等待一個 take new SynchronousQueue<Runnable>() ); }
newSingleThreadPool 流程圖
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor ( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory ) ); }
可以看到除了多了個 FinalizableDelegatedExecutorService
代理,其初始化和 newFiexdThreadPool
的 nThreads = 1 的時候是一樣的。 區別就在於:
- newSingleThreadExecutor返回的ExcutorService在解構函式finalize()處會呼叫shutdown()
- 如果我們沒有對它呼叫shutdown(),那麼可以確保它在被回收時呼叫shutdown()來終止執行緒。
使用ThreadFactory
,可以改變執行緒的名稱、執行緒組、優先順序、守護程序狀態,一般採用預設。
流程圖略,請參考 newFiexdThreadPool,這裡不再累贅。
最後
還有一個定時任務執行緒池ScheduledThreadPool
它用來處理延時或定時任務,不常用