1. 程式人生 > 實用技巧 >執行緒池不允許使用Executors去建立,而是通過ThreadPoolExecutor的方式

執行緒池不允許使用Executors去建立,而是通過ThreadPoolExecutor的方式

1. 通過Executors建立執行緒池的弊端

在建立執行緒池的時候,大部分人還是會選擇使用Executors去建立。

下面是建立定長執行緒池(FixedThreadPool)的一個例子,嚴格來說,當使用如下程式碼建立執行緒池時,是不符合程式設計規範的。

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);

原因在於:(摘自阿里編碼規約)

執行緒池不允許使用Executors去建立,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險。 說明:Executors各個方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
  主要問題是堆積的請求處理佇列可能會耗費非常大的記憶體,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
  主要問題是執行緒數最大數是Integer.MAX_VALUE,可能會建立數量非常多的執行緒,甚至OOM。

2.通過ThreadPoolExecutor建立執行緒池

所以,針對上面的不規範程式碼,重構為通過ThreadPoolExecutor建立執行緒池的方式。

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

ThreadPoolExecutor 是執行緒池的核心實現。執行緒的建立和終止需要很大的開銷,執行緒池中預先提供了指定數量的可重用執行緒,所以使用執行緒池會節省系統資源,並且每個執行緒池都維護了一些基礎的資料統計,方便執行緒的管理和監控。

3.ThreadPoolExecutor引數解釋

下面是對其引數的解釋,在建立執行緒池時需根據自己的情況來合理設定執行緒池。

corePoolSize & maximumPoolSize

核心執行緒數(corePoolSize)和最大執行緒數(maximumPoolSize)是執行緒池中非常重要的兩個概念,希望同學們能夠掌握。
當一個新任務被提交到池中,如果當前執行執行緒小於核心執行緒數(corePoolSize),即使當前有空閒執行緒,也會新建一個執行緒來處理新提交的任務;如果當前執行執行緒數大於核心執行緒數(corePoolSize)並小於最大執行緒數(maximumPoolSize),只有當等待佇列已滿的情況下才會新建執行緒。

keepAliveTime & unit

keepAliveTime 為超過 corePoolSize 執行緒數量的執行緒最大空閒時間,unit 為時間單位。

等待佇列

任何阻塞佇列(BlockingQueue)都可以用來轉移或儲存提交的任務,執行緒池大小和阻塞佇列相互約束執行緒池:

  1. 如果執行執行緒數小於corePoolSize,提交新任務時就會新建一個執行緒來執行;

  2. 如果執行執行緒數大於或等於corePoolSize,新提交的任務就會入列等待;如果佇列已滿,並且執行執行緒數小於maximumPoolSize,也將會新建一個執行緒來執行;

  3. 如果執行緒數大於maximumPoolSize,新提交的任務將會根據拒絕策略來處理。

下面來看一下三種通用的入隊策略:

  1. 直接傳遞:通過 SynchronousQueue 直接把任務傳遞給執行緒。如果當前沒可用執行緒,嘗試入隊操作會失敗,然後再建立一個新的執行緒。當處理可能具有內部依賴性的請求時,該策略會避免請求被鎖定。直接傳遞通常需要無界的最大執行緒數(maximumPoolSize),避免拒絕新提交的任務。當任務持續到達的平均速度超過可處理的速度時,可能導致執行緒的無限增長。

  2. 無界佇列:使用無界佇列(如 LinkedBlockingQueue)作為等待佇列,當所有的核心執行緒都在處理任務時, 新提交的任務都會進入佇列等待。因此,不會有大於 corePoolSize 的執行緒會被建立(maximumPoolSize 也將失去作用)。這種策略適合每個任務都完全獨立於其他任務的情況;例如網站伺服器。這種型別的等待佇列可以使瞬間爆發的高頻請求變得平滑。當任務持續到達的平均速度超過可處理速度時,可能導致等待佇列無限增長。

  3. 有界佇列:當使用有限的最大執行緒數時,有界佇列(如 ArrayBlockingQueue)可以防止資源耗盡,但是難以調整和控制。佇列大小和執行緒池大小可以相互作用:使用大的佇列和小的執行緒數可以減少CPU使用率、系統資源和上下文切換的開銷,但是會導致吞吐量變低,如果任務頻繁地阻塞(例如被I/O限制),系統就能為更多的執行緒排程執行時間。使用小的佇列通常需要更多的執行緒數,這樣可以最大化CPU使用率,但可能會需要更大的排程開銷,從而降低吞吐量。

拒絕策略

當執行緒池已經關閉或達到飽和(最大執行緒和佇列都已滿)狀態時,新提交的任務將會被拒絕。 ThreadPoolExecutor 定義了四種拒絕策略:

  1. AbortPolicy:預設策略,在需要拒絕任務時丟擲RejectedExecutionException;

  2. CallerRunsPolicy:直接在 execute 方法的呼叫執行緒中執行被拒絕的任務,如果執行緒池已經關閉,任務將被丟棄;

  3. DiscardPolicy:直接丟棄任務;

  4. DiscardOldestPolicy:丟棄佇列中等待時間最長的任務,並執行當前提交的任務,如果執行緒池已經關閉,任務將被丟棄。

我們也可以自定義拒絕策略,只需要實現 RejectedExecutionHandler; 需要注意的是,拒絕策略的執行需要指定執行緒池和佇列的容量。

4.ThreadPoolExecutor建立執行緒方式

通過下面的demo來了解ThreadPoolExecutor建立執行緒的過程。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 測試ThreadPoolExecutor對執行緒的執行順序
 **/
public class ThreadPoolSerialTest {
    public static void main(String[] args) {
        //核心執行緒數
        int corePoolSize = 3;
        //最大執行緒數
        int maximumPoolSize = 6;
        //超過 corePoolSize 執行緒數量的執行緒最大空閒時間
        long keepAliveTime = 2;
        //以秒為時間單位
        TimeUnit unit = TimeUnit.SECONDS;
        //建立工作佇列,用於存放提交的等待執行任務
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);
        ThreadPoolExecutor threadPoolExecutor = null;
        try {
            //建立執行緒池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.AbortPolicy());

            //迴圈提交任務
            for (int i = 0; i < 8; i++) {
                //提交任務的索引
                final int index = (i + 1);
                threadPoolExecutor.submit(() -> {
                    //執行緒列印輸出
                    System.out.println("大家好,我是執行緒:" + index);
                    try {
                        //模擬執行緒執行時間,10s
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
                //每個任務提交後休眠500ms再提交下一個任務,用於保證提交順序
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}

執行結果:

這裡描述一下執行的流程:

  • 首先通過 ThreadPoolExecutor 建構函式建立執行緒池;
  • 執行 for 迴圈,提交 8 個任務(恰好等於maximumPoolSize[最大執行緒數] + capacity[佇列大小]);
  • 通過 threadPoolExecutor.submit 提交 Runnable 介面實現的執行任務;
  • 提交第1個任務時,由於當前執行緒池中正在執行的任務為 0 ,小於 3(corePoolSize 指定),所以會建立一個執行緒用來執行提交的任務1;
  • 提交第 2, 3 個任務的時候,由於當前執行緒池中正在執行的任務數量小於等於 3 (corePoolSize 指定),所以會為每一個提交的任務建立一個執行緒來執行任務;
  • 當提交第4個任務的時候,由於當前正在執行的任務數量為 3 (因為每個執行緒任務執行時間為10s,所以提交第4個任務的時候,前面3個執行緒都還在執行中),此時會將第4個任務存放到 workQueue 佇列中等待執行;
  • 由於 workQueue 佇列的大小為 2 ,所以該佇列中也就只能儲存 2 個等待執行的任務,所以第5個任務也會儲存到任務佇列中;
  • 當提交第6個任務的時候,因為當前執行緒池正在執行的任務數量為3,workQueue 佇列中儲存的任務數量也滿了,這時會判斷當前執行緒池中正在執行的任務的數量是否小於6(maximumPoolSize指定);
  • 如果小於 6 ,那麼就會新建立一個執行緒來執行提交的任務 6;
  • 執行第7,8個任務的時候,也要判斷當前執行緒池中正在執行的任務數是否小於6(maximumPoolSize指定),如果小於6,那麼也會立即新建執行緒來執行這些提交的任務;
  • 此時,6個任務都已經提交完畢,那 workQueue 佇列中的等待 任務4 和 任務5 什麼時候執行呢?
  • 當任務1執行完畢後(10s後),執行任務1的執行緒並沒有被銷燬掉,而是獲取 workQueue 中的任務4來執行;
  • 當任務2執行完畢後,執行任務2的執行緒也沒有被銷燬,而是獲取 workQueue 中的任務5來執行;
通過上面流程的分析,也就知道了之前案例的輸出結果的原因。其實,執行緒池中會執行緒執行完畢後,並不會被立刻銷燬,執行緒池中會保留 corePoolSize 數量的執行緒,當 workQueue 佇列中存在任務或者有新提交任務時,那麼會通過執行緒池中已有的執行緒來執行任務,避免了頻繁的執行緒建立與銷燬,而大於 corePoolSize 小於等於 maximumPoolSize 建立的執行緒,則會在空閒指定時間(keepAliveTime)後進行回收。

5.ThreadPoolExecutor拒絕策略

在上面的測試中,我設定的執行執行緒總數恰好等於maximumPoolSize[最大執行緒數] + capacity[佇列大小],因此沒有出現需要執行拒絕策略的情況,因此在這裡,我再增加一個執行緒,提交9個任務,來演示不同的拒絕策略。

AbortPolicy

CallerRunsPolicy

DiscardPolicy

DiscardOldestPolicy

參考:https://www.jianshu.com/p/7be43712ef21 https://www.jianshu.com/p/6f82b738ac58