1. 程式人生 > >執行緒池的工作原理與原始碼解讀及各常用執行緒池的執行流程圖

執行緒池的工作原理與原始碼解讀及各常用執行緒池的執行流程圖

有時候花了大把時間去看一些東西卻看不懂,是很 “ 藍瘦 ” 的,花時間也是投資。

本文適合:

  • 曾瞭解過執行緒池卻一直模模糊糊的人
  • 瞭解得差不多卻對某些點依然疑惑的

 

隨著cpu核數越來越多,不可避免的利用多執行緒技術以充分利用其計算能力。所以,多執行緒技術是服務端開發人員必須掌握的技術。

 

執行緒的建立和銷燬,都涉及到系統呼叫,比較消耗系統資源,所以就引入了執行緒池技術,避免頻繁的執行緒建立和銷燬。

 

在Java用有一個Executors工具類,可以為我們建立一個執行緒池,其本質就是new了一個ThreadPoolExecutor物件。執行緒池幾乎也是面試必考問題。本文結合原始碼,說說ThreadExecutor的工作原理。

 

一、執行緒池建立

先看一下ThreadPoolExecutor引數最全的構造方法:

①corePoolSize:執行緒池的核心執行緒數,說白了就是,即便是執行緒池裡沒有任何任務,也會有corePoolSize個執行緒在候著等任務。

②maximumPoolSize:最大執行緒數,不管你提交多少任務,執行緒池裡最多工作執行緒數就是maximumPoolSize。

③keepAliveTime:執行緒的存活時間。當執行緒池裡的執行緒數大於corePoolSize時,如果等了keepAliveTime時長還沒有任務可執行,則執行緒退出。

⑤unit:這個用來指定keepAliveTime的單位,比如秒:TimeUnit.SECONDS。

⑥workQueue:一個阻塞佇列,提交的任務將會被放到這個佇列裡。

⑦threadFactory:執行緒工廠,用來建立執行緒,主要是為了給執行緒起名字,預設工廠的執行緒名字:pool-1-thread-3。

⑧handler:拒絕策略,當執行緒池裡執行緒被耗盡,且佇列也滿了的時候會呼叫。

以上就是建立執行緒池時用到的引數,面試中經常會有面試官問道這個問題。

 

二、執行緒池執行流程

這裡用一個圖來說明執行緒池的執行流程

任務被提交到執行緒池,會先判斷當前執行緒數量是否小於corePoolSize,如果小於則建立執行緒來執行提交的任務,否則將任務放入workQueue佇列,如果workQueue滿了,則判斷當前執行緒數量是否小於maximumPoolSize,如果小於則建立執行緒執行任務,否則就會呼叫handler,以表示執行緒池拒絕接收任務。

這裡以jdk1.8.0_111的原始碼為例,看一下具體實現。

1、先看一下執行緒池的executor方法

①:判斷當前活躍執行緒數是否小於corePoolSize,如果小於,則呼叫addWorker建立執行緒執行任務

②:如果不小於corePoolSize,則將任務新增到workQueue佇列。

③:如果放入workQueue失敗,則建立執行緒執行任務,如果這時建立執行緒失敗(當前執行緒數不小於maximumPoolSize時),就會呼叫reject(內部呼叫handler)拒絕接受任務。

2、再看下addWorker的方法實現

這塊程式碼是在建立非核心執行緒時,即core等於false。判斷當前執行緒數是否大於等於maximumPoolSize,如果大於等於則返回false,即上邊說到的③中建立執行緒失敗的情況。

addWorker方法的下半部分:

①建立Worker物件,同時也會例項化一個Thread物件。

②啟動啟動這個執行緒

3、再到Worker裡看看其實現

可以看到在建立Worker時會呼叫threadFactory來建立一個執行緒。上邊的②中啟動一個執行緒就會觸發Worker的run方法被執行緒呼叫。

4、接下來咱們看看runWorker方法的邏輯

執行緒呼叫runWoker,會while迴圈呼叫getTask方法從workerQueue裡讀取任務,然後執行任務。只要getTask方法不返回null,此執行緒就不會退出。

5、最後在看看getTask方法實現

①咱們先不管allowCoreThreadTimeOut,這個變數預設值是false。wc>corePoolSize則是判斷當前執行緒數是否大於corePoolSize。

②如果當前執行緒數大於corePoolSize,則會呼叫workQueue的poll方法獲取任務,超時時間是keepAliveTime。如果超過keepAliveTime時長,poll返回了null,上邊提到的while循序就會退出,執行緒也就執行完了。

如果當前執行緒數小於corePoolSize,則會呼叫workQueue的take方法阻塞在當前。

 

補充:

下圖,皆可自行儲存,常常閱之。日久,根深蒂固

預設建構函式

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler
) {
    ....
}

絕對易懂的構造方法引數講解

引數名

作用

corePoolSize

佇列沒滿時,執行緒最大併發數

maximumPoolSizes

佇列滿後執行緒能夠達到的最大併發數

keepAliveTime

空閒執行緒過多久被回收的時間限制

unit

keepAliveTime 的時間單位

workQueue

阻塞的佇列型別

RejectedExecutionHandler

超出 maximumPoolSizes + workQueue 時,任務會交給RejectedExecutionHandler來處理

文字描述

corePoolSize,maximumPoolSize,workQueue之間關係。

  • 當執行緒池中執行緒數小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒。
  • 當執行緒池中執行緒數達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行 。
  • 當workQueue已滿,且maximumPoolSize > corePoolSize時,新提交任務會建立新執行緒執行任務。
  • 當workQueue已滿,且提交任務數超過maximumPoolSize,任務由RejectedExecutionHandler處理。
  • 當執行緒池中執行緒數超過corePoolSize,且超過這部分的空閒時間達到keepAliveTime時,回收這些執行緒。
  • 當設定allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize範圍內的執行緒空閒時間達到keepAliveTime也將回收。

一般流程圖

newFixedThreadPool 流程圖

public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(
            nThreads,   // corePoolSize
            nThreads,   // maximumPoolSize == corePoolSize
            0L,         // 空閒時間限制是 0
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>() // 無界阻塞佇列
        );
}

newCacheThreadPool 流程圖

public static ExecutorService newCachedThreadPool(){
    return new ThreadPoolExecutor(
        0,                  // corePoolSoze == 0
        Integer.MAX_VALUE,  // maximumPoolSize 非常大
        60L,                // 空閒判定是60 秒
        TimeUnit.SECONDS,
        // 神奇的無儲存空間阻塞佇列,每個 put 必須要等待一個 take
        new SynchronousQueue<Runnable>()  
    );
}

newSingleThreadPool 流程圖

public static ExecutorService newSingleThreadExecutor() {
        return 
            new FinalizableDelegatedExecutorService
                (
                    new ThreadPoolExecutor
                        (
                            1,
                            1,
                            0L,
                            TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>(),
                            threadFactory
                        )
                );
    }

可以看到除了多了個 FinalizableDelegatedExecutorService 代理,其初始化和 newFiexdThreadPool 的 nThreads = 1 的時候是一樣的。 區別就在於:

  • newSingleThreadExecutor返回的ExcutorService在解構函式finalize()處會呼叫shutdown()
  • 如果我們沒有對它呼叫shutdown(),那麼可以確保它在被回收時呼叫shutdown()來終止執行緒。

使用ThreadFactory,可以改變執行緒的名稱、執行緒組、優先順序、守護程序狀態,一般採用預設。

流程圖略,請參考 newFiexdThreadPool,這裡不再累贅。

最後

還有一個定時任務執行緒池ScheduledThreadPool

它用來處理延時或定時任務,不常用