Java執行緒池的執行原理以及使用詳解
一、 為什麼要使用執行緒池
在一些需要使用執行緒去處理任務的業務場景中,如果每一個任務都建立一個執行緒去處理,任務處理完過後,把這個執行緒銷燬,這樣會產生大量的執行緒建立,銷燬的資源開銷。使用執行緒池能有效的控制這種執行緒的建立和銷燬,而且能夠對建立的執行緒進行有效的管理。
二、Java執行緒池相關的API介紹
1. Executor介面
主要是用來執行提交的任務。下面是介面定義:
public interface Executor {
void execute(Runnable command);
}
後面說的執行緒池會實現這個介面,並且會使用這個方法來提交一個任務。
2. ExecutorService介面
ExecutorService介面是Executor介面的一個子介面,它在Executor介面的基礎上增加了一些方法,用來支援對任務的終止管理以及對非同步任務的支援。
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
3. AbstractExecutorService 抽象類
AbstractExecutorService實現了ExecutorService,並基於模板方法模式對一些方法給出了實現。是我們接下來要提到的執行緒池類ThreadPoolExecutor的直接父類。程式碼貼出來有點多,這裡就不貼了。
4. ThreadPoolExecutor類
ThreadPoolExecutor通常就是我們所說的執行緒池類,Java的執行緒池就是用過這個類進行建立的。下面分析的執行緒池的執行原理,也是基於這個類來進行分析的。
5. ScheduledExecutorService介面
ScheduledExecutorService介面是ExecutorService子介面,定義了執行緒池基於任務排程的一些方法。
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
可以看到,上面定義了延時週期排程,固定頻率週期排程,返回任務結果的任務排程等方法。
6. ScheduledThreadPoolExecutor類
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor類,並且實現了ScheduledExecutorService介面,對任務排程的功能進行了實現。
7. Executors類
Executors可以認為是執行緒池的工廠類,裡面提供了靜態方法對執行緒池進行建立。
下面列出常用的幾種執行緒池建立方法:
//固定執行緒大小的執行緒池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//單個執行緒執行緒池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//無上限執行緒執行緒池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//基於任務排程的執行緒池(還有其他型別的任務排程執行緒池,這裡不列舉了)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
以上就是Java執行緒池相關的API類。
三、Java執行緒池的執行原理
這裡主要介紹Java執行緒池各種情況下,一個任務提交給執行緒池,它線上程池內部是怎麼執行的。在介紹內部執行機制之前,有必要先對執行緒池的一些引數屬性進行介紹。
1. 引數屬性介紹
- 核心執行緒數corePoolSize:核心執行緒池數量。提交一個任務的時候,會對執行緒池裡面的當前存活執行緒數和這個corePoolSize進行比較,不同的情況會有不同的操作。
- 最大執行緒數maximumPoolSize:執行緒池所能建立的執行緒的最大的數量。
- 空閒執行緒的超時時間keepAliveTime:如果執行緒池當前執行緒數是大於corePoolSize,並且這些執行緒中是有空閒執行緒的,也就是說這些執行緒沒有在執行任務,那麼空閒時間超過keepAliveTime時間,這些執行緒會被銷燬,直到當前執行緒數等於corePoolSize,這時即使有空閒執行緒並且超時了也不會進行執行緒銷燬。
- 任務佇列workQueue:這是一個阻塞佇列,用於儲存提交的任務。
- 執行緒工廠threadFactory:執行緒池會使用這個工廠類來建立執行緒,使用者可以自己實現。
- 任務的拒絕處理handler(RejectedExecutionHandler):線上程數已經達到了最大執行緒數,而且任務佇列也滿了過後,提交的任務會使用這個handler來進行處理,使用者也可以自己實現。預設是丟擲一個異常RejectedExecutionException。
2. 執行緒池執行原理分析
上面介紹了執行緒池內部的一些核心屬性,下面會基於這些屬性來介紹,當用戶提交一個任務時,執行緒池內部是如何執行的。
- 建立一個執行緒池,在還沒有任務提交的時候,預設執行緒池裡面是沒有執行緒的。當然,你也可以呼叫prestartCoreThread方法,來預先建立一個核心執行緒。
- 執行緒池裡還沒有執行緒或者執行緒池裡存活的執行緒數小於核心執行緒數corePoolSize時,這時對於一個新提交的任務,執行緒池會建立一個執行緒去處理提交的任務。當執行緒池裡面存活的執行緒數小於等於核心執行緒數corePoolSize時,執行緒池裡面的執行緒會一直存活著,就算空閒時間超過了keepAliveTime,執行緒也不會被銷燬,而是一直阻塞在那裡一直等待任務佇列的任務來執行。
- 當執行緒池裡面存活的執行緒數已經等於corePoolSize了,這是對於一個新提交的任務,會被放進任務佇列workQueue排隊等待執行。而之前建立的執行緒並不會被銷燬,而是不斷的去拿阻塞佇列裡面的任務,當任務佇列為空時,執行緒會阻塞,直到有任務被放進任務佇列,執行緒拿到任務後繼續執行,執行完了過後會繼續去拿任務。這也是為什麼執行緒池佇列要是用阻塞佇列。
- 當執行緒池裡面存活的執行緒數已經等於corePoolSize了,並且任務佇列也滿了,這裡假設maximumPoolSize>corePoolSize(如果等於的話,就直接拒絕了),這時如果再來新的任務,執行緒池就會繼續建立新的執行緒來處理新的任務,知道執行緒數達到maximumPoolSize,就不會再建立了。這些新建立的執行緒執行完了當前任務過後,在任務佇列裡面還有任務的時候也不會銷燬,而是去任務佇列拿任務出來執行。在當前執行緒數大於corePoolSize過後,執行緒執行完當前任務,會有一個判斷當前執行緒是否需要銷燬的邏輯:如果能從任務佇列中拿到任務,那麼繼續執行,如果拿任務時阻塞(說明佇列中沒有任務),那超過keepAliveTime時間就直接返回null並且銷燬當前執行緒,直到執行緒池裡面的執行緒數等於corePoolSize之後才不會進行執行緒銷燬。
- 如果當前的執行緒數達到了maximumPoolSize,並且任務佇列也滿了,這種情況下還有新的任務過來,那就直接採用拒絕的處理器進行處理。預設的處理器邏輯是丟擲一個RejectedExecutionException異常。你也就可以指定其他的處理器,或者自定義一個拒絕處理器來實現拒絕邏輯的處理(比如講這些任務儲存起來)。JDK提供了四種拒絕策略處理類:AbortPolicy(丟擲一個異常,預設的),DiscardPolicy(直接丟棄任務),DiscardOldestPolicy(丟棄佇列裡最老的任務,將當前這個任務繼續提交給執行緒池),CallerRunsPolicy(交給執行緒池呼叫所在的執行緒進行處理)。
3. 常用的幾種執行緒池以及使用場景
這裡主要介紹使用Executors類來建立的幾種執行緒池,及其比較適合的使用場景。
- SingleThreadExecutor:單個執行緒的執行緒池。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
可以看到,執行緒池裡最多一直有個執行緒用來處理任務,並且佇列使用的是無界佇列LinkedBlockingQueue。這種執行緒池主要適用於請求量非常小的場景,或者離線的資料處理等,只需要一個執行緒就夠了。在持續的請求量比較大的情況下,不要使用這種執行緒池,單執行緒處理會使佇列不斷變大,最終可能導致oom(記憶體溢位)。
- FixedThreadPool:固定執行緒大小執行緒池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看到corePoolSize和maximumPoolSize是相等的,keepAliveTime設定為0,佇列用的是LinkedBlockingQueue無界佇列。這種執行緒池適用於流量比較穩定的情況,不會說一段時間突然有大量的流量湧入,導致LinkedBlockingQueue越來越大最後導致記憶體溢位。
- CachedThreadPool:按需求建立執行緒數量執行緒池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到corePoolSize=0,maximumPoolSize是Integer.MAX_VALUE, keepAliveTime為60秒,佇列使用的是SynchronousQueue同步佇列,這個佇列可以理解為沒有容量的阻塞佇列,只有有別的執行緒來拿任務時,當前執行緒任務才能插入成功,反過來也是一樣。所以這種執行緒池任務佇列是不存任務的,任務全靠建立新的執行緒來處理,處理完了過後執行緒空閒超過60秒就會被銷燬。所以這種執行緒池適合有一定高峰流量的場景。但是還是要慎用,如果流量過高,會導致建立的執行緒過多,直接導致服務所在機器的CPU負載過高,然後機器卡死。所以如果使用這種執行緒池一定要流量是開發者知道的,最高峰時候的流量也不會導致CPU負載過高,才能使用這種執行緒池。
- 任務排程執行緒池:ScheduledThreadPoolExecutor。
可以根據自己的需求,使用單執行緒排程(SingleThreadScheduledExecutor),多執行緒排程(ScheduledThreadPool)。不過現在使用spring排程比較多點,我自己在開發中使用執行緒池的排程也比較少了,基本會使用spring的排程。
- 自定義執行緒池(推薦使用)
個人比較推薦這種方式,根據實際的一個業務場景,自己new一個ThreadPoolExecutor,引數根據業務場景需要指定合適的引數,比如核心執行緒數設定多少合適,最大執行緒數設定多少合適,任務佇列設定多大的有界合適,拒絕策略也可以自定義,一般採用離線儲存啥的,完全根絕自己的業務場景來定製。這樣可以保證不會發生無界佇列導致oom,也不會導致建立的執行緒過多而導致機器卡死(一般I5、4核的處理器跑1000左右的執行緒就會負載過高)。
4. 執行緒池關閉
- shutdown():優雅關閉。呼叫之後不允許提交新的任務了,所有呼叫之前提交的任務都會執行,等所有任務執行完了,才會真正關閉執行緒池,這就是優雅的關閉方式。
- shutdownNow():強制關閉。返回還沒執行的task列表,然後不讓等待的task執行,嘗試停止正在執行的task,非優雅關閉,強制關閉。
四、 執行緒池在使用過程中存在的一些問題以及解決方案
- 在一些存在流量高峰,一段時間內併發量很大,引數設定不當可能導致效能不佳,CPU負載過高,記憶體溢位,拒絕策略設定不當導致任務丟失或者執行失敗等等問題。
這些問題開發者可以使用上面提高的自定義建立執行緒池自行根據業務場景來設定執行緒池的引數,從而規避上述的一些問題。
- 服務重啟導致記憶體的任務佇列中的任務全部丟失。
這種情況,如果業務場景是需要保證訊息的百分百不丟失,那就需要在提交任務時,對任務做離線儲存,在任務執行完過後,再將對應的離線儲存的任務刪除。服務啟動後,需要起一個後臺執行緒去載入一次離線儲存的任務,提交給執行緒池去執行。