1. 程式人生 > >執行緒池沒你想的那麼簡單

執行緒池沒你想的那麼簡單

前言

原以為執行緒池還挺簡單的(平時常用,也分析過原理),這次是想自己動手寫一個執行緒池來更加深入的瞭解它;但在動手寫的過程中落地到細節時發現並沒想的那麼容易。結合原始碼對比後確實不得不佩服 Doug Lea

我覺得大部分人直接去看 java.util.concurrent.ThreadPoolExecutor 的原始碼時都是看一個大概,因為其中涉及到了許多細節處理,還有部分 AQS 的內容,所以想要理清楚具體細節並不是那麼容易。

與其挨個分析原始碼不如自己實現一個簡版,當然簡版並不意味著功能缺失,需要保證核心邏輯一致。

所以也是本篇文章的目的:

自己動手寫一個五臟俱全的執行緒池,同時會了解到執行緒池的工作原理,以及如何在工作中合理的利用執行緒池。

再開始之前建議對執行緒池不是很熟悉的朋友看看這幾篇:

這裡我截取了部分內容,也許可以埋個伏筆(坑)。


具體請看這兩個連結。

  • 如何優雅的使用和理解執行緒池
  • 執行緒池中你不容錯過的一些細節

由於篇幅限制,本次可能會分為上下兩篇。

建立執行緒池

現在進入正題,新建了一個 CustomThreadPool 類,它的工作原理如下:

簡單來說就是往執行緒池裡邊丟任務,丟的任務會緩衝到佇列裡;執行緒池裡儲存的其實就是一個個的 Thread ,他們會一直不停的從剛才緩衝的佇列裡獲取任務執行。

流程還是挺簡單。

先來看看我們這個自創的執行緒池的效果如何吧:

初始化了一個核心為3、最大執行緒數為5、佇列大小為 4 的執行緒池。

先往其中丟了 10 個任務,由於阻塞佇列的大小為 4 ,最大執行緒數為 5 ,所以由於佇列裡緩衝不了最終會建立 5 個執行緒(上限)。

過段時間沒有任務提交後(sleep)則會自動縮容到三個執行緒(保證不會小於核心執行緒數)。

建構函式

來看看具體是如何實現的。

下面則是這個執行緒池的建構函式:

會有以下幾個核心引數:

  • miniSize 最小執行緒數,等效於 ThreadPool 中的核心執行緒數。
  • maxSize 最大執行緒數。
  • keepAliveTime 執行緒保活時間。
  • workQueue 阻塞佇列。
  • notify 通知介面。

大致上都和 ThreadPool 中的引數相同,並且作用也是類似的。

需要注意的是其中初始化了一個 workers 成員變數:

    /**
     * 存放執行緒池
     */
    private volatile Set<Worker> workers;
    
    public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime,
                            TimeUnit unit, BlockingQueue<Runnable> workQueue, Notify notify) {
       
        workers = new ConcurrentHashSet<>();
    }

workers 是最終存放執行緒池中執行的執行緒,在 j.u.c 原始碼中是一個 HashSet 所以對他所有的操作都是需要加鎖。

我這裡為了簡便起見就自己定義了一個執行緒安全的 Set 稱為 ConcurrentHashSet

其實原理也非常簡單,和 HashSet 類似也是藉助於 HashMap 來存放資料,利用其 key 不可重複的特性來實現 set ,只是這裡的 HashMap 是用併發安全的 ConcurrentHashMap 來實現的。

這樣就能保證對它的寫入、刪除都是執行緒安全的。

不過由於 ConcurrentHashMapsize() 函式並不準確,所以我這裡單獨利用了一個 AtomicInteger 來統計容器大小。

建立核心執行緒

往執行緒池中丟一個任務的時候其實要做的事情還蠻多的,最重要的事情莫過於建立執行緒存放到執行緒池中了。

當然我們不能無限制的建立執行緒,不然拿執行緒池來就沒任何意義了。於是 miniSize maxSize 這兩個引數就有了它的意義。

但這兩個引數再哪一步的時候才起到作用呢?這就是首先需要明確的。

從這個流程圖可以看出第一步是需要判斷是否大於核心執行緒數,如果沒有則建立。


結合程式碼可以發現在執行任務的時候會判斷是否大於核心執行緒數,從而建立執行緒。

worker.startTask() 執行任務部分放到後面分析。

這裡的 miniSize 由於會在多執行緒場景下使用,所以也用 volatile 關鍵字來保證可見性。

佇列緩衝

結合上面的流程圖,第二步自然是要判斷佇列是否可以存放任務(是否已滿)。

優先會往佇列裡存放。

上至封頂

一旦寫入失敗則會判斷當前執行緒池的大小是否大於最大執行緒數,如果沒有則繼續建立執行緒執行。

不然則執行會嘗試阻塞寫入佇列(j.u.c 會在這裡執行拒絕策略)

以上的步驟和剛才那張流程圖是一樣的,這樣大家是否有看出什麼坑嘛?

時刻小心

從上面流程圖的這兩步可以看出會直接建立新的執行緒。

這個過程相對於中間直接寫入阻塞佇列的開銷是非常大的,主要有以下兩個原因:

  • 建立執行緒會加鎖,雖說最終用的是 ConcurrentHashMap 的寫入函式,但依然存在加鎖的可能。
  • 會建立新的執行緒,建立執行緒還需要呼叫作業系統的 API 開銷較大。

所以理想情況下我們應該避免這兩步,儘量讓丟入執行緒池中的任務進入阻塞佇列中。

執行任務

任務是新增進來了,那是如何執行的?

在建立任務的時候提到過 worker.startTask() 函式:

    /**
     * 新增任務,需要加鎖
     * @param runnable 任務
     */
    private void addWorker(Runnable runnable) {
        Worker worker = new Worker(runnable, true);
        worker.startTask();
        workers.add(worker);
    }

也就是在建立執行緒執行任務的時候會建立 Worker 物件,利用它的 startTask() 方法來執行任務。

所以先來看看 Worker 物件是長啥樣的:

其實他本身也是一個執行緒,將接收到需要執行的任務存放到成員變數 task 處。

而其中最為關鍵的則是執行任務 worker.startTask() 這一步驟。

    public void startTask() {
        thread.start();
    }

其實就是運行了 worker 執行緒自己,下面來看 run 方法。

  • 第一步是將建立執行緒時傳過來的任務執行(task.run),接著會一直不停的從佇列裡獲取任務執行,直到獲取不到新任務了。
  • 任務執行完畢後將內建的計數器 -1 ,方便後面任務全部執行完畢進行通知。
  • worker 執行緒獲取不到任務後退出,需要將自己從執行緒池中釋放掉(workers.remove(this))。

從佇列裡獲取任務

其實 getTask 也是非常關鍵的一個方法,它封裝了從佇列中獲取任務,同時對不需要保活的執行緒進行回收。

很明顯,核心作用就是從佇列裡獲取任務;但有兩個地方需要注意:

  • 當執行緒數超過核心執行緒數時,在獲取任務的時候需要通過保活時間從佇列裡獲取任務;一旦獲取不到任務則佇列肯定是空的,這樣返回 null 之後在上文的 run() 中就會退出這個執行緒;從而達到了回收執行緒的目的,也就是我們之前演示的效果
  • 這裡需要加鎖,加鎖的原因是這裡肯定會出現併發情況,不加鎖會導致 workers.size() > miniSize 條件多次執行,從而導致執行緒被全部回收完畢。

關閉執行緒池

最後來談談執行緒關閉的事;

還是以剛才那段測試程式碼為例,如果提交任務後我們沒有關閉執行緒,會發現即便是任務執行完畢後程序也不會退出。

從剛才的原始碼裡其實也很容易看出來,不退出的原因是 Worker 執行緒一定還會一直阻塞在 task = workQueue.take(); 處,即便是執行緒縮容了也不會小於核心執行緒數。

通過堆疊也能證明:

恰好剩下三個執行緒阻塞於此處。

而關閉執行緒通常又有以下兩種:

  • 立即關閉:執行關閉方法後不管現線上程池的執行狀況,直接一刀切全部停掉,這樣會導致任務丟失。
  • 不接受新的任務,同時等待現有任務執行完畢後退出線程池。

立即關閉

我們先來看第一種立即關閉

    /**
     * 立即關閉執行緒池,會造成任務丟失
     */
    public void shutDownNow() {
        isShutDown.set(true);
        tryClose(false);
    }
    
    /**
     * 關閉執行緒池
     *
     * @param isTry true 嘗試關閉      --> 會等待所有任務執行完畢
     *              false 立即關閉執行緒池--> 任務有丟失的可能
     */
    private void tryClose(boolean isTry) {
        if (!isTry) {
            closeAllTask();
        } else {
            if (isShutDown.get() && totalTask.get() == 0) {
                closeAllTask();
            }
        }

    }

    /**
     * 關閉所有任務
     */
    private void closeAllTask() {
        for (Worker worker : workers) {
            //LOGGER.info("開始關閉");
            worker.close();
        }
    }
    
    public void close() {
        thread.interrupt();
    }

很容易看出,最終就是遍歷執行緒池裡所有的 worker 執行緒挨個執行他們的中斷函式。

我們來測試一下:


可以發現後面丟進去的三個任務其實是沒有被執行的。

完事後關閉

而正常關閉則不一樣:

    /**
     * 任務執行完畢後關閉執行緒池
     */
    public void shutdown() {
        isShutDown.set(true);
        tryClose(true);
    }

他會在這裡多了一個判斷,需要所有任務都執行完畢之後才會去中斷執行緒。

同時線上程需要回收時都會嘗試關閉執行緒:


來看看實際效果:

回收執行緒

上文或多或少提到了執行緒回收的事情,其實總結就是以下兩點:

  • 一旦執行了 shutdown/shutdownNow 方法都會將執行緒池的狀態置為關閉狀態,這樣只要 worker 執行緒嘗試從佇列裡獲取任務時就會直接返回空,導致 worker 執行緒被回收。
  • 一旦執行緒池大小超過了核心執行緒數就會使用保活時間來從佇列裡獲取任務,所以一旦獲取不到返回 null 時就會觸發回收。

但如果我們的佇列足夠大,導致執行緒數都不會超過核心執行緒數,這樣是不會觸發回收的。

比如這裡我將佇列大小調為 10 ,這樣任務就會累計在佇列裡,不會建立五個 worker 執行緒。

所以一直都是 Thread-1~3 這三個執行緒在反覆排程任務。

總結

本次實現了執行緒池裡大部分核心功能,我相信只要看完並動手敲一遍一定會對執行緒池有不一樣的理解。

結合目前的內容來總結下:

  • 執行緒池、佇列大小要設計的合理,儘量的讓任務從佇列中獲取執行。
  • 慎用 shutdownNow() 方法關閉執行緒池,會導致任務丟失(除非業務允許)。
  • 如果任務多,執行緒執行時間短可以調大 keepalive 值,使得執行緒儘量不被回收從而可以複用執行緒。

同時下次會分享一些執行緒池的新特性,如:

  • 執行帶有返回值的執行緒。
  • 異常處理怎麼辦?
  • 所有任務執行完怎麼通知我?

本文所有原始碼:

https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/CustomThreadPool.java

你的點贊與分享是對我最大的支援