Java中的執行緒池
執行緒池
java.util.concurrent.ExecutorService 表述了非同步執行的機制,並且可以讓任務在後臺執行。ExecutorService是一個介面,繼承了Executor介面。Executor介面只包含了一個方法:void execute(Runnable command);
,該方法接收一個 Runable 例項,它用來執行一個任務,任務即一個實現了 Runnable 介面的類。jdk預設提供了四種執行緒池供我們使用。
大體的繼承結構如下:
一、執行緒池的狀態
ThreadPoolExecutor 使用 int 的高 3 位來表示執行緒池狀態,低 29 位表示執行緒數量
狀態名 | 高三位 | 接受新任務 | 處理阻塞佇列任務 | 說明 |
---|---|---|---|---|
RUNNING | 111 | 是 | 是 | 執行緒池處在RUNNING狀態時,能夠接收新任務,以及對已新增的任務進行處理。 |
SHUTDOWN | 000 | 否 | 是 | 不會接收新任務,但會處理阻塞佇列剩餘任務(呼叫shutdown()方法) |
STOP | 001 | 否 | 否 | 會中斷正在執行的任務,並拋棄阻塞佇列任務(呼叫shutdownNow()方法) |
TIDYING | 010 | - | - | 任務全執行完畢,活動執行緒為 0 即將進入終結 |
TERMINATED | 011 | - | - | 終結狀態 |
從數字上比較,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING,因為最高位表示符號位,這些資訊儲存在一個原子變數 ctl 中,目的是將執行緒池狀態與執行緒個數合二為一,這樣就可以用一次 cas 原子操作進行賦值。
狀態轉換圖如下:
二、ThreadPoolExecutor構造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
-
corePoolSize: 核心執行緒數目 (最多保留的執行緒數),執行緒池中會維護一個最小的執行緒數量,即使這些執行緒處理空閒狀態,他們也不會 被銷燬,除非設定了allowCoreThreadTimeOut。這裡的最小執行緒數量即是corePoolSize。
-
maximumPoolSize :最大執行緒數目,一個任務被提交到執行緒池以後,首先會找有沒有空閒存活執行緒,如果有則直接執行,如果沒有則會快取到工作佇列中,如果工作佇列滿了,才會建立一個新執行緒(這就是救急執行緒),然後從工作佇列的頭部取出一個任務交由新執行緒來處理,而將剛提交的任務放入工作佇列尾部。執行緒池不會無限制的去建立救急執行緒,它會有一個最大執行緒數量的限制,這個數量即由maximunPoolSize的數量減去corePoolSize的數量來確定,最多能達到maximunPoolSize即最大執行緒池執行緒數量。
-
keepAliveTime :生存時間 - 針對救急執行緒
-
unit: 時間單位 - 針對救急執行緒
-
workQueue :新任務被提交後,如果沒有空閒的核心執行緒就會先進入到此工作佇列中,任務排程時再從佇列中取出任務。jdk中常見的任務佇列:
①ArrayBlockingQueue
基於陣列的有界阻塞佇列,按FIFO排序。有界的陣列可以防止資源耗盡問題。當執行緒池中執行緒數量達到corePoolSize後,再有新任務進來,則會將任務放入該佇列的隊尾,等待被排程。如果佇列已經是滿的,則建立一個新執行緒,如果執行緒數量已經達到maxPoolSize,則會執行拒絕策略。
②LinkedBlockingQuene
基於連結串列的無界阻塞佇列(其實最大容量為Interger.MAX),按照FIFO排序。由於該佇列的近似無界性,當執行緒池中執行緒數量達到corePoolSize後,再有新任務進來,會一直存入該佇列,而不會去建立新執行緒直到maxPoolSize,因此使用該工作佇列時,引數maxPoolSize其實是不起作用的。
③SynchronousQuene
一個不快取任務的阻塞佇列,生產者放入一個任務必須等到消費者取出這個任務。也就是說新任務進來時,不會快取,而是直接被排程執行該任務,如果沒有可用執行緒,則建立新執行緒,如果執行緒數量達到maxPoolSize,則執行拒絕策略。
④PriorityBlockingQueue
具有優先順序的無界阻塞佇列,優先順序通過引數Comparator實現。
-
threadFactory :執行緒工廠 - 可以為執行緒建立時起個好名字
-
handler: 拒絕策略
如果執行緒到達 maximumPoolSize 仍然有新任務這時會執行拒絕策略。拒絕策略 jdk 提供了 4 種實現:
-
AbortPolicy策略:丟擲 RejectedExecutionException 異常,這是預設策略
-
CallerRunsPolicy策略: 在呼叫者執行緒中直接執行被拒絕任務的run方法,除非執行緒池已經shutdown,則直接拋棄任務。
-
DiscardPolicy策略: 放棄本次任務
-
DiscardOldestPolicy策略: 放棄佇列中最早的任務,本任務取而代之
-
三、四種執行緒池
Executors提供了一系列工廠方法用於創先執行緒池,返回的執行緒池都實現了ExecutorService介面。
-
public static ExecutorService newFixedThreadPool(int nThreads) : 建立固定數目執行緒的執行緒池。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
特點:
- 核心執行緒數 == 最大執行緒數(沒有救急執行緒被建立),因此也無需超時時間。
- 阻塞佇列是無界的,可以放任意數量的任務
- 適用於任務量已知,相對耗時的任務
-
public static ExecutorService newCachedThreadPool():建立一個可快取的執行緒池,呼叫execute將重用以前構造的執行緒(如果執行緒可用)。如果現有執行緒沒有可用的,則建立一個新線 程並新增到池中。終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
特點:
-
核心執行緒數是 0, 最大執行緒數是 Integer.MAX_VALUE,救急執行緒的空閒生存時間是 60s,意味著全部都是救急執行緒(60s 後可以回收)
-
佇列採用了 SynchronousQueue 實現特點是,它沒有容量,沒有執行緒來取是放不進去的(一手交錢、一手交貨)
-
救急執行緒可以無限建立。
-
整個執行緒池表現為執行緒數會根據任務量不斷增長,沒有上限,當任務執行完畢,空閒 1分鐘後釋放線
程。 適合任務數比較密集,但每個任務執行時間較短的情況
-
-
public static ExecutorService newSingleThreadExecutor(): 建立一個單執行緒化的Executor。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
特點:
-
自己建立一個單執行緒序列執行任務,如果任務執行失敗而終止那麼沒有任何補救措施,而執行緒池還會新建一個執行緒,保證池的正常工作
-
相較於其它三種執行緒池,newSingleThreadExecutor()建立的是FinalizableDelegatedExecutorService物件,其應用的是裝飾者模式 應用的是裝飾器模式,只對外暴露了 ExecutorService 介面,因此不能呼叫 ThreadPoolExecutor 中特有的方法。FinalizableDelegatedExecutorService繼承自DelegatedExecutorService中的方法都是呼叫ExecutorService 介面的方法
-
適用於希望多個任務排隊執行。執行緒數固定為 1,任務數多於 1 時,會放入無界佇列排隊。任務執行完畢,這唯一的執行緒也不會被釋放。
-
-
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize): 建立一個支援定時及週期性的任務執行的執行緒池,多數情況下可用來替代Timer類。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
ScheduledExecutorService介面的方法
/** *指定延遲時間後執行 */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** *scheduleAtFixedRate ,是以上一個任務開始的時間計時,period時間過去後,檢測上一個任務是否執行完畢,如果上一個任務執行完* 畢,則當前任務立即執行,如果上一個任務沒有執行完畢,則需要等上一個任務執行完畢後立即執行。 */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** *scheduleWithFixedDelay,是以上一個任務結束時開始計時,period時間過去後,立即執行。 */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
四、ExecutorService的常用方法
// 執行任務
void execute(Runnable command);
// 提交任務 task,用返回值 Future 獲得任務執行結果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任務,會阻塞,必須等待所有的任務執行完成後統一返回結果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
// 提交 tasks 中所有任務,帶超時時間,這裡的超時時間是針對的所有tasks,而不是單個task的超時時間。同樣會阻塞,會堵塞,必須等待所有的任務執行完成後統一返回。但是如果超時,會取消沒有執行完的所有任務,並丟擲超時異常
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
// 提交 tasks 中所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消,會阻塞,直到返回結果
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消,帶超時時間,直到返回結果,取消沒有執行完的所有任務,並丟擲超時異常
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
/*
執行緒池狀態變為 SHUTDOWN
- 不會接收新任務
- 但已提交任務會執行完
- 不會中斷當前執行的執行緒
- 此方法不會阻塞呼叫執行緒的執行
*/
void shutdown();
/*
執行緒池狀態變為 STOP
- 不會接收新任務
- 會將佇列中的任務返回
- 並用 interrupt 的方式中斷正在執行的任務
*/
List<Runnable> shutdownNow()
// 不在 RUNNING 狀態的執行緒池,此方法就返回 true
boolean isShutdown();
// 執行緒池狀態是否是 TERMINATED
boolean isTerminated();
// 呼叫 shutdown 後,由於呼叫執行緒並不會等待所有任務執行結束,因此如果它想線上程池 TERMINATED 後做些事
//情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;