1. 程式人生 > 其它 >java四種執行緒池的使用

java四種執行緒池的使用

一、四種執行緒池

Java通過Executors提供四種執行緒池,分別為

  1. newSingleThreadExecutor建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
  2. newFixedThreadPool建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
  3. newScheduledThreadPool建立一個可定期或者延時執行任務的定長執行緒池,支援定時及週期性任務執行。
  4. newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。

二、核心類

四種執行緒池本質都是建立ThreadPoolExecutor類,ThreadPoolExecutor構造引數如下

  1. int corePoolSize, 核心執行緒大小

  2. int maximumPoolSize,最大執行緒大小

  3. long keepAliveTime, 超過corePoolSize的執行緒多久不活動被銷燬時間

  4. TimeUnit unit,時間單位

  5. BlockingQueue<Runnable>workQueue 任務佇列

  6. ThreadFactory threadFactory 執行緒池工廠

  7. RejectedExecutionHandler handler 拒絕策略

三、阻塞佇列

  1. ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列

  2. LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列(常用)

  3. PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列

  4. DelayQueue: 一個使用優先順序佇列實現的無界阻塞佇列

  5. SynchronousQueue: 一個不儲存元素的阻塞佇列(常用)

  6. LinkedTransferQueue: 一個由連結串列結構組成的無界阻塞佇列

  7. LinkedBlockingDeque: 一個由連結串列結構組成的雙向阻塞佇列

四、四種預設執行緒池原始碼初步分析

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

//建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}
  • 檢視原始碼,四種執行緒池都是建立了ThreadPollExecutor物件,只是傳遞的引數不一樣而已,觀察傳入的workQueue 都是預設,即最大可新增Integer.MAX_VALUE個任務,這也就是阿里巴巴java開發規範禁止直接使用java提供的預設執行緒池的原因了

五、執行緒池任務執行流程

  1. 當執行緒池小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒。

  2. 當執行緒池達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行

  3. 當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新執行緒執行任務

  4. 當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理

  5. 當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,釋放空閒執行緒

  6. 當設定allowCoreThreadTimeOut(true)時,該引數預設false,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉

六、使用場景詳解

newCachedThreadPool:

  • 底層:返回ThreadPoolExecutor例項,corePoolSize為0;maximumPoolSize為Integer.MAX_VALUE;keepAliveTime為60L;時間單位TimeUnit.SECONDS;workQueue為SynchronousQueue(同步佇列)
  • 通俗:當有新任務到來,則插入到SynchronousQueue中,由於SynchronousQueue是同步佇列,因此會在池中尋找可用執行緒來執行,若有可以執行緒則執行,若沒有可用執行緒則建立一個執行緒來執行該任務;若池中執行緒空閒時間超過指定時間,則該執行緒會被銷燬。
  • 適用:執行很多短期的非同步任務
/**
 * 1.建立一個可快取的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒<br>
 * 2.當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務<br>
 * 3.此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小<br>
 */
public static void cacheThreadPool() {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i = 1; i <= 10; i++) {
            final int ii = i;
            try {
                Thread.sleep(ii * 1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            cachedThreadPool.execute(()->out.println("執行緒名稱:" + Thread.currentThread().getName() + ",執行" + ii));
        }
}
-----output------
執行緒名稱:pool-1-thread-1,執行1
執行緒名稱:pool-1-thread-1,執行2
執行緒名稱:pool-1-thread-1,執行3
執行緒名稱:pool-1-thread-1,執行4
執行緒名稱:pool-1-thread-1,執行5
執行緒名稱:pool-1-thread-1,執行6
執行緒名稱:pool-1-thread-1,執行7
執行緒名稱:pool-1-thread-1,執行8
執行緒名稱:pool-1-thread-1,執行9
執行緒名稱:pool-1-thread-1,執行10

newFixedThreadPool:

  • 底層:返回ThreadPoolExecutor例項,接收引數為所設定執行緒數量n,corePoolSize和maximumPoolSize均為n;keepAliveTime為0L;時間單位TimeUnit.MILLISECONDS;WorkQueue為:new LinkedBlockingQueue<Runnable>()無界阻塞佇列
  • 通俗:建立可容納固定數量執行緒的池子,每個執行緒的存活時間是無限的,當池子滿了就不再新增執行緒了;如果池中的所有執行緒均在繁忙狀態,對於新任務會進入阻塞佇列中(無界的阻塞佇列)
  • 適用:執行長期任務
  • /**
      * 1.建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小<br>
      * 2.執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒<br>
      * 3.因為執行緒池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒列印3個數字,和執行緒名稱<br>
      */
    public static void fixTheadPoolTest() {
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 10; i++) {
                final int ii = i;
                fixedThreadPool.execute(() -> {
                    out.println("執行緒名稱:" + Thread.currentThread().getName() + ",執行" + ii);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
    }
    ------output-------
    執行緒名稱:pool-1-thread-3,執行2
    執行緒名稱:pool-1-thread-1,執行0
    執行緒名稱:pool-1-thread-2,執行3
    執行緒名稱:pool-1-thread-3,執行4
    執行緒名稱:pool-1-thread-1,執行5
    執行緒名稱:pool-1-thread-2,執行6
    執行緒名稱:pool-1-thread-3,執行7
    執行緒名稱:pool-1-thread-1,執行8
    執行緒名稱:pool-1-thread-3,執行9

    newSingleThreadExecutor:

    • 底層:FinalizableDelegatedExecutorService包裝的ThreadPoolExecutor例項,corePoolSize為1;maximumPoolSize為1;keepAliveTime為0L;時間單位TimeUnit.MILLISECONDS;workQueue為:new LinkedBlockingQueue<Runnable>()無解阻塞佇列
    • 通俗:建立只有一個執行緒的執行緒池,當該執行緒正繁忙時,對於新任務會進入阻塞佇列中(無界的阻塞佇列)
    • 適用:按順序執行任務的場景
    • /**  *建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行
       */
      public static void singleTheadPoolTest() {
              ExecutorService pool = Executors.newSingleThreadExecutor();
              for (int i = 0; i < 10; i++) {
                  final int ii = i;
                  pool.execute(() -> out.println(Thread.currentThread().getName() + "=>" + ii));
              }
      }
      
      -----output-------
      執行緒名稱:pool-1-thread-1,執行0
      執行緒名稱:pool-1-thread-1,執行1
      執行緒名稱:pool-1-thread-1,執行2
      執行緒名稱:pool-1-thread-1,執行3
      執行緒名稱:pool-1-thread-1,執行4
      執行緒名稱:pool-1-thread-1,執行5
      執行緒名稱:pool-1-thread-1,執行6
      執行緒名稱:pool-1-thread-1,執行7
      執行緒名稱:pool-1-thread-1,執行8
      執行緒名稱:pool-1-thread-1,執行9

      NewScheduledThreadPool:

      • 底層:建立ScheduledThreadPoolExecutor例項,該物件繼承了ThreadPoolExecutor,corePoolSize為傳遞來的引數,maximumPoolSize為Integer.MAX_VALUE;keepAliveTime為0;時間單位TimeUnit.NANOSECONDS;workQueue為:new DelayedWorkQueue()一個按超時時間升序排序的佇列
      • 通俗:建立一個固定大小的執行緒池,執行緒池內執行緒存活時間無限制,執行緒池可以支援定時及週期性任務執行,如果所有執行緒均處於繁忙狀態,對於新任務會進入DelayedWorkQueue佇列中,這是一種按照超時時間排序的佇列結構
      • 適用:執行週期性任務
      • /**
         * 建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行
         */
        public static void sceduleThreadPool() {
                ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
                Runnable r1 = () -> out.println("執行緒名稱:" + Thread.currentThread().getName() + ",執行:3秒後執行");
                scheduledThreadPool.schedule(r1, 3, TimeUnit.SECONDS);
                Runnable r2 = () -> out.println("執行緒名稱:" + Thread.currentThread().getName() + ",執行:延遲2秒後每3秒執行一次");
                scheduledThreadPool.scheduleAtFixedRate(r2, 2, 3, TimeUnit.SECONDS);
                Runnable r3 = () -> out.println("執行緒名稱:" + Thread.currentThread().getName() + ",執行:普通任務");
                for (int i = 0; i < 5; i++) {
                    scheduledThreadPool.execute(r3);
                }
        }
        ----output------
        執行緒名稱:pool-1-thread-1,執行:普通任務
        執行緒名稱:pool-1-thread-5,執行:普通任務
        執行緒名稱:pool-1-thread-4,執行:普通任務
        執行緒名稱:pool-1-thread-3,執行:普通任務
        執行緒名稱:pool-1-thread-2,執行:普通任務
        執行緒名稱:pool-1-thread-1,執行:延遲2秒後每3秒執行一次
        執行緒名稱:pool-1-thread-5,執行:3秒後執行
        執行緒名稱:pool-1-thread-4,執行:延遲2秒後每3秒執行一次
        執行緒名稱:pool-1-thread-4,執行:延遲2秒後每3秒執行一次
        執行緒名稱:pool-1-thread-4,執行:延遲2秒後每3秒執行一次
        執行緒名稱:pool-1-thread-4,執行:延遲2秒後每3秒執行一次

        七、備註:

        • 一般如果執行緒池任務佇列採用LinkedBlockingQueue佇列的話,那麼不會拒絕任何任務(因為其大小為Integer.MAX_VALUE),這種情況下,ThreadPoolExecutor最多僅會按照最小執行緒數corePoolSize來建立執行緒,也就是說執行緒池大小被忽略了。
        • 如果執行緒池任務佇列採用ArrayBlockingQueue佇列,初始化設定了最大佇列數。那麼ThreadPoolExecutor的maximumPoolSize才會生效,那麼ThreadPoolExecutor的maximumPoolSize才會生效會採用新的演算法處理任務,
        • 例如假定執行緒池的最小執行緒數為4,最大為8,ArrayBlockingQueue最大為10。隨著任務到達並被放到佇列中,執行緒池中最多執行4個執行緒(即核心執行緒數)直到佇列完全填滿,也就是說等待狀態的任務小於等於10,ThreadPoolExecutor也只會利用4個核心執行緒執行緒處理任務。
        • 如果佇列已滿,而又有新任務進來,此時才會啟動一個新執行緒,這裡不會因為佇列已滿而拒接該任務,相反會啟動一個新執行緒。新執行緒會執行佇列中的第一個任務,為新來的任務騰出空間。如果執行緒數已經等於最大執行緒數,任務佇列也已經滿了,則執行緒池會拒絕這個任務,預設拒絕策略是丟擲異常。
        • 這個演算法背的理念是:該池大部分時間僅使用核心執行緒(4個),即使有適量的任務在佇列中等待執行。這時執行緒池就可以用作節流閥。如果擠壓的請求變得非常多,這時該池就會嘗試執行更多的執行緒來清理;這時第二個節流閥—最大執行緒數就起作用了。