1. 程式人生 > >池化技術——自定義執行緒池

池化技術——自定義執行緒池

目錄

  • 池化技術——自定義執行緒池
    • 1、為什麼要使用執行緒池?
      • 1.1、池化技術的特點:
      • 1.2、執行緒池的好處:
      • 1.3、如何自定義一個執行緒池
    • 2、三大方法
      • 2.1、單個執行緒的執行緒池方法
      • 2.2、固定的執行緒池的大小的方法
      • 2.3、可伸縮的執行緒池的方法
      • 2.4、完整的測試程式碼為:
    • 3、為什麼要自定義執行緒池?三大方法建立執行緒池的弊端分析
    • 4、七大引數
    • 5、如何手動的去建立一個執行緒池
    • 6、四種拒絕策略
      • 6.1、會丟擲異常的拒絕策略
      • 6.2、哪來的去哪裡拒絕策略
      • 6.3、丟掉任務拒絕策略
      • 6.4、嘗試競爭拒絕策略
    • 7、關於最大執行緒數應該如何確定
      • 7.1、CPU密集型
      • 7.2、IO密集型
      • 7.3、公式總結

池化技術——自定義執行緒池

1、為什麼要使用執行緒池?

池化技術

1.1、池化技術的特點:

  1. 程式的執行,本質:佔用系統的資源! 優化資源的使用!=>池化技術

  2. 執行緒池、連線池、記憶體池、物件池///..... 建立、銷燬。十分浪費資源

  3. 池化技術:事先準備好一些資源,有人要用,就來我這裡拿,用完之後還給我。

1.2、執行緒池的好處:

  1. 降低資源的消耗
  2. 降低資源的消耗
  3. 方便管理。

核心:==執行緒複用、可以控制最大併發數、管理執行緒==

1.3、如何自定義一個執行緒池

牢記:==三大方法、7大引數、4種拒絕策略==

2、三大方法

三大方法

在java的JDK中提夠了Executors開啟JDK預設執行緒池的類,其中有三個方法可以用來開啟執行緒池。

2.1、單個執行緒的執行緒池方法

ExecutorService threadPool = Executors.newSingleThreadExecutor();    //單個執行緒的執行緒池

該方法開啟的執行緒池,故名思義該池中只有一個執行緒。

2.2、固定的執行緒池的大小的方法

ExecutorService threadPool = Executors.newFixedThreadPool(5);    //固定的執行緒池的大小

其中方法中傳遞的int型別的引數,就是池中的執行緒數量

2.3、可伸縮的執行緒池的方法

ExecutorService threadPool = Executors.newCachedThreadPool();        //可伸縮

該方法建立的執行緒池是不固定大小的,可以根據需求動態的在池子裡建立執行緒,遇強則強。

2.4、完整的測試程式碼為:

package com.xgp.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 工具類,三大方法
 */
public class Demo01 {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();    //單個執行緒的執行緒池
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);    //固定的執行緒池的大小
//        ExecutorService threadPool = Executors.newCachedThreadPool();        //可伸縮

       try{
           for(int i = 0;i < 10;i++) {
               //使用執行緒池去建立
               threadPool.execute(() -> {
                   System.out.println(Thread.currentThread().getName() + " OK");
               });
           }
       }catch (Exception e) {
           e.printStackTrace();
       }finally {
           //關閉執行緒池
           threadPool.shutdown();
       }
    }
}

將三行註釋部分依次開啟的執行結果為:

pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK

上述執行結果為單個執行緒的執行緒池的結果:可以看出的確只有一條執行緒在執行。

pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-5 OK

上述執行結果為固定執行緒的執行緒池的結果:因為固定的大小為5,可以看出的確有5條執行緒在執行。

pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-4 OK
pool-1-thread-5 OK
pool-1-thread-7 OK
pool-1-thread-9 OK
pool-1-thread-10 OK
pool-1-thread-8 OK
pool-1-thread-6 OK

上述執行結果為彈性執行緒的執行緒池的結果:可以看出的確有10條執行緒在執行。

3、為什麼要自定義執行緒池?三大方法建立執行緒池的弊端分析

  1. 在單個執行緒池和固定大小的執行緒池中,因為處理的執行緒有限,當大量的請求進來時,都會在阻塞佇列中等候,而允許請求的佇列長度為Integet.MAX_VALUE,整數的最大值約為21億,會導致JVM記憶體溢位。

  2. 在彈性伸縮的執行緒池中,允許建立的執行緒數量為Integet.MAX_VALUE,可能會建立大量的執行緒,使得Jvm記憶體溢位。

    ==對於上述的兩點,其數值會在後面分析原始碼的環節看到,關於這一點,在阿里巴巴開發手冊中有著詳細的說明,並極力推薦採用自定義執行緒池,而不使用這三大方法。==

4、七大引數

七大引數

原始碼分析:我們要指定義自己的執行緒池,先從原始碼的角度看一看JDK現有的三個執行緒池是如何編寫的。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
          (new ThreadPoolExecutor(1, 1,
                                 0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

從三大方法的原始碼中可以看出,三種執行緒池都是new 了一個 ThreadPoolExecutor 物件,點選原始碼中看看。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

這裡呼叫了一個 this() 方法,在點選進去一看。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

到這裡就可以很明顯的看出本節想要講述的7大引數,是哪7大了吧。根據英文意思,可以很容易的說明這七大引數的意思了。

    public ThreadPoolExecutor(int corePoolSize,   //核心執行緒數
                              int maximumPoolSize,  //最大執行緒數
                              long keepAliveTime,   //超時等待
                              TimeUnit unit,        //超時等待的單位
                              BlockingQueue<Runnable> workQueue,    //阻塞佇列
                              ThreadFactory threadFactory,      //執行緒池工廠
                              RejectedExecutionHandler handler) {     //拒絕策略

在阿里巴巴開發手冊中,推薦的也是使用 ThreadPoolExecutor 來進行建立執行緒池的。

這裡可以用一張在銀行辦理業務的圖來生動的說明這七大引數。

這裡,解釋下這張圖對應的含義:

  1. 銀行在人很少的時候也只開放兩個視窗,並且什麼時候都不會進行關閉。——核心執行緒數

  2. 當人數大於2又小於5人時,後來的三人就在候客區等候辦理業務。——阻塞佇列

  3. 當人數大於5人又小於8人時,另外三個正在關閉的視窗需要開放進行辦理業務,於是乎就有了5個視窗在進行辦理業務。——最大執行緒數

  4. 將開啟其他三個視窗,需要領導將這三個視窗的員工叫回。——執行緒池工廠

  5. 當人數實在太多時,銀行都擠不下了,此時就會把門關了,不接受新的服務了。——拒絕策略

  6. 當銀行人數又變少時,另外的三個非核心視窗太久沒又生意,為了節約成本,則又會進行關閉。——超時等待

    通過對上述7大引數的分析,同學們也能夠更加理解JDK自帶的三大方法的弊端,以及為什麼是整數的最大值的這個數值。

5、如何手動的去建立一個執行緒池

手動建立連線池

七大引數的說明也都講了,於是乎我們可以仿照這七大引數,來定義一個自己的執行緒池。對於其中的執行緒工廠,我們也一般採用預設的工廠,而其中的拒絕策略我們可以通過原始碼分析,先使用三大方法使用的拒絕策略。

點選進入 defaultHandler 的原始碼中可以看到。

    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

其中的 new AbortPolicy(); 就是三大方法使用的拒絕策略,我們先仿照銀行的例子,自定義一個執行緒池。程式碼如下:

package com.xgp.pool;

import java.util.concurrent.*;

/**
 * 自定義執行緒池
 */
public class Demo02 {
/*    public ThreadPoolExecutor(int corePoolSize,   //核心執行緒數
                              int maximumPoolSize,  //最大執行緒數
                              long keepAliveTime,   //超時等待
                              TimeUnit unit,        //超時等待的單位
                              BlockingQueue<Runnable> workQueue,    //阻塞佇列
                              ThreadFactory threadFactory,      //執行緒池工廠
                              RejectedExecutionHandler handler) {*/     //拒絕策略
    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()    //會丟擲異常的實現類
//                new ThreadPoolExecutor.CallerRunsPolicy()  //哪來的去哪裡
//                new ThreadPoolExecutor.DiscardOldestPolicy()    //不會丟擲異常,會丟掉任務
//                new ThreadPoolExecutor.AbortPolicy()        //嘗試會和第一個競爭
                );


        try{
            for(int i = 0;i < 8;i++) {
                //使用執行緒池去建立
                pool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " OK");
                });
            }
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            //關閉執行緒池
            pool.shutdown();
        }
    }
}

於是乎,我們完成了一個自定義的執行緒池,核心執行緒數為2,最大執行緒數為5,超時等待的秒數為3s,阻塞佇列的長度為3。

6、四種拒絕策略

四種拒絕策略

通過分析原始碼,可以知道三大方法預設的拒絕策略在ThreadPoolExecutor這個類中,由於該類較為複雜,尋找起來不方便,於是我們可以採用IDEA的程式碼提示功能,非常明顯的提示出了四種拒絕策略,也就是上面自定義執行緒池中的被註釋部分。

將上面自定義執行緒池的程式碼註釋一一開啟,我們來進行測試:

6.1、會丟擲異常的拒絕策略

                new ThreadPoolExecutor.AbortPolicy()    //會丟擲異常的實現類

該拒絕策略執行的結果為:

pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-5 OK
java.util.concurrent.RejectedExecutionException: Task com.xgp.pool.Demo02$$Lambda$1/2093631819@378bf509 rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Running, pool size = 5, active threads = 4, queued tasks = 0, completed tasks = 4]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at com.xgp.pool.Demo02.main(Demo02.java:40)

該策略就是當最大執行緒數+阻塞佇列數都不滿足請求數時,系統將丟擲異常來進行解決。

6.2、哪來的去哪裡拒絕策略

                new ThreadPoolExecutor.CallerRunsPolicy()  //哪來的去哪裡

該拒絕策略執行的結果為:

pool-1-thread-1 OK
main OK
main OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-5 OK

可以看出,該拒絕策略當執行緒池的執行緒數不能夠滿足需求時,會將不能服務的任務打道回府,即交給main執行緒來解決,該拒絕策略適用於原來的執行緒能夠解決問題的情況。

6.3、丟掉任務拒絕策略

                new ThreadPoolExecutor.DiscardOldestPolicy()    //不會丟擲異常,會丟掉任務

該拒絕策略執行的結果為:

pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-4 OK
pool-1-thread-5 OK

數一數,一共是10個任務,但根據執行的情況只處理了8個任務,該拒絕策略將不能夠分配執行緒執行的任務全部丟棄了,會造成資料的丟失。

6.4、嘗試競爭拒絕策略

                new ThreadPoolExecutor.DiscardPolicy()      //嘗試會和第一個競爭

該拒絕策略執行的結果為:

pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-4 OK
pool-1-thread-5 OK

數一數,一共是10個任務,但根據執行的情況只處理了9個任務,其中競爭成功了一個,失敗了一個。該策略將會於最早進來的執行緒進行競爭,類似於作業系統中的搶佔式短作業優先演算法,該拒絕策略同樣會造成資料的丟失。

7、關於最大執行緒數應該如何確定

7.1、CPU密集型

CPU密集型

對於有多核Cpu的電腦,應該讓cpu充分忙碌起來,不要低於Cpu的核數,並且不應該在程式碼中寫死,而是應該能夠自動的獲取當前機器的Cpu核數,獲取的程式碼如下:

System.out.println(Runtime.getRuntime().availableProcessors());

7.2、IO密集型

IO密集型

對於系統中有大量IO任務時,應該要預留出足夠的執行緒來處理IO任務,因為IO任務極度耗時。如果判斷出系統中的IO密集的任務有10個,則定義的執行緒數量需要大於10。

7.3、公式總結

最大執行緒數   =   機器核素*2  +   IO密集型任務數

對於上述該公式,只是網上的一種總結,作者也沒有進行深入的測試於瞭解,讀者應該根據自己的業務需要進行合理的調