1. 程式人生 > >併發程式設計 —— 執行緒池

併發程式設計 —— 執行緒池

概述

在程式中,我們會用各種池化技術來快取建立昂貴的物件,比如執行緒池、連線池、記憶體池。一般是預先建立一些物件放入池中,使用的時候直接取出使用,用完歸還以便複用,還會通過一定的策略調整池中快取物件的數量,實現池的動態伸縮。

由於執行緒的建立比較昂貴,隨意、沒有控制地建立大量執行緒會造成效能問題,因此短平快的任務一般考慮使用執行緒池來處理,而不是直接建立執行緒。

那麼,如何正確的建立並正確的使用執行緒池呢,這篇文章就來細看下。

執行緒池

雖然在 Java 語言中建立執行緒看上去就像建立一個物件一樣簡單,只需要 new Thread() 就可以了,但實際上建立執行緒遠不是建立一個物件那麼簡單。

建立物件,僅僅是在 JVM 的堆裡分配一塊記憶體而已;而建立一個執行緒,卻需要呼叫作業系統核心的 API,然後作業系統要為執行緒分配一系列的資源,這個成本就很高了。所以執行緒是一個重量級的物件,應該避免頻繁建立和銷燬,一般就是採用執行緒池來避免頻繁的建立和銷燬執行緒。

 

執行緒池原理

Java 通過使用者執行緒與核心執行緒結合的 1:1 執行緒模型來實現,Java 將執行緒的排程和管理設定在了使用者態。在 HotSpot VM 的執行緒模型中,Java 執行緒被一對一對映為核心執行緒。Java 在使用執行緒執行程式時,需要建立一個核心執行緒;當該 Java 執行緒被終止時,這個核心執行緒也會被回收。因此 Java 執行緒的建立與銷燬將會消耗一定的計算機資源,從而增加系統的效能開銷。

除此之外,大量建立執行緒同樣會給系統帶來效能問題,因為記憶體和 CPU 資源都將被執行緒搶佔,如果處理不當,就會發生記憶體溢位、CPU 使用率超負荷等問題。

為了解決上述兩類問題,Java 提供了執行緒池概念,對於頻繁建立執行緒的業務場景,執行緒池可以建立固定的執行緒數量,並且在作業系統底層,輕量級程序將會把這些執行緒對映到核心。

執行緒池可以提高執行緒複用,又可以固定最大執行緒使用量,防止無限制地建立執行緒。當程式提交一個任務需要一個執行緒時,會去執行緒池中查詢是否有空閒的執行緒,若有,則直接使用執行緒池中的執行緒工作,若沒有,會去判斷當前已建立的執行緒數量是否超過最大執行緒數量,如未超過,則建立新執行緒,如已超過,則進行排隊等待或者直接丟擲異常。

 

執行緒池是一種生產者 - 消費者模式

執行緒池的設計,普遍採用的都是生產者 - 消費者模式。執行緒池的使用方是生產者,執行緒池本身是消費者。

原理實現大致如下:

 1 package com.lyyzoo.test.concurrent.executor;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.concurrent.BlockingQueue;
 6 import java.util.concurrent.LinkedBlockingQueue;
 7 
 8 /**
 9  * @author bojiangzhou 2020/02/12
10  */
11 public class CustomThreadPool {
12 
13     public static void main(String[] args) {
14         // 使用有界阻塞佇列 建立執行緒池
15         CustomThreadPool pool = new CustomThreadPool(2, new LinkedBlockingQueue<>(10));
16         pool.execute(() -> {
17             System.out.println("提交了一個任務");
18         });
19     }
20 
21     // 利用阻塞佇列實現生產者-消費者模式
22     final BlockingQueue<Runnable> workQueue;
23     // 儲存內部工作執行緒
24     final List<Thread> threads = new ArrayList<>();
25 
26     public CustomThreadPool(int coreSize, BlockingQueue<Runnable> workQueue) {
27         this.workQueue = workQueue;
28         // 建立工作執行緒
29         for (int i = 0; i < coreSize; i++) {
30             WorkerThread work = new WorkerThread();
31             work.start();
32             threads.add(work);
33         }
34     }
35 
36     // 生產者 提交任務
37     public void execute(Runnable command) {
38         try {
39             // 佇列已滿,put 會一直等待
40             workQueue.put(command);
41         } catch (InterruptedException e) {
42             e.printStackTrace();
43         }
44     }
45 
46     /**
47      * 工作執行緒負責消費任務,並執行任務
48      */
49     class WorkerThread extends Thread {
50         @Override
51         public void run() {
52             // 迴圈取任務並執行,take 取不到任務會一直等待
53             while (true) {
54                 try {
55                     Runnable runnable = workQueue.take();
56                     runnable.run();
57                 } catch (InterruptedException e) {
58                     e.printStackTrace();
59                 }
60             }
61         }
62     }
63 }

ThreadPoolExecutor

執行緒池引數說明

Java 提供的執行緒池相關的工具類中,最核心的是 ThreadPoolExecutor,通過名字也能看出來,它強調的是 Executor,而不是一般意義上的池化資源。

ThreadPoolExecutor 的建構函式非常複雜,這個最完備的建構函式有 7 個引數:

 

各個引數的含義如下:

  • corePoolSize:表示執行緒池保有的最小執行緒數。
  • maximumPoolSize:表示執行緒池建立的最大執行緒數。
  • keepAliveTime & unit:如果一個執行緒空閒了 keepAliveTime & unit 這麼久,而且執行緒池的執行緒數大於 corePoolSize ,那麼這個空閒的執行緒就要被回收了。
  • workQueue:工作佇列,一般定義有界阻塞佇列。
  • threadFactory:通過這個引數你可以自定義如何建立執行緒,例如你可以給執行緒指定一個有意義的名字。
  • handler:通過這個引數可以自定義任務的拒絕策略。如果執行緒池中所有的執行緒都在忙碌,並且工作佇列也滿了(前提是工作佇列是有界佇列),那麼此時提交任務,執行緒池就會拒絕接收。ThreadPoolExecutor 已經提供了以下 4 種拒絕策略。
    •   CallerRunsPolicy:提交任務的執行緒自己去執行該任務。
    •   AbortPolicy:預設的拒絕策略,會 throws RejectedExecutionException。
    •   DiscardPolicy:直接丟棄任務,沒有任何異常丟擲。
    •   DiscardOldestPolicy:丟棄最老的任務,其實就是把最早進入工作佇列的任務丟棄,然後把新任務加入到工作佇列。

 

ThreadPoolExecutor 構造完成後,還可以通過如下方法定製預設行為:

  • executor.allowCoreThreadTimeOut(true):將包括“核心執行緒”在內的,沒有任務分配的所有執行緒,在等待 keepAliveTime 時間後回收掉。
  • executor.prestartAllCoreThreads():建立執行緒池後,立即建立核心數個工作執行緒;執行緒池預設是在任務來時才建立工作執行緒。

 

建立執行緒池示例:

 1 public void test() throws InterruptedException {
 2     ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
 3             // 核心執行緒數
 4             2,
 5             // 最大執行緒數
 6             16,
 7             // 執行緒空閒時間
 8             60, TimeUnit.SECONDS,
 9             // 使用有界阻塞佇列
10             new LinkedBlockingQueue<>(1024),
11             // 定義執行緒建立方式,可自定執行緒名稱
12             new ThreadFactoryBuilder().setNameFormat("executor-%d").build(),
13             // 自定義拒絕策略,一般和降級策略配合使用
14             (r, executor) -> {
15                 // 佇列已滿,拒絕執行
16                 throw new RejectedExecutionException("Task " + r.toString() +
17                         " rejected from " + executor.toString());
18             }
19     );
20 
21     poolExecutor.submit(() -> {
22         LOGGER.info("submit task");
23     });
24 }

 

執行緒池的執行緒分配流程

任務提交後的大致流程如下圖所示。提交任務後,如果執行緒數小於 corePoolSize,則建立新執行緒執行任務,無論當前執行緒池的執行緒是否空閒都會建立新的執行緒。

當建立的執行緒數等於 corePoolSize 時,提交的任務會被加入到設定的阻塞佇列中。

當佇列滿了,則會建立非核心執行緒執行任務,直到執行緒池中的執行緒數量等於 maximumPoolSize。

當執行緒數量已經等於 maximumPoolSize 時, 新提交的任務無法加入到等待佇列,也無法建立非核心執行緒直接執行,如果沒有為執行緒池設定拒絕策略,這時執行緒池就會丟擲 RejectedExecutionException 異常,即預設拒絕接受任務。

 

執行緒池預設的拒絕策略就是丟棄任務,所以我們在設定有界佇列時,需要考慮設定合理的拒絕策略,要考慮到高峰時期任務的數量,避免任務被丟棄而影響業務流程。

 

強烈建議使用有界佇列

建立 ThreadPoolExecutor 時強烈建議使用有界佇列。如果設定為無界佇列,那麼一般最大執行緒數的設定是不起作用的,而且遇到任務高峰時,如果一直往佇列新增任務,容易出現OOM,丟擲如下異常。

Exception in thread "http-nio-45678-ClientPoller" 
    java.lang.OutOfMemoryError: GC overhead limit exceeded

 

使用有界佇列時,需要注意,當任務過多時,執行緒池會觸發執行拒絕策略,執行緒池預設的拒絕策略會丟擲 RejectedExecutionException,這是個執行時異常,對於執行時異常編譯器並不強制 catch 它,所以開發人員很容易忽略,因此預設拒絕策略要慎重使用。如果執行緒池處理的任務非常重要,建議自定義自己的拒絕策略;並且在實際工作中,自定義的拒絕策略往往和降級策略配合使用。

 

監控執行緒池的狀態

建議用一些監控手段來觀察執行緒池的狀態。執行緒池這個元件往往會表現得任勞任怨、默默無聞,除非是出現了拒絕策略,否則壓力再大都不會丟擲一個異常。如果我們能提前觀察到執行緒池佇列的積壓,或者執行緒數量的快速膨脹,往往可以提早發現並解決問題。

 1 public static void displayThreadPoolStatus(ThreadPoolExecutor threadPool, String threadPoolName, long period, TimeUnit unit) {
 2     Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
 3         LOGGER.info("[>>ExecutorStatus<<] ThreadPool Name: [{}], Pool Status: [shutdown={}, Terminated={}], Pool Thread Size: {}, Active Thread Count: {}, Task Count: {}, Tasks Completed: {}, Tasks in Queue: {}",
 4                 threadPoolName,
 5                 threadPool.isShutdown(), threadPool.isTerminated(), // 執行緒是否被終止
 6                 threadPool.getPoolSize(), // 執行緒池執行緒數量
 7                 threadPool.getActiveCount(), // 工作執行緒數
 8                 threadPool.getTaskCount(), // 總任務數
 9                 threadPool.getCompletedTaskCount(), // 已完成的任務數
10                 threadPool.getQueue().size()); // 執行緒池中執行緒的數量
11     }, 0, period, unit);
12 }

執行緒池任務提交方式

提交任務可以通過 execute 和 submit 方法提交任務,下面就來看下它們的區別。

submit 方法簽名:

execute 方法簽名:

 

使用 execute 提交任務

使用 execute 提交任務,執行緒池內丟擲異常會導致執行緒退出,執行緒池只能重新建立一個執行緒。如果每個非同步任務都以異常結束,那麼執行緒池可能完全起不到執行緒重用的作用。

而且主執行緒無法捕獲(catch)到執行緒池內丟擲的異常。因為沒有手動捕獲異常進行處理,ThreadGroup 幫我們進行了未捕獲異常的預設處理,向標準錯誤輸出列印了出現異常的執行緒名稱和異常資訊。顯然,這種沒有以統一的錯誤日誌格式記錄錯誤資訊打印出來的形式,對生產級程式碼是不合適的。

 

如下,execute 提交任務,丟擲異常後,從執行緒名稱可以看出,老執行緒退出,建立了新的執行緒。

ThreadGroup 處理未捕獲異常:直接輸出到 System.err

 

解決方式:

  • 以 execute 方法提交到執行緒池的非同步任務,最好在任務內部做好異常處理;
  • 設定自定義的異常處理程式作為保底,比如在宣告執行緒池時自定義執行緒池的未捕獲異常處理程式。或者設定全域性的預設未捕獲異常處理程式。
 1 // 自定義執行緒池的未捕獲異常處理程式
 2 ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 8,
 3         30, TimeUnit.MINUTES,
 4         new LinkedBlockingQueue<>(),
 5         new ThreadFactoryBuilder()
 6                 .setNameFormat("pool-%d")
 7                 .setUncaughtExceptionHandler((Thread t, Throwable e) -> {
 8                     log.error("pool happen exception, thread is {}", t, e);
 9                 })
10                 .build());
11                 
12 // 設定全域性的預設未捕獲異常處理程式
13 static {
14     Thread.setDefaultUncaughtExceptionHandler((thread, throwable)-> {
15         log.error("Thread {} got exception", thread, throwable)
16     });
17 }  

定義的異常處理程式將未捕獲的異常資訊列印到標準日誌中了,老執行緒同樣會退出。如果要避免這個問題,就需要使用 submit 方法提交任務。

 

使用 submit 提交任務

使用 submit,執行緒不會退出,但是異常不會記錄,會被生吞掉。檢視 FutureTask 原始碼可以發現,在執行任務出現異常之後,異常存到了一個 outcome 欄位中,只有在呼叫 get 方法獲取 FutureTask 結果的時候,才會以 ExecutionException 的形式重新丟擲異常。所以我們可以通過捕獲 get 方法丟擲的異常來判斷執行緒的任務是否丟擲了異常。

 

submit 提交任務,可以通過 Future 獲取返回結果,如果丟擲異常,可以捕獲 ExecutionException 得到異常棧資訊。通過執行緒名稱可以看出,老執行緒也沒有退出。

需要注意的是,使用 submit 時,setUncaughtExceptionHandler 設定的異常處理器不會生效。

 

submit 與 execute 的區別

execute提交的是Runnable型別的任務,而submit提交的是Callable或者Runnable型別的任務;

execute的提交沒有返回值,而submit的提交會返回一個Future型別的物件;

execute提交的時候,如果有異常,就會直接丟擲異常,而submit在遇到異常的時候,通常不會立馬丟擲異常,而是會將異常暫時儲存起來,等待你呼叫Future.get()方法的時候,才會丟擲異常;

execute 提交的任務丟擲異常,老執行緒會退出,執行緒池會立即建立一個新的執行緒。submit 提交的任務丟擲異常,老執行緒不會退出;

執行緒池設定的 UncaughtExceptionHandler 對 execute 提交的任務生效,對 submit 提交的任務不生效。

執行緒數設定多少合適

建立多少執行緒合適,要看多執行緒具體的應用場景。我們的程式一般都是 CPU 計算和 I/O 操作交叉執行的,由於 I/O 裝置的速度相對於 CPU 來說都很慢,所以大部分情況下,I/O 操作執行的時間相對於 CPU 計算來說都非常長,這種場景我們一般都稱為 I/O 密集型計算;和 I/O 密集型計算相對的就是 CPU 密集型計算了,CPU 密集型計算大部分場景下都是純 CPU 計算。I/O 密集型程式和 CPU 密集型程式,計算最佳執行緒數的方法是不同的。

 

CPU 密集型計算

多執行緒本質上是提升多核 CPU 的利用率,所以對於一個 4 核的 CPU,每個核一個執行緒,理論上建立 4 個執行緒就可以了,再多建立執行緒也只是增加執行緒切換的成本。所以,對於 CPU 密集型的計算場景,理論上“執行緒的數量 = CPU 核數”就是最合適的。不過在工程上,執行緒的數量一般會設定為“CPU 核數 +1”,這樣的話,當執行緒因為偶爾的記憶體頁失效或其他原因導致阻塞時,這個額外的執行緒可以頂上,從而保證 CPU 的利用率。

 

I/O 密集型的計算場景

如果 CPU 計算和 I/O 操作的耗時是 1:1,那麼 2 個執行緒是最合適的。如果 CPU 計算和 I/O 操作的耗時是 1:2,那設定 3 個執行緒是合適的,如下圖所示:CPU 在 A、B、C 三個執行緒之間切換,對於執行緒 A,當 CPU 從 B、C 切換回來時,執行緒 A 正好執行完 I/O 操作。這樣 CPU 和 I/O 裝置的利用率都達到了 100%。

會發現,對於 I/O 密集型計算場景,最佳的執行緒數是與程式中 CPU 計算和 I/O 操作的耗時比相關的,可以總結出這樣一個公式:最佳執行緒數 =1 +(I/O 耗時 / CPU 耗時)

對於多核 CPU,需要等比擴大,計算公式如下:最佳執行緒數 =CPU 核數 * [ 1 +(I/O 耗時 / CPU 耗時)]

 

執行緒池執行緒數設定 

可通過如下方式獲取CPU核數:

1 /**
2  * 獲取返回CPU核數
3  *
4  * @return 返回CPU核數,預設為8
5  */
6 public static int getCpuProcessors() {
7     return Runtime.getRuntime() != null && Runtime.getRuntime().availableProcessors() > 0 ?
8             Runtime.getRuntime().availableProcessors() : 8;
9 }

 

在一些非核心業務中,我們可以將核心執行緒數設定小一些,最大執行緒數量設定為CPU核心數量,阻塞佇列大小根據具體場景設定;不要過大,防止大量任務進入等待佇列而超時,應儘快建立非核心執行緒執行任務;也不要過小,避免佇列滿了任務被拒絕丟棄。

 1 public ThreadPoolExecutor executor() {
 2     int coreSize = getCpuProcessors();
 3     ThreadPoolExecutor executor = new ThreadPoolExecutor(
 4             2, coreSize,
 5             10, TimeUnit.MINUTES,
 6             new LinkedBlockingQueue<>(512),
 7             new ThreadFactoryBuilder().setNameFormat("executor-%d").build(),
10             new ThreadPoolExecutor.AbortPolicy()
11     );14 
15     return executor;
16 }

 

在一些核心業務中,核心執行緒數設定為CPU核心數,最大執行緒數可根據公式 最佳執行緒數 =CPU 核數 * [ 1 +(I/O 耗時 / CPU 耗時)] 來計算。阻塞佇列可以根據具體業務場景設定,如果執行緒處理業務非常迅速,我們可以考慮將阻塞佇列設定大一些,處理的請求吞吐量會大些;如果執行緒處理業務非常耗時,阻塞佇列設定小些,防止請求在阻塞佇列中等待過長時間而導致請求已超時。

public ThreadPoolExecutor executor() {
    int coreSize = getCpuProcessors();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            coreSize, coreSize * 8,
            30, TimeUnit.MINUTES,
            new LinkedBlockingQueue<>(1024),
            new ThreadFactoryBuilder().setNameFormat("executor-%d").build(),
            new ThreadPoolExecutor.AbortPolicy()
    );return executor;
}

 

注意:一般不要將 corePoolSize 設定為 0,例如下面的執行緒池,使用了無界佇列,雖 maximumPoolSize > 0,但實際上只會有一個工作執行緒,因為其它任務都加入等待隊列了。

1 ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 5, 30, TimeUnit.SECONDS,
3         new LinkedBlockingQueue<>(),
4         new ThreadFactoryBuilder().setNameFormat("test-%d").build()
5 );

 

執行緒池如何優先啟用非核心執行緒

如果想讓執行緒池激進一點,優先開啟更多的執行緒,而把隊列當成一個後備方案,可以自定義佇列,重寫 offer 方法,因為執行緒池是通過 offer 方法將任務放入佇列。

 

通過重寫佇列的 offer 方法,直接返回 false,造成這個佇列已滿的假象,執行緒池在工作佇列滿了無法入隊的情況下會擴容執行緒池。直到執行緒數達到最大執行緒數,就會觸發拒絕策略,此時再通過自定義的拒絕策略將任務通過佇列的 put 方法放入佇列中。這樣就可以優先開啟更多執行緒,而不是進入隊列了。

 1 public static void main(String[] args) {
 2     // ThreadPoolExecutor 通過 offer 將元素放入佇列,過載佇列的 offer 方法,直接返回 false,造成佇列已滿的假象
 3     // 佇列滿時,會建立新的執行緒直到達到 maximumPoolSize,之後會觸發執行拒絕策略
 4     LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
 5         private static final long serialVersionUID = 8303142475890427046L;
 6 
 7         @Override
 8         public boolean offer(Runnable e) {
 9             return false;
10         }
11     };
12 
13     // 當執行緒達到 maximumPoolSize 時會觸發拒絕策略,此時將任務 put 到佇列中
14     RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
15         @Override
16         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
17             try {
18                 // 任務拒絕時,通過 put 放入佇列
19                 queue.put(r);
20             } catch (InterruptedException e) {
21                 Thread.currentThread().interrupt();
22             }
23         }
24     };
25 
26     // 構造執行緒池
27     ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4,
28             600, TimeUnit.SECONDS,
29             queue,
30             new ThreadFactoryBuilder().setNameFormat("demo-%d").build(),
31             rejectedExecutionHandler);
32 
33     IntStream.rangeClosed(1, 50).forEach(i -> {
34         executor.submit(() -> {
35             log.info("start...");
36             sleep(9000);
37         });
38     });
39 }

優雅的終止執行緒和執行緒池

優雅地終止執行緒

在程式中,我們不能隨便中斷一個執行緒,因為這是極其不安全的操作,我們無法知道這個執行緒正執行在什麼狀態,它可能持有某把鎖,強行中斷可能導致鎖不能釋放的問題;或者執行緒可能在操作資料庫,強行中斷導致資料不一致混亂的問題。正因此,JAVA裡將Thread的stop方法設定為過時,以禁止大家使用。

優雅地終止執行緒,不是自己終止自己,而是在一個執行緒 T1 中,終止執行緒 T2;這裡所謂的“優雅”,指的是給 T2 一個機會料理後事,而不是被一劍封喉。兩階段終止模式,就是將終止過程分成兩個階段,其中第一個階段主要是執行緒 T1 向執行緒 T2傳送終止指令,而第二階段則是執行緒 T2響應終止指令。

Java 執行緒進入終止狀態的前提是執行緒進入 RUNNABLE 狀態,而實際上執行緒也可能處在休眠狀態,也就是說,我們要想終止一個執行緒,首先要把執行緒的狀態從休眠狀態轉換到 RUNNABLE 狀態。如何做到呢?這個要靠 Java Thread 類提供的 interrupt() 方法,它可以將休眠狀態的執行緒轉換到 RUNNABLE 狀態。

執行緒轉換到 RUNNABLE 狀態之後,我們如何再將其終止呢?RUNNABLE 狀態轉換到終止狀態,優雅的方式是讓 Java 執行緒自己執行完 run() 方法,所以一般我們採用的方法是設定一個標誌位,然後執行緒會在合適的時機檢查這個標誌位,如果發現符合終止條件,則自動退出 run() 方法。這個過程其實就是第二階段:響應終止指令。終止指令,其實包括兩方面內容:interrupt() 方法和執行緒終止的標誌位。

如果我們線上程內捕獲中斷異常(如Thread.sleep()丟擲了中斷一次)之後,需通過 Thread.currentThread().interrupt() 重新設定執行緒的中斷狀態,因為 JVM 的異常處理會清除執行緒的中斷狀態。

 

建議自己設定執行緒終止標誌位,避免執行緒內呼叫第三方類庫的方法未處理執行緒中斷狀態,如下所示。

 1 public class InterruptDemo {
 2 
 3     /**
 4      * 輸出:呼叫 interrupt() 時,只是設定了執行緒中斷標識,執行緒依舊會繼續執行當前方法,執行完之後再退出執行緒。
 5      * do something...
 6      * continue do something...
 7      * do something...
 8      * continue do something...
 9      * do something...
10      * 執行緒被中斷...
11      * continue do something...
12      */
13     public static void main(String[] args) throws InterruptedException {
14         Proxy proxy = new Proxy();
15         proxy.start();
16 
17         Thread.sleep(6000);
18         proxy.stop();
19     }
20 
21     static class Proxy {
22         // 自定義執行緒終止標誌位
23         private volatile boolean terminated = false;
24 
25         private boolean started = false;
26 
27         Thread t;
28 
29         public synchronized void start() {
30             if (started) {
31                 return;
32             }
33             started = true;
34             terminated = false;
35 
36             t = new Thread(() -> {
37                 while (!terminated) { // 取代 while (true)
38                     System.out.println("do something...");
39                     try {
40                         Thread.sleep(2000);
41                     } catch (InterruptedException e) {
42                         // 如果其它執行緒中斷此執行緒,丟擲異常時,需重新設定執行緒中斷狀態,因為 JVM 的異常處理會清除執行緒的中斷狀態。
43                         System.out.println("執行緒被中斷...");
44                         Thread.currentThread().interrupt();
45                     }
46                     System.out.println("continue do something...");
47                 }
48                 started = false;
49             });
50             t.start();
51         }
52 
53         public synchronized void stop() {
54             // 設定中斷標誌
55             terminated = true;
56             t.interrupt();
57         }
58     }
59 
60 }

 

優雅的終止執行緒池

執行緒池提供了兩個方法來中斷執行緒池:shutdown() 和 shutdownNow()。

shutdown():是一種很保守的關閉執行緒池的方法。執行緒池執行 shutdown() 後,就會拒絕接收新的任務,但是會等待執行緒池中正在執行的任務和已經進入阻塞佇列的任務都執行完之後才最終關閉執行緒池。

shutdownNow():相對激進一些,執行緒池執行 shutdownNow() 後,會拒絕接收新的任務,同時還會中斷執行緒池中正在執行的任務,已經進入阻塞佇列的任務也被剝奪了執行的機會,不過這些被剝奪執行機會的任務會作為 shutdownNow() 方法的返回值返回。因為 shutdownNow() 方法會中斷正在執行的執行緒,所以提交到執行緒池的任務,如果需要優雅地結束,就需要正確地處理執行緒中斷。如果提交到執行緒池的任務不允許取消,那就不能使用 shutdownNow() 方法終止執行緒池。

 

如果想在jvm關閉的時候進行記憶體清理、物件銷燬等操作,或者僅僅想起個執行緒然後這個執行緒不會退出,可以使用Runtime.addShutdownHook。

這個方法的作用就是在JVM中增加一個關閉的鉤子。當程式正常退出、系統呼叫 System.exit 方法或者虛擬機器被關閉時才會執行系統中已經設定的所有鉤子,當系統執行完這些鉤子後,JVM才會關閉。

利用這個性質,就可以在這個最後執行的執行緒中把執行緒池優雅的關閉掉。雖然jvm關閉了,但優雅關閉執行緒池總是好的,特別是涉及到服務端的 tcp 連線。

 1 /**
 2  * 新增Hook在Jvm關閉時優雅的關閉執行緒池
 3  *
 4  * @param threadPool     執行緒池
 5  * @param threadPoolName 執行緒池名稱
 6  */
 7 public static void hookShutdownThreadPool(ExecutorService threadPool, String threadPoolName) {
 8     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
 9         LOGGER.info("[>>ExecutorShutdown<<] Start to shutdown the thead pool: [{}]", threadPoolName);
10         // 使新任務無法提交
11         threadPool.shutdown();
12         try {
13             // 等待未完成任務結束
14             if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
15                 threadPool.shutdownNow(); // 取消當前執行的任務
16                 LOGGER.warn("[>>ExecutorShutdown<<] Interrupt the worker, which may cause some task inconsistent. Please check the biz logs.");
17 
18                 // 等待任務取消的響應
19                 if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
20                     LOGGER.error("[>>ExecutorShutdown<<] Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs.");
21                 }
22             }
23         } catch (InterruptedException ie) {
24             // 重新取消當前執行緒進行中斷
25             threadPool.shutdownNow();
26             LOGGER.error("[>>ExecutorShutdown<<] The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconsistent state. Please check the biz logs.");
27 
28             // 保留中斷狀態
29             Thread.currentThread().interrupt();
30         }
31 
32         LOGGER.info("[>>ExecutorShutdown<<] Finally shutdown the thead pool: [{}]", threadPoolName);
33     }));
34 }

Executors

考慮到 ThreadPoolExecutor 的建構函式實在是有些複雜,所以 Java 併發包裡提供了一個執行緒池的靜態工廠類 Executors,利用 Executors 你可以快速建立執行緒池。

但《阿里巴巴 Java 開發手冊》中提到,禁止使用這些方法來建立執行緒池,而應該手動 new ThreadPoolExecutor 來建立執行緒池。最重要的原因是:Executors 提供的很多方法預設使用的都是無界的 LinkedBlockingQueue,高負載情境下,無界佇列很容易導致 OOM,而 OOM 會導致所有請求都無法處理,這是致命問題。最典型的就是 newFixedThreadPool 和 newCachedThreadPool,可能因為資源耗盡導致 OOM 問題。

 

newCachedThreadPool

具有快取性質的執行緒池,執行緒最大空閒時間60s,執行緒可重複利用,沒有最大執行緒數限制。使用的是 SynchronousQueue 無容量阻塞佇列,沒有最大執行緒數限制。這意味著,只要有請求到來,就必須找到一條工作執行緒來處理,如果當前沒有空閒的執行緒就再建立一條新的。

高併發情況下,大量的任務進來後會建立大量的執行緒,導致OOM(無法建立本地執行緒):

1 [11:30:30.487] [http-nio-45678-exec-1] [ERROR] [.a.c.c.C.[.[.[/].[dispatcherServlet]:175 ] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; 
2     nested exception is java.lang.OutOfMemoryError: unable to create new native thread] with root cause
3 java.lang.OutOfMemoryError: unable to create new native thread 

 

newFixedThreadPool

具有固定數量的執行緒池,核心執行緒數等於最大執行緒數,超出最大執行緒數進行等待。使用的是 LinkedBlockingQueue 無界阻塞佇列。雖然使用 newFixedThreadPool 可以把工作執行緒控制在固定的數量上,但任務佇列是無界的。如果任務較多並且執行較慢的話,佇列可能會快速積壓,撐爆記憶體導致 OOM。

如果一直往這個無界佇列中新增任務,不久就會出現OOM異常(記憶體佔滿):

1 Exception in thread "http-nio-45678-ClientPoller" 
2     java.lang.OutOfMemoryError: GC overhead limit exceeded

 

newSingleThreadExecutor

核心執行緒數與最大執行緒數均為1,可用於當鎖控制同步。使用的是 LinkedBlockingQueue 無界阻塞佇列。

 

newScheduledThreadPool

具有時間排程性的執行緒池,必須初始化核心執行緒數。

沒有最大執行緒數限制,執行緒最大空閒時間為0,空閒執行緒執行完即銷燬。底層使用 DelayedWorkQueue 實現延遲特性。

執行緒池建立正確姿勢

最後,總結一下,從如下的一些方面考慮如何正確地建立執行緒池。

執行緒池配置

我們需要根據自己的場景、併發情況來評估執行緒池的幾個核心引數,包括核心執行緒數、最大執行緒數、執行緒回收策略、工作佇列的型別,以及拒絕策略,確保執行緒池的工作行為符合需求,一般都需要設定有界的工作佇列和可控的執行緒數。

要根據任務的“輕重緩急”來指定執行緒池的核心引數,包括執行緒數、回收策略和任務佇列:

  • 對於執行比較慢、數量不大的 IO 任務,要考慮更多的執行緒數,而不需要太大的佇列。
  • 對於吞吐量較大的計算型任務,執行緒數量不宜過多,可以是 CPU 核數或核數 *2(理由是,執行緒一定排程到某個 CPU 進行執行,如果任務本身是 CPU 繫結的任務,那麼過多的執行緒只會增加執行緒切換的開銷,並不能提升吞吐量),但可能需要較長的佇列來做緩衝。

 

任何時候,都應該為自定義執行緒池指定有意義的名稱,以方便排查問題。當出現執行緒數量暴增、執行緒死鎖、執行緒佔用大量 CPU、執行緒執行出現異常等問題時,我們往往會抓取執行緒棧。此時,有意義的執行緒名稱,就可以方便我們定位問題。

除了建議手動宣告執行緒池以外,還建議用一些監控手段來觀察執行緒池的狀態。如果我們能提前觀察到執行緒池佇列的積壓,或者執行緒數量的快速膨脹,往往可以提早發現並解決問題。

 

確認執行緒池本身是不是複用的

既然使用了執行緒池就需要確保執行緒池是在複用的,每次 new 一個執行緒池出來可能比不用執行緒池還糟糕。如果你沒有直接宣告執行緒池而是使用其他同學提供的類庫來獲得一個執行緒池,請務必檢視原始碼,以確認執行緒池的例項化方式和配置是符合預期的。

 

斟酌執行緒池的混用策略

不要盲目複用執行緒池,別人定義的執行緒池屬性不一定適合你的任務,而且混用會相互干擾。

另外,Java 8 的 parallel stream 背後是共享同一個 ForkJoinPool,預設並行度是 CPU 核數 -1。對於 CPU 繫結的任務來說,使用這樣的配置比較合適,但如果集合操作涉及同步 IO 操作的話(比如資料庫操作、外部服務呼叫等),建議自定義一個 ForkJoinPool(或普通執行緒池)。因此在使用 Java8 的並行流時,建議只用在計算密集型的任務,IO密集型的任務建議自定義執行緒池來提交任務,避免影響其它業務。

 

CommonExecutor

如下是我自己封裝的一個執行緒池工具類,還提供了執行批量任務的方法,關於批量任務後面再單獨寫篇文章來介紹。

  1 package org.hzero.core.util;
  2 
  3 import java.util.ArrayList;
  4 import java.util.Collections;
  5 import java.util.List;
  6 import java.util.concurrent.*;
  7 import java.util.stream.Collectors;
  8 import javax.annotation.Nonnull;
  9 
 10 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 11 import org.apache.commons.collections4.CollectionUtils;
 12 import org.apache.commons.lang3.RandomUtils;
 13 import org.slf4j.Logger;
 14 import org.slf4j.LoggerFactory;
 15 import org.springframework.dao.DuplicateKeyException;
 16 
 17 import io.choerodon.core.exception.CommonException;
 18 
 19 import org.hzero.core.base.BaseConstants;
 20 
 21 /**
 22  * @author bojiangzhou 2020/02/24
 23  */
 24 public class CommonExecutor {
 25 
 26     private static final Logger LOGGER = LoggerFactory.getLogger(CommonExecutor.class);
 27 
 28     private static final ThreadPoolExecutor BASE_EXECUTOR;
 29 
 30     static {
 31         BASE_EXECUTOR = buildThreadFirstExecutor("BaseExecutor");
 32     }
 33 
 34     /**
 35      * 構建執行緒優先的執行緒池
 36      * <p>
 37      * 執行緒池預設是當核心執行緒數滿了後,將任務新增到工作佇列中,當工作佇列滿了之後,再建立執行緒直到達到最大執行緒數。
 38      *
 39      * <p>
 40      * 執行緒優先的執行緒池,就是在核心執行緒滿了之後,繼續建立執行緒,直到達到最大執行緒數之後,再把任務新增到工作佇列中。
 41      *
 42      * <p>
 43      * 此方法預設設定核心執行緒數為 CPU 核數,最大執行緒數為 8倍 CPU 核數,空閒執行緒超過 5 分鐘銷燬,工作佇列大小為 65536。
 44      *
 45      * @param poolName        執行緒池名稱
 46      * @return ThreadPoolExecutor
 47      */
 48     public static ThreadPoolExecutor buildThreadFirstExecutor(String poolName) {
 49         int coreSize = CommonExecutor.getCpuProcessors();
 50         int maxSize = coreSize * 8;
 51         return buildThreadFirstExecutor(coreSize, maxSize, 5, TimeUnit.MINUTES, 1 << 16, poolName);
 52     }
 53 
 54     /**
 55      * 構建執行緒優先的執行緒池
 56      * <p>
 57      * 執行緒池預設是當核心執行緒數滿了後,將任務新增到工作佇列中,當工作佇列滿了之後,再建立執行緒直到達到最大執行緒數。
 58      *
 59      * <p>
 60      * 執行緒優先的執行緒池,就是在核心執行緒滿了之後,繼續建立執行緒,直到達到最大執行緒數之後,再把任務新增到工作佇列中。
 61      *
 62      * @param corePoolSize    核心執行緒數
 63      * @param maximumPoolSize 最大執行緒數
 64      * @param keepAliveTime   空閒執行緒的空閒時間
 65      * @param unit            時間單位
 66      * @param workQueueSize   工作佇列容量大小
 67      * @param poolName        執行緒池名稱
 68      * @return ThreadPoolExecutor
 69      */
 70     public static ThreadPoolExecutor buildThreadFirstExecutor(int corePoolSize,
 71                                                               int maximumPoolSize,
 72                                                               long keepAliveTime,
 73                                                               TimeUnit unit,
 74                                                               int workQueueSize,
 75                                                               String poolName) {
 76         // 自定義佇列,優先開啟更多執行緒,而不是放入佇列
 77         LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(workQueueSize) {
 78             private static final long serialVersionUID = 5075561696269543041L;
 79 
 80             @Override
 81             public boolean offer(@Nonnull Runnable o) {
 82                 return false; // 造成佇列已滿的假象
 83             }
 84         };
 85 
 86         // 當執行緒達到 maximumPoolSize 時會觸發拒絕策略,此時將任務 put 到佇列中
 87         RejectedExecutionHandler rejectedExecutionHandler = (runnable, executor) -> {
 88             try {
 89                 // 任務拒絕時,通過 offer 放入佇列
 90                 queue.put(runnable);
 91             } catch (InterruptedException e) {
 92                 LOGGER.warn("{} Queue offer interrupted. ", poolName, e);
 93                 Thread.currentThread().interrupt();
 94             }
 95         };
 96 
 97         ThreadPoolExecutor executor = new ThreadPoolExecutor(
 98                 corePoolSize, maximumPoolSize,
 99                 keepAliveTime, unit,
100                 queue,
101                 new ThreadFactoryBuilder()
102                         .setNameFormat(poolName + "-%d")
103                         .setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
104                             LOGGER.error("{} catching the uncaught exception, ThreadName: [{}]", poolName, thread.toString(), throwable);
105                         })
106                         .build(),
107                 rejectedExecutionHandler
108         );
109 
110         CommonExecutor.displayThreadPoolStatus(executor, poolName);
111         CommonExecutor.hookShutdownThreadPool(executor, poolName);
112         return executor;
113     }
114 
115     /**
116      * 批量提交非同步任務,使用預設的執行緒池
117 * 118 * @param tasks 將任務轉化為 AsyncTask 批量提交 119 */ 120 public static <T> List<T> batchExecuteAsync(List<AsyncTask<T>> tasks, @Nonnull String taskName) { 121 return batchExecuteAsync(tasks, BASE_EXECUTOR, taskName); 122 } 123 124 /** 125 * 批量提交非同步任務,執行失敗可丟擲異常或返回異常編碼即可 <br> 126 * <p> 127 * 需注意提交的非同步任務無法控制事務,一般需容忍產生一些垃圾資料的情況下才能使用非同步任務,非同步任務執行失敗將丟擲異常,主執行緒可回滾事務. 128 * <p> 129 * 非同步任務失敗後,將取消剩餘的任務執行. 130 * 131 * @param tasks 將任務轉化為 AsyncTask 批量提交 132 * @param executor 執行緒池,需自行根據業務場景建立相應的執行緒池 133 * @return 返回執行結果 134 */ 135 public static <T> List<T> batchExecuteAsync(@Nonnull List<AsyncTask<T>> tasks, @Nonnull ThreadPoolExecutor executor, @Nonnull String taskName) { 136 if (CollectionUtils.isEmpty(tasks)) { 137 return Collections.emptyList(); 138 } 139 140 int size = tasks.size(); 141 142 List<Callable<T>> callables = tasks.stream().map(t -> (Callable<T>) () -> { 143 try { 144 T r = t.doExecute(); 145 146 LOGGER.debug("[>>Executor<<] Async task execute success. ThreadName: [{}], BatchTaskName: [{}], SubTaskName: [{}]", 147 Thread.currentThread().getName(), taskName, t.taskName()); 148 return r; 149 } catch (Throwable e) { 150 LOGGER.warn("[>>Executor<<] Async task execute error. ThreadName: [{}], BatchTaskName: [{}], SubTaskName: [{}], exception: {}", 151 Thread.currentThread().getName(), taskName, t.taskName(), e.getMessage()); 152 throw e; 153 } 154 }).collect(Collectors.toList()); 155 156 CompletionService<T> cs = new ExecutorCompletionService<>(executor, new LinkedBlockingQueue<>(size)); 157 List<Future<T>> futures = new ArrayList<>(size); 158 LOGGER.info("[>>Executor<<] Start async tasks, BatchTaskName: [{}], TaskSize: [{}]", taskName, size); 159 160 for (Callable<T> task : callables) { 161 futures.add(cs.submit(task)); 162 } 163 164 List<T> resultList = new ArrayList<>(size); 165 for (int i = 0; i < size; i++) { 166 try { 167 Future<T> future = cs.poll(6, TimeUnit.MINUTES); 168 if (future != null) { 169 T result = future.get(); 170 resultList.add(result); 171 LOGGER.debug("[>>Executor<<] Async task [{}] - [{}] execute success, result: {}", taskName, i, result); 172 } else { 173 cancelTask(futures); 174 LOGGER.error("[>>Executor<<] Async task [{}] - [{}] execute timeout, then cancel other tasks.", taskName, i); 175 throw new CommonException(BaseConstants.ErrorCode.TIMEOUT); 176 } 177 } catch (ExecutionException e) { 178 LOGGER.warn("[>>Executor<<] Async task [{}] - [{}] execute error, then cancel other tasks.", taskName, i, e); 179 cancelTask(futures); 180 Throwable throwable = e.getCause(); 181 if (throwable instanceof CommonException) { 182 throw (CommonException) throwable; 183 } else if (throwable instanceof DuplicateKeyException) { 184 throw (DuplicateKeyException) throwable; 185 } else { 186 throw new CommonException("error.executorError", e.getCause().getMessage()); 187 } 188 } catch (InterruptedException e) { 189 cancelTask(futures); 190 Thread.currentThread().interrupt(); // 重置中斷標識 191 LOGGER.error("[>>Executor<<] Async task [{}] - [{}] were interrupted.", taskName, i); 192 throw new CommonException(BaseConstants.ErrorCode.ERROR); 193 } 194 } 195 LOGGER.info("[>>Executor<<] Finish async tasks , BatchTaskName: [{}], TaskSize: [{}]", taskName, size); 196 return resultList; 197 } 198 199 /** 200 * 根據一定週期輸出執行緒池的狀態 201 * 202 * @param threadPool 執行緒池 203 * @param threadPoolName 執行緒池名稱 204 */ 205 public static void displayThreadPoolStatus(ThreadPoolExecutor threadPool, String threadPoolName) { 206 displayThreadPoolStatus(threadPool, threadPoolName, RandomUtils.nextInt(60, 600), TimeUnit.SECONDS); 207 } 208 209 /** 210 * 根據一定週期輸出執行緒池的狀態 211 * 212 * @param threadPool 執行緒池 213 * @param threadPoolName 執行緒池名稱 214 * @param period 週期 215 * @param unit 時間單位 216 */ 217 public static void displayThreadPoolStatus(ThreadPoolExecutor threadPool, String threadPoolName, long period, TimeUnit unit) { 218 Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { 219 LOGGER.info("[>>ExecutorStatus<<] ThreadPool Name: [{}], Pool Status: [shutdown={}, Terminated={}], Pool Thread Size: {}, Active Thread Count: {}, Task Count: {}, Tasks Completed: {}, Tasks in Queue: {}", 220 threadPoolName, 221 threadPool.isShutdown(), threadPool.isTerminated(), // 執行緒是否被終止 222 threadPool.getPoolSize(), // 執行緒池執行緒數量 223 threadPool.getActiveCount(), // 工作執行緒數 224 threadPool.getTaskCount(), // 總任務數 225 threadPool.getCompletedTaskCount(), // 已完成的任務數 226 threadPool.getQueue().size()); // 執行緒池中執行緒的數量 227 }, 0, period, unit); 228 } 229 230 /** 231 * 新增Hook在Jvm關閉時優雅的關閉執行緒池 232 * 233 * @param threadPool 執行緒池 234 * @param threadPoolName 執行緒池名稱 235 */ 236 public static void hookShutdownThreadPool(ExecutorService threadPool, String threadPoolName) { 237 Runtime.getRuntime().addShutdownHook(new Thread(() -> { 238 LOGGER.info("[>>ExecutorShutdown<<] Start to shutdown the thead pool: [{}]", threadPoolName); 239 // 使新任務無法提交 240 threadPool.shutdown(); 241 try { 242 // 等待未完成任務結束 243 if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { 244 threadPool.shutdownNow(); // 取消當前執行的任務 245 LOGGER.warn("[>>ExecutorShutdown<<] Interrupt the worker, which may cause some task inconsistent. Please check the biz logs."); 246 247 // 等待任務取消的響應 248 if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { 249 LOGGER.error("[>>ExecutorShutdown<<] Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs."); 250 } 251 } 252 } catch (InterruptedException ie) { 253 // 重新取消當前執行緒進行中斷 254 threadPool.shutdownNow(); 255 LOGGER.error("[>>ExecutorShutdown<<] The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconsistent state. Please check the biz logs."); 256 257 // 保留中斷狀態 258 Thread.currentThread().interrupt(); 259 } 260 261 LOGGER.info("[>>ExecutorShutdown<<] Finally shutdown the thead pool: [{}]", threadPoolName); 262 })); 263 } 264 265 /** 266 * 獲取返回CPU核數 267 * 268 * @return 返回CPU核數,預設為8 269 */ 270 public static int getCpuProcessors() { 271 return Runtime.getRuntime() != null && Runtime.getRuntime().availableProcessors() > 0 ? 272 Runtime.getRuntime().availableProcessors() : 8; 273 } 274 275 private static <T> void cancelTask(List<Future<T>> futures) { 276 for (Future<T> future : futures) { 277 if (!future.isDone()) { 278 future.cancel(true); 279 } 280 } 281 } 282 283 }

AsyncTask:

 1 package org.hzero.core.util;
 2 
 3 import java.util.UUID;
 4 
 5 public interface AsyncTask<T> {
 6 
 7     default String taskName() {
 8         return UUID.randomUUID().toString();
 9     }
10 
11     T doExecute();
12 }

 

--------------------------------------------------------------------------------------------------------------

&n