1. 程式人生 > 實用技巧 >執行緒池的7種建立方式,強烈推薦你用它...

執行緒池的7種建立方式,強烈推薦你用它...

根據摩爾定律所說:積體電路上可容納的電晶體數量每 18 個月翻一番,因此 CPU 上的電晶體數量會越來越多。

但隨著時間的推移,積體電路上可容納的電晶體數量已趨向飽和,摩爾定律也漸漸失效,因此多核 CPU 逐漸變為主流,與之相對應的多執行緒程式設計也開始變得普及和流行起來,這當然也是很久之前的事了,對於現在而言多執行緒程式設計已經成為程式設計師必備的職業技能了,那接下來我們就來盤一盤“執行緒池”這個多執行緒程式設計中最重要的話題。

什麼是執行緒池?

執行緒池(ThreadPool)是一種基於池化思想管理和使用執行緒的機制。它是將多個執行緒預先儲存在一個“池子”內,當有任務出現時可以避免重新建立和銷燬執行緒所帶來效能開銷,只需要從“池子”內取出相應的執行緒執行對應的任務即可。

池化思想在計算機的應用也比較廣泛,比如以下這些:

  • 記憶體池(Memory Pooling):預先申請記憶體,提升申請記憶體速度,減少記憶體碎片。
  • 連線池(Connection Pooling):預先申請資料庫連線,提升申請連線的速度,降低系統的開銷。
  • 例項池(Object Pooling):迴圈使用物件,減少資源在初始化和釋放時的昂貴損耗。

執行緒池的優勢主要體現在以下 4 點:

  1. 降低資源消耗:通過池化技術重複利用已建立的執行緒,降低執行緒建立和銷燬造成的損耗。
  2. 提高響應速度:任務到達時,無需等待執行緒建立即可立即執行。
  3. 提高執行緒的可管理性:執行緒是稀缺資源,如果無限制建立,不僅會消耗系統資源,還會因為執行緒的不合理分佈導致資源排程失衡,降低系統的穩定性。使用執行緒池可以進行統一的分配、調優和監控。
  4. 提供更多更強大的功能:執行緒池具備可拓展性,允許開發人員向其中增加更多的功能。比如延時定時執行緒池ScheduledThreadPoolExecutor,就允許任務延期執行或定期執行。

同時阿里巴巴在其《Java開發手冊》中也強制規定:執行緒資源必須通過執行緒池提供,不允許在應用中自行顯式建立執行緒

說明:執行緒池的好處是減少在建立和銷燬執行緒上所消耗的時間以及系統資源的開銷,解決資源不足的問題。
如果不使用執行緒池,有可能造成系統建立大量同類執行緒而導致消耗完記憶體或者“過度切換”的問題。

知道了什麼是執行緒池以及為什要用執行緒池之後,我們再來看怎麼用執行緒池。

執行緒池使用

執行緒池的建立方法總共有 7 種,但總體來說可分為 2 類:

  • 一類是通過 ThreadPoolExecutor 建立的執行緒池;
  • 另一個類是通過 Executors 建立的執行緒池。

執行緒池的建立方式總共包含以下 7 種(其中 6 種是通過 Executors 建立的,1 種是通過 ThreadPoolExecutor 建立的):

  1. Executors.newFixedThreadPool:建立一個固定大小的執行緒池,可控制併發的執行緒數,超出的執行緒會在佇列中等待;
  2. Executors.newCachedThreadPool:建立一個可快取的執行緒池,若執行緒數超過處理所需,快取一段時間後會回收,若執行緒數不夠,則新建執行緒;
  3. Executors.newSingleThreadExecutor:建立單個執行緒數的執行緒池,它可以保證先進先出的執行順序;
  4. Executors.newScheduledThreadPool:建立一個可以執行延遲任務的執行緒池;
  5. Executors.newSingleThreadScheduledExecutor:建立一個單執行緒的可以執行延遲任務的執行緒池;
  6. Executors.newWorkStealingPool:建立一個搶佔式執行的執行緒池(任務執行順序不確定)【JDK 1.8 新增】。
  7. ThreadPoolExecutor:最原始的建立執行緒池的方式,它包含了 7 個引數可供設定,後面會詳細講。

單執行緒池的意義
從以上程式碼可以看出 newSingleThreadExecutor 和 newSingleThreadScheduledExecutor 建立的都是單執行緒池,那麼單執行緒池的意義是什麼呢?
答:雖然是單執行緒池,但提供了工作佇列,生命週期管理,工作執行緒維護等功能。

那接下來我們來看每種執行緒池建立的具體使用。

1.FixedThreadPool

建立一個固定大小的執行緒池,可控制併發的執行緒數,超出的執行緒會在佇列中等待。

使用示例如下:

public static void fixedThreadPool() {
    // 建立 2 個數據級的執行緒池
    ExecutorService threadPool = Executors.newFixedThreadPool(2);

    // 建立任務
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            System.out.println("任務被執行,執行緒:" + Thread.currentThread().getName());
        }
    };

    // 執行緒池執行任務(一次新增 4 個任務)
    // 執行任務的方法有兩種:submit 和 execute
    threadPool.submit(runnable);  // 執行方式 1:submit
    threadPool.execute(runnable); // 執行方式 2:execute
    threadPool.execute(runnable);
    threadPool.execute(runnable);
}

執行結果如下:

如果覺得以上方法比較繁瑣,還用更簡單的使用方法,如下程式碼所示:

public static void fixedThreadPool() {
    // 建立執行緒池
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    // 執行任務
    threadPool.execute(() -> {
        System.out.println("任務被執行,執行緒:" + Thread.currentThread().getName());
    });
}

2.CachedThreadPool

建立一個可快取的執行緒池,若執行緒數超過處理所需,快取一段時間後會回收,若執行緒數不夠,則新建執行緒。

使用示例如下:

public static void cachedThreadPool() {
    // 建立執行緒池
    ExecutorService threadPool = Executors.newCachedThreadPool();
    // 執行任務
    for (int i = 0; i < 10; i++) {
        threadPool.execute(() -> {
            System.out.println("任務被執行,執行緒:" + Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
        });
    }
}

執行結果如下:

從上述結果可以看出,執行緒池建立了 10 個執行緒來執行相應的任務。

3.SingleThreadExecutor

建立單個執行緒數的執行緒池,它可以保證先進先出的執行順序。

使用示例如下:

public static void singleThreadExecutor() {
    // 建立執行緒池
    ExecutorService threadPool = Executors.newSingleThreadExecutor();
    // 執行任務
    for (int i = 0; i < 10; i++) {
        final int index = i;
        threadPool.execute(() -> {
            System.out.println(index + ":任務被執行");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
        });
    }
}

執行結果如下:

4.ScheduledThreadPool

建立一個可以執行延遲任務的執行緒池。

使用示例如下:

public static void scheduledThreadPool() {
    // 建立執行緒池
    ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
    // 新增定時執行任務(1s 後執行)
    System.out.println("新增任務,時間:" + new Date());
    threadPool.schedule(() -> {
        System.out.println("任務被執行,時間:" + new Date());
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
    }, 1, TimeUnit.SECONDS);
}

執行結果如下:

從上述結果可以看出,任務在 1 秒之後被執行了,符合我們的預期。

5.SingleThreadScheduledExecutor

建立一個單執行緒的可以執行延遲任務的執行緒池。

使用示例如下:

public static void SingleThreadScheduledExecutor() {
    // 建立執行緒池
    ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
    // 新增定時執行任務(2s 後執行)
    System.out.println("新增任務,時間:" + new Date());
    threadPool.schedule(() -> {
        System.out.println("任務被執行,時間:" + new Date());
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
    }, 2, TimeUnit.SECONDS);
}

執行結果如下:

從上述結果可以看出,任務在 2 秒之後被執行了,符合我們的預期。

6.newWorkStealingPool

建立一個搶佔式執行的執行緒池(任務執行順序不確定),注意此方法只有在 JDK 1.8+ 版本中才能使用。

使用示例如下:

public static void workStealingPool() {
    // 建立執行緒池
    ExecutorService threadPool = Executors.newWorkStealingPool();
    // 執行任務
    for (int i = 0; i < 10; i++) {
        final int index = i;
        threadPool.execute(() -> {
            System.out.println(index + " 被執行,執行緒名:" + Thread.currentThread().getName());
        });
    }
    // 確保任務執行完成
    while (!threadPool.isTerminated()) {
    }
}

執行結果如下:

從上述結果可以看出,任務的執行順序是不確定的,因為它是搶佔式執行的。

7.ThreadPoolExecutor

最原始的建立執行緒池的方式,它包含了 7 個引數可供設定。

使用示例如下:

public static void myThreadPoolExecutor() {
    // 建立執行緒池
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
    // 執行任務
    for (int i = 0; i < 10; i++) {
        final int index = i;
        threadPool.execute(() -> {
            System.out.println(index + " 被執行,執行緒名:" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

執行結果如下:

ThreadPoolExecutor 引數介紹

ThreadPoolExecutor 最多可以設定 7 個引數,如下程式碼所示:

 public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler) {
     // 省略...
 }

7 個引數代表的含義如下:

引數 1:corePoolSize

核心執行緒數,執行緒池中始終存活的執行緒數。

引數 2:maximumPoolSize

最大執行緒數,執行緒池中允許的最大執行緒數,當執行緒池的任務佇列滿了之後可以建立的最大執行緒數。

引數 3:keepAliveTime

最大執行緒數可以存活的時間,當執行緒中沒有任務執行時,最大執行緒就會銷燬一部分,最終保持核心執行緒數量的執行緒。

引數 4:unit:

單位是和引數 3 存活時間配合使用的,合在一起用於設定執行緒的存活時間 ,引數 keepAliveTime 的時間單位有以下 7 種可選:

  • TimeUnit.DAYS:天
  • TimeUnit.HOURS:小時
  • TimeUnit.MINUTES:分
  • TimeUnit.SECONDS:秒
  • TimeUnit.MILLISECONDS:毫秒
  • TimeUnit.MICROSECONDS:微妙
  • TimeUnit.NANOSECONDS:納秒

引數 5:workQueue

一個阻塞佇列,用來儲存執行緒池等待執行的任務,均為執行緒安全,它包含以下 7 種類型:

  • ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列。
  • LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列。
  • SynchronousQueue:一個不儲存元素的阻塞佇列,即直接提交給執行緒不保持它們。
  • PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列。
  • DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列,只有在延遲期滿時才能從中提取元素。
  • LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。與SynchronousQueue類似,還含有非阻塞方法。
  • LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。

較常用的是 LinkedBlockingQueueSynchronous,執行緒池的排隊策略與 BlockingQueue 有關。

引數 6:threadFactory

執行緒工廠,主要用來建立執行緒,預設為正常優先順序、非守護執行緒。

引數 7:handler

拒絕策略,拒絕處理任務時的策略,系統提供了 4 種可選:

  • AbortPolicy:拒絕並丟擲異常。
  • CallerRunsPolicy:使用當前呼叫的執行緒來執行此任務。
  • DiscardOldestPolicy:拋棄佇列頭部(最舊)的一個任務,並執行當前任務。
  • DiscardPolicy:忽略並拋棄當前任務。

預設策略為 AbortPolicy

執行緒池的執行流程

ThreadPoolExecutor 關鍵節點的執行流程如下:

  • 當執行緒數小於核心執行緒數時,建立執行緒。
  • 當執行緒數大於等於核心執行緒數,且任務佇列未滿時,將任務放入任務佇列。
  • 當執行緒數大於等於核心執行緒數,且任務佇列已滿:若執行緒數小於最大執行緒數,建立執行緒;若執行緒數等於最大執行緒數,丟擲異常,拒絕任務。

執行緒池的執行流程如下圖所示:

執行緒拒絕策略

我們來演示一下 ThreadPoolExecutor 的拒絕策略的觸發,我們使用 DiscardPolicy 的拒絕策略,它會忽略並拋棄當前任務的策略,實現程式碼如下:

public static void main(String[] args) {
    // 任務的具體方法
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            System.out.println("當前任務被執行,執行時間:" + new Date() +
                               " 執行執行緒:" + Thread.currentThread().getName());
            try {
                // 等待 1s
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    // 建立執行緒,執行緒的任務佇列的長度為 1
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1,
                                                           100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1),
                                                           new ThreadPoolExecutor.DiscardPolicy());
    // 新增並執行 4 個任務
    threadPool.execute(runnable);
    threadPool.execute(runnable);
    threadPool.execute(runnable);
    threadPool.execute(runnable);
}

我們建立了一個核心執行緒數和最大執行緒數都為 1 的執行緒池,並且給執行緒池的任務佇列設定為 1,這樣當我們有 2 個以上的任務時就會觸發拒絕策略,執行的結果如下圖所示:

從上述結果可以看出只有兩個任務被正確執行了,其他多餘的任務就被捨棄並忽略了。其他拒絕策略的使用類似,這裡就不一一贅述了。

自定義拒絕策略

除了 Java 自身提供的 4 種拒絕策略之外,我們也可以自定義拒絕策略,示例程式碼如下:

public static void main(String[] args) {
    // 任務的具體方法
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            System.out.println("當前任務被執行,執行時間:" + new Date() +
                               " 執行執行緒:" + Thread.currentThread().getName());
            try {
                // 等待 1s
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    // 建立執行緒,執行緒的任務佇列的長度為 1
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1,
                                                           100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1),
                                                           new RejectedExecutionHandler() {
                                                               @Override
                                                               public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                                                   // 執行自定義拒絕策略的相關操作
                                                                   System.out.println("我是自定義拒絕策略~");
                                                               }
                                                           });
    // 新增並執行 4 個任務
    threadPool.execute(runnable);
    threadPool.execute(runnable);
    threadPool.execute(runnable);
    threadPool.execute(runnable);
}

程式的執行結果如下:

究竟選用哪種執行緒池?

經過以上的學習我們對整個執行緒池也有了一定的認識了,那究竟該如何選擇執行緒池呢?

我們來看下阿里巴巴《Java開發手冊》給我們的答案:

【強制要求】執行緒池不允許使用 Executors 去建立,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險。

說明:Executors 返回的執行緒池物件的弊端如下:

1) FixedThreadPool 和 SingleThreadPool:允許的請求佇列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。

2)CachedThreadPool:允許的建立執行緒數量為 Integer.MAX_VALUE,可能會建立大量的執行緒,從而導致 OOM。

所以綜上情況所述,我們推薦使用 ThreadPoolExecutor的方式進行執行緒池的建立,因為這種建立方式更可控,並且更加明確了執行緒池的執行規則,可以規避一些未知的風險。

總結

本文我們介紹了執行緒池的 7 種建立方式,其中最推薦使用的是 ThreadPoolExecutor 的方式進行執行緒池的建立,ThreadPoolExecutor 最多可以設定 7 個引數,當然設定 5 個引數也可以正常使用,ThreadPoolExecutor 當任務過多(處理不過來)時提供了 4 種拒絕策略,當然我們也可以自定義拒絕策略,希望本文的內容能幫助到你。原創不易,覺得不錯就點個贊再走吧!

參考 & 鳴謝

https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

https://www.cnblogs.com/pcheng/p/13540619.html

關注公眾號「Java中文社群」發現更多幹貨。