1. 程式人生 > 實用技巧 >Java中的執行緒池

Java中的執行緒池

執行緒池

java.util.concurrent.ExecutorService 表述了非同步執行的機制,並且可以讓任務在後臺執行。ExecutorService是一個介面,繼承了Executor介面。Executor介面只包含了一個方法:void execute(Runnable command);,該方法接收一個 Runable 例項,它用來執行一個任務,任務即一個實現了 Runnable 介面的類。jdk預設提供了四種執行緒池供我們使用。

大體的繼承結構如下:

一、執行緒池的狀態

ThreadPoolExecutor 使用 int 的高 3 位來表示執行緒池狀態,低 29 位表示執行緒數量

狀態名 高三位 接受新任務 處理阻塞佇列任務 說明
RUNNING 111 執行緒池處在RUNNING狀態時,能夠接收新任務,以及對已新增的任務進行處理。
SHUTDOWN 000 不會接收新任務,但會處理阻塞佇列剩餘任務(呼叫shutdown()方法)
STOP 001 會中斷正在執行的任務,並拋棄阻塞佇列任務(呼叫shutdownNow()方法)
TIDYING 010 - - 任務全執行完畢,活動執行緒為 0 即將進入終結
TERMINATED 011 - - 終結狀態

從數字上比較,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING,因為最高位表示符號位,這些資訊儲存在一個原子變數 ctl 中,目的是將執行緒池狀態與執行緒個數合二為一,這樣就可以用一次 cas 原子操作進行賦值。

狀態轉換圖如下:

二、ThreadPoolExecutor構造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
  • corePoolSize: 核心執行緒數目 (最多保留的執行緒數),執行緒池中會維護一個最小的執行緒數量,即使這些執行緒處理空閒狀態,他們也不會 被銷燬,除非設定了allowCoreThreadTimeOut。這裡的最小執行緒數量即是corePoolSize。

  • maximumPoolSize :最大執行緒數目,一個任務被提交到執行緒池以後,首先會找有沒有空閒存活執行緒,如果有則直接執行,如果沒有則會快取到工作佇列中,如果工作佇列滿了,才會建立一個新執行緒(這就是救急執行緒),然後從工作佇列的頭部取出一個任務交由新執行緒來處理,而將剛提交的任務放入工作佇列尾部。執行緒池不會無限制的去建立救急執行緒,它會有一個最大執行緒數量的限制,這個數量即由maximunPoolSize的數量減去corePoolSize的數量來確定,最多能達到maximunPoolSize即最大執行緒池執行緒數量。

  • keepAliveTime :生存時間 - 針對救急執行緒

  • unit: 時間單位 - 針對救急執行緒

  • workQueue :新任務被提交後,如果沒有空閒的核心執行緒就會先進入到此工作佇列中,任務排程時再從佇列中取出任務。jdk中常見的任務佇列:

    ①ArrayBlockingQueue

    基於陣列的有界阻塞佇列,按FIFO排序。有界的陣列可以防止資源耗盡問題。當執行緒池中執行緒數量達到corePoolSize後,再有新任務進來,則會將任務放入該佇列的隊尾,等待被排程。如果佇列已經是滿的,則建立一個新執行緒,如果執行緒數量已經達到maxPoolSize,則會執行拒絕策略。

    ②LinkedBlockingQuene

    基於連結串列的無界阻塞佇列(其實最大容量為Interger.MAX),按照FIFO排序。由於該佇列的近似無界性,當執行緒池中執行緒數量達到corePoolSize後,再有新任務進來,會一直存入該佇列,而不會去建立新執行緒直到maxPoolSize,因此使用該工作佇列時,引數maxPoolSize其實是不起作用的。

    ③SynchronousQuene

    一個不快取任務的阻塞佇列,生產者放入一個任務必須等到消費者取出這個任務。也就是說新任務進來時,不會快取,而是直接被排程執行該任務,如果沒有可用執行緒,則建立新執行緒,如果執行緒數量達到maxPoolSize,則執行拒絕策略。

    ④PriorityBlockingQueue

    具有優先順序的無界阻塞佇列,優先順序通過引數Comparator實現。

  • threadFactory :執行緒工廠 - 可以為執行緒建立時起個好名字

  • handler: 拒絕策略

    ​ 如果執行緒到達 maximumPoolSize 仍然有新任務這時會執行拒絕策略。拒絕策略 jdk 提供了 4 種實現:

    • AbortPolicy策略:丟擲 RejectedExecutionException 異常,這是預設策略

    • CallerRunsPolicy策略: 在呼叫者執行緒中直接執行被拒絕任務的run方法,除非執行緒池已經shutdown,則直接拋棄任務。

    • DiscardPolicy策略: 放棄本次任務

    • DiscardOldestPolicy策略: 放棄佇列中最早的任務,本任務取而代之

三、四種執行緒池

Executors提供了一系列工廠方法用於創先執行緒池,返回的執行緒池都實現了ExecutorService介面。

  1. public static ExecutorService newFixedThreadPool(int nThreads) : 建立固定數目執行緒的執行緒池。

     public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    特點:

    • 核心執行緒數 == 最大執行緒數(沒有救急執行緒被建立),因此也無需超時時間。
    • 阻塞佇列是無界的,可以放任意數量的任務
    • 適用於任務量已知,相對耗時的任務
  2. public static ExecutorService newCachedThreadPool():建立一個可快取的執行緒池,呼叫execute將重用以前構造的執行緒(如果執行緒可用)。如果現有執行緒沒有可用的,則建立一個新線 程並新增到池中。終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒。

     public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    

    特點:

    • 核心執行緒數是 0, 最大執行緒數是 Integer.MAX_VALUE,救急執行緒的空閒生存時間是 60s,意味著全部都是救急執行緒(60s 後可以回收)

    • 佇列採用了 SynchronousQueue 實現特點是,它沒有容量,沒有執行緒來取是放不進去的(一手交錢、一手交貨)

    • 救急執行緒可以無限建立。

    • 整個執行緒池表現為執行緒數會根據任務量不斷增長,沒有上限,當任務執行完畢,空閒 1分鐘後釋放線

      程。 適合任務數比較密集,但每個任務執行時間較短的情況

  3. public static ExecutorService newSingleThreadExecutor(): 建立一個單執行緒化的Executor。

      public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    特點:

    • 自己建立一個單執行緒序列執行任務,如果任務執行失敗而終止那麼沒有任何補救措施,而執行緒池還會新建一個執行緒,保證池的正常工作

    • 相較於其它三種執行緒池,newSingleThreadExecutor()建立的是FinalizableDelegatedExecutorService物件,其應用的是裝飾者模式 應用的是裝飾器模式,只對外暴露了 ExecutorService 介面,因此不能呼叫 ThreadPoolExecutor 中特有的方法。FinalizableDelegatedExecutorService繼承自DelegatedExecutorService中的方法都是呼叫ExecutorService 介面的方法

    • 適用於希望多個任務排隊執行。執行緒數固定為 1,任務數多於 1 時,會放入無界佇列排隊。任務執行完畢,這唯一的執行緒也不會被釋放。

  4. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize): 建立一個支援定時及週期性的任務執行的執行緒池,多數情況下可用來替代Timer類。

     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
     public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    

    ScheduledExecutorService介面的方法

    /**
    *指定延遲時間後執行
    */
    public ScheduledFuture<?> schedule(Runnable command,
                                           long delay, TimeUnit unit);
    
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay, TimeUnit unit);
    
    /**
    *scheduleAtFixedRate ,是以上一個任務開始的時間計時,period時間過去後,檢測上一個任務是否執行完畢,如果上一個任務執行完*
    畢,則當前任務立即執行,如果上一個任務沒有執行完畢,則需要等上一個任務執行完畢後立即執行。
    */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);
    /**
    *scheduleWithFixedDelay,是以上一個任務結束時開始計時,period時間過去後,立即執行。
    */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit);
    

四、ExecutorService的常用方法

// 執行任務
void execute(Runnable command);

// 提交任務 task,用返回值 Future 獲得任務執行結果
<T> Future<T> submit(Callable<T> task);

// 提交 tasks 中所有任務,會阻塞,必須等待所有的任務執行完成後統一返回結果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

// 提交 tasks 中所有任務,帶超時時間,這裡的超時時間是針對的所有tasks,而不是單個task的超時時間。同樣會阻塞,會堵塞,必須等待所有的任務執行完成後統一返回。但是如果超時,會取消沒有執行完的所有任務,並丟擲超時異常
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

// 提交 tasks 中所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消,會阻塞,直到返回結果
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消,帶超時時間,直到返回結果,取消沒有執行完的所有任務,並丟擲超時異常
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException;


/*
執行緒池狀態變為 SHUTDOWN
- 不會接收新任務
- 但已提交任務會執行完
- 不會中斷當前執行的執行緒
- 此方法不會阻塞呼叫執行緒的執行
*/
void shutdown();


/*
執行緒池狀態變為 STOP
- 不會接收新任務
- 會將佇列中的任務返回
- 並用 interrupt 的方式中斷正在執行的任務
*/
List<Runnable> shutdownNow()
    
// 不在 RUNNING 狀態的執行緒池,此方法就返回 true
boolean isShutdown();


// 執行緒池狀態是否是 TERMINATED
boolean isTerminated();


// 呼叫 shutdown 後,由於呼叫執行緒並不會等待所有任務執行結束,因此如果它想線上程池 TERMINATED 後做些事
//情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;