1. 程式人生 > 實用技巧 >執行緒池最佳實踐

執行緒池最佳實踐

簡單演示一下如何使用執行緒池

private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) {

        //使用阿里巴巴推薦的建立執行緒池的方式
        //通過ThreadPoolExecutor建構函式自定義引數建立
ThreadPoolExecutor executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) { executor.execute(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("CurrentThread name:" + Thread.currentThread().getName() + "date:" + Instant.now()); }); }
//終止執行緒池 executor.shutdown(); try { executor.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Finished all threads"); }

1. 使用ThreadPoolExecutor的建構函式宣告執行緒池

1. 執行緒池必須手動通過ThreadPoolExecutor的建構函式來宣告,避免使用Executors類的newFixedThreadPoolnewCachedThreadPool,因為可能會有 OOM 的風險。

Executors 返回執行緒池物件的弊端如下:

  • FixedThreadPoolSingleThreadExecutor: 允許請求的佇列長度為Integer.MAX_VALUE,可能堆積大量的請求,從而導致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool: 允許建立的執行緒數量為Integer.MAX_VALUE,可能會建立大量執行緒,從而導致 OOM。

說白了就是:使用有界佇列,控制執行緒建立數量。

除了避免 OOM 的原因之外,不推薦使用Executors提供的兩種快捷的執行緒池的原因還有:

  1. 實際使用中需要根據自己機器的效能、業務場景來手動配置執行緒池的引數比如核心執行緒數、使用的任務佇列、飽和策略等等。
  2. 我們應該顯示地給我們的執行緒池命名,這樣有助於我們定位問題。

2.監測執行緒池執行狀態

你可以通過一些手段來檢測執行緒池的執行狀態比如 SpringBoot 中的 Actuator 元件。

除此之外,我們還可以利用ThreadPoolExecutor的相關 API做一個簡陋的監控。從下圖可以看出,ThreadPoolExecutor提供了獲取執行緒池當前的執行緒數和活躍執行緒數、已經執行完成的任務數、正在排隊中的任務數等等。

下面是一個簡單的 Demo。printThreadPoolStatus()會每隔一秒打印出執行緒池的執行緒數、活躍執行緒數、完成的任務數、以及佇列中的任務數。

    /**
     * 列印執行緒池的狀態
     *
     * @param threadPool 執行緒池物件
     */
    public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-thread-pool-status", false));
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            log.info("=========================");
            log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
            log.info("Active Threads: {}", threadPool.getActiveCount());
            log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount());
            log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
            log.info("=========================");
        }, 0, 1, TimeUnit.SECONDS);
    }

3.建議不同類別的業務用不同的執行緒池

很多人在實際專案中都會有類似這樣的問題:我的專案中多個業務需要用到執行緒池,是為每個執行緒池都定義一個還是說定義一個公共的執行緒池呢?

一般建議是不同的業務使用不同的執行緒池,配置執行緒池的時候根據當前業務的情況對當前執行緒池進行配置,因為不同的業務的併發以及對資源的使用情況都不同,重心優化系統性能瓶頸相關的業務。

我們再來看一個真實的事故案例!(本案例來源自:《執行緒池運用不當的一次線上事故》,很精彩的一個案例)

上面的程式碼可能會存在死鎖的情況,為什麼呢?畫個圖給大家捋一捋。

試想這樣一種極端情況:

假如我們執行緒池的核心執行緒數為n,父任務(扣費任務)數量為n,父任務下面有兩個子任務(扣費任務下的子任務),其中一個已經執行完成,另外一個被放在了任務佇列中。由於父任務把執行緒池核心執行緒資源用完,所以子任務因為無法獲取到執行緒資源無法正常執行,一直被阻塞在佇列中。父任務等待子任務執行完成,而子任務等待父任務釋放執行緒池資源,這也就造成了"死鎖"。

解決方法也很簡單,就是新增加一個用於執行子任務的執行緒池專門為其服務。

4.別忘記給執行緒池命名

初始化執行緒池的時候需要顯示命名(設定執行緒池名稱字首),有利於定位問題。

預設情況下建立的執行緒名字類似 pool-1-thread-n 這樣的,沒有業務含義,不利於我們定位問題。

給執行緒池裡的執行緒命名通常有下面兩種方式:

**1.利用 guava 的ThreadFactoryBuilder**

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)

2.自己實現ThreadFactor

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 執行緒工廠,它設定執行緒名稱,有利於我們定位問題。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    /**
     * 建立一個帶名字的執行緒池生產工廠
     */
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name; // TODO consider uniquifying this
    }

    @Override 
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }

}