java執行緒池詳解及五種執行緒池方法詳解
基礎知識
Executors建立執行緒池
Java中建立執行緒池很簡單,只需要呼叫Executors
中相應的便捷方法即可,比如Executors.newFixedThreadPool(int nThreads)
,但是便捷不僅隱藏了複雜性,也為我們埋下了潛在的隱患(OOM,執行緒耗盡)。
Executors
建立執行緒池便捷方法列表:
方法名 | 功能 |
---|---|
newFixedThreadPool(int nThreads) | 建立固定大小的執行緒池 |
newSingleThreadExecutor() | 建立只有一個執行緒的執行緒池 |
newCachedThreadPool() | 建立一個不限執行緒數上限的執行緒池,任何提交的任務都將立即執行 |
小程式使用這些快捷方法沒什麼問題,對於服務端需要長期執行的程式,建立執行緒池應該直接使用ThreadPoolExecutor
的構造方法。沒錯,上述Executors
方法建立的執行緒池就是ThreadPoolExecutor
。
ThreadPoolExecutor構造方法
Executors
中建立執行緒池的快捷方法,實際上是呼叫了ThreadPoolExecutor
的構造方法(定時任務使用的是ScheduledThreadPoolExecutor
),該類構造方法引數列表如下:
- // Java執行緒池的完整建構函式
-
public ThreadPoolExecutor(
- int corePoolSize, // 執行緒池長期維持的執行緒數,即使執行緒處於Idle狀態,也不會回收。
- int maximumPoolSize, // 執行緒數的上限
- long keepAliveTime, TimeUnit unit, // 超過corePoolSize的執行緒的idle時長,
- // 超過這個時間,多餘的執行緒會被回收。
- BlockingQueue<Runnable> workQueue, // 任務的排隊佇列
- ThreadFactory threadFactory, // 新執行緒的產生方式
-
RejectedExecutionHandler handler) // 拒絕策略
竟然有7個引數,很無奈,構造一個執行緒池確實需要這麼多引數。這些引數中,比較容易引起問題的有corePoolSize
, maximumPoolSize
, workQueue
以及handler
:
-
corePoolSize
和maximumPoolSize
設定不當會影響效率,甚至耗盡執行緒; -
workQueue
設定不當容易導致OOM; -
handler
設定不當會導致提交任務時丟擲異常。
正確的引數設定方式會在下文給出。
執行緒池的工作順序
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
corePoolSize -> 任務佇列 -> maximumPoolSize -> 拒絕策略
Runnable和Callable
可以向執行緒池提交的任務有兩種:Runnable
和Callable
,二者的區別如下:
- 方法簽名不同,
void Runnable.run()
,V Callable.call() throws Exception
- 是否允許有返回值,
Callable
允許有返回值 - 是否允許丟擲異常,
Callable
允許丟擲異常。
Callable
是JDK1.5時加入的介面,作為Runnable
的一種補充,允許有返回值,允許丟擲異常。
三種提交任務的方式:
提交方式 | 是否關心返回結果 |
---|---|
Future<T> submit(Callable<T> task) |
是 |
void execute(Runnable command) |
否 |
Future<?> submit(Runnable task) |
否,雖然返回Future,但是其get()方法總是返回null |
如何正確使用執行緒池
避免使用無界佇列
不要使用Executors.newXXXThreadPool()
快捷方法建立執行緒池,因為這種方式會使用無界的任務佇列,為避免OOM,我們應該使用ThreadPoolExecutor
的構造方法手動指定佇列的最大長度:
- ExecutorService executorService = new ThreadPoolExecutor(2, 2,
- 0, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(512), // 使用有界佇列,避免OOM
- new ThreadPoolExecutor.DiscardPolicy());
明確拒絕任務時的行為
任務佇列總有佔滿的時候,這是再submit()
提交新的任務會怎麼樣呢?RejectedExecutionHandler
介面為我們提供了控制方式,介面定義如下:
- public interface RejectedExecutionHandler {
- void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
- }
執行緒池給我們提供了幾種常見的拒絕策略:
拒絕策略 | 拒絕行為 |
---|---|
AbortPolicy | 丟擲RejectedExecutionException |
DiscardPolicy | 什麼也不做,直接忽略 |
DiscardOldestPolicy | 丟棄執行佇列中最老的任務,嘗試為當前提交的任務騰出位置 |
CallerRunsPolicy | 直接由提交任務者執行這個任務 |
執行緒池預設的拒絕行為是AbortPolicy
,也就是丟擲RejectedExecutionHandler
異常,該異常是非受檢異常,很容易忘記捕獲。如果不關心任務被拒絕的事件,可以將拒絕策略設定成DiscardPolicy
,這樣多餘的任務會悄悄的被忽略。
- ExecutorService executorService = new ThreadPoolExecutor(2, 2,
- 0, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(512),
- new ThreadPoolExecutor.DiscardPolicy());// 指定拒絕策略
獲取處理結果和異常
執行緒池的處理結果、以及處理過程中的異常都被包裝到Future
中,並在呼叫Future.get()
方法時獲取,執行過程中的異常會被包裝成ExecutionException
,submit()
方法本身不會傳遞結果和任務執行過程中的異常。獲取執行結果的程式碼可以這樣寫:
- ExecutorService executorService = Executors.newFixedThreadPool(4);
- Future<Object> future = executorService.submit(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- throw new RuntimeException("exception in call~");// 該異常會在呼叫Future.get()時傳遞給呼叫者
- }
- });
- try {
- Object result = future.get();
- } catch (InterruptedException e) {
- // interrupt
- } catch (ExecutionException e) {
- // exception in Callable.call()
- e.printStackTrace();
- }
上述程式碼輸出類似如下:
執行緒池的常用場景
正確構造執行緒池
- int poolSize = Runtime.getRuntime().availableProcessors() * 2;
- BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
- RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();
- executorService = new ThreadPoolExecutor(poolSize, poolSize,
- 0, TimeUnit.SECONDS,
- queue,
- policy);
獲取單個結果
過submit()
向執行緒池提交任務後會返回一個Future
,呼叫V Future.get()
方法能夠阻塞等待執行結果,V get(long timeout, TimeUnit unit)
方法可以指定等待的超時時間。
獲取多個結果
如果向執行緒池提交了多個任務,要獲取這些任務的執行結果,可以依次呼叫Future.get()
獲得。但對於這種場景,我們更應該使用ExecutorCompletionService,該類的take()
方法總是阻塞等待某一個任務完成,然後返回該任務的Future
物件。向CompletionService
批量提交任務後,只需呼叫相同次數的CompletionService.take()
方法,就能獲取所有任務的執行結果,獲取順序是任意的,取決於任務的完成順序:
- void solve(Executor executor, Collection<Callable<Result>> solvers)
- throws InterruptedException, ExecutionException {
- CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor);// 構造器
- for (Callable<Result> s : solvers)// 提交所有任務
- ecs.submit(s);
- int n = solvers.size();
- for (int i = 0; i < n; ++i) {// 獲取每一個完成的任務
- Result r = ecs.take().get();
- if (r != null)
- use(r);
- }
- }
單個任務的超時時間
V Future.get(long timeout, TimeUnit unit)
方法可以指定等待的超時時間,超時未完成會丟擲TimeoutException
。
多個任務的超時時間
等待多個任務完成,並設定最大等待時間,可以通過CountDownLatch完成:
- public void testLatch(ExecutorService executorService, List<Runnable> tasks)
- throws InterruptedException{
- CountDownLatch latch = new CountDownLatch(tasks.size());
- for(Runnable r : tasks){
- executorService.submit(new Runnable() {
- public void run() {
- try{
- r.run();
- }finally {
- latch.countDown();// countDown
- }
- }
- });
- }
- latch.await(10, TimeUnit.SECONDS); // 指定超時時間
- }
執行緒池和裝修公司
以運營一家裝修公司做個比喻。公司在辦公地點等待客戶來提交裝修請求;公司有固定數量的正式工以維持運轉;旺季業務較多時,新來的客戶請求會被排期,比如接單後告訴使用者一個月後才能開始裝修;當排期太多時,為避免使用者等太久,公司會通過某些渠道(比如人才市場、熟人介紹等)僱傭一些臨時工(注意,招聘臨時工是在排期排滿之後);如果臨時工也忙不過來,公司將決定不再接收新的客戶,直接拒單。
執行緒池就是程式中的“裝修公司”,代勞各種髒活累活。上面的過程對應到執行緒池上:
- // Java執行緒池的完整建構函式
- public ThreadPoolExecutor(
- int corePoolSize, // 正式工數量
- int maximumPoolSize, // 工人數量上限,包括正式工和臨時工
- long keepAliveTime, TimeUnit unit, // 臨時工遊手好閒的最長時間,超過這個時間將被解僱
- BlockingQueue<Runnable> workQueue, // 排期佇列
- ThreadFactory threadFactory, // 招人渠道
- RejectedExecutionHandler handler) // 拒單方式
總結
Executors
為我們提供了構造執行緒池的便捷方法,對於伺服器程式我們應該杜絕使用這些便捷方法,而是直接使用執行緒池ThreadPoolExecutor
的構造方法,避免無界佇列可能導致的OOM以及執行緒個數限制不當導致的執行緒數耗盡等問題。ExecutorCompletionService
提供了等待所有任務執行結束的有效方式,如果要設定等待的超時時間,則可以通過CountDownLatch
完成。
Java 五種執行緒池詳解
在應用開發中,通常有這樣的需求,就是併發下載檔案操作,比如百度網盤下載檔案、騰訊視訊下載視訊等,都可以同時下載好幾個檔案,這就是併發下載。併發下載處理肯定是多執行緒操作,而大量的建立執行緒,勢必會影響程式的效能,導致卡頓等問題。所以呢,Java 中給我們提供了執行緒池來管理執行緒。
首先,我們來看看執行緒池是什麼?顧名思義,好比一個存放執行緒的池子,我們可以聯想水池。執行緒池意味著可以儲存執行緒,並讓池內的執行緒得以複用,如果池內的某一個執行緒執行完了,並不會直接摧毀,它有生命,可以存活一些時間,待到下一個任務來時,它會複用這個在等待中執行緒,避免了再去建立執行緒的額外開銷。
百度對執行緒池的簡介:
【執行緒池(英語:thread pool):一種執行緒使用模式。執行緒過多會帶來排程開銷,進而影響快取區域性性和整體效能。而執行緒池維護著多個執行緒,等待著監督管理者分配可併發執行的任務。這避免了在處理短時間任務時建立與銷燬執行緒的代價。執行緒池不僅能夠保證核心的充分利用,還能防止過分排程。可用執行緒數量應該取決於可用的併發處理器、處理器核心、記憶體、網路sockets等的數量。 例如,執行緒數一般取cpu數量+2比較合適,執行緒數過多會導致額外的執行緒切換開銷。】
執行緒池的概念與作用就介紹完了,下面就是執行緒池的運用了,我們來看這樣的一個例子,模擬網路下載的功能,開啟多工下載操作,其中每條下載都開闢新執行緒來執行。
效果圖:
可以看到就是這樣效果,這裡的每次點選 下載 按鈕,都會開啟一個子執行緒來更新進度條操作。注意了:這裡我們看到的 name 就是執行緒的名字。可以觀察到,5個下載任務所用的執行緒都是不同的,所以它們的執行緒名都不一樣。
也就是說,我們每個任務開闢的都是一個新的執行緒,假如我們下載任務量非常龐大時,那開闢的執行緒將不可控制,先不說效能問題,如果出現了執行緒安全問題或者是執行緒的排程,處理起來都是非常困難的。所以這種情況下,非常的有必要引入我們的執行緒池來管理這些執行緒,剛剛我們介紹了執行緒池的優點,現在讓我們具體的實現一下,才能體會它到底有那些優勢。
首先,我們的執行緒池型別一共有 4 種,分別是 newSingleThreadPool、newFixedThreadPool、newCachedThreadPool、newScheduledThreadPool 四種,這是在 JDK1.8 版本以前了,在 JDK1.8 版本又加入了一種:newWorkStealingPool,所以現在一共是 5 種。
1、執行緒池的建立過程
通過這幾種執行緒池的命名,我們大致可以猜測出來它的用意,當然,還是必須要實踐一下。對 執行緒池 的建立一般都是這樣的步驟:
-
//建立單核心的執行緒池
-
ExecutorService executorService = Executors.newSingleThreadExecutor();
-
//建立固定核心數的執行緒池,這裡核心數 = 2
-
ExecutorService executorService = Executors.newFixedThreadPool(2);
-
//建立一個按照計劃規定執行的執行緒池,這裡核心數 = 2
-
ExecutorService executorService = Executors.newScheduledThreadPool(2);
-
//建立一個自動增長的執行緒池
-
ExecutorService executorService = Executors.newCachedThreadPool();
-
//建立一個具有搶佔式操作的執行緒池
-
ExecutorService executorService = Executors.newWorkStealingPool();
我們只需要這樣呼叫就可成功的建立適用於我們的執行緒池,不過從上面看不出上面東西來,我們要進入執行緒池建立的構造器,程式碼如下:
-
/**
-
* Creates a new {@code ThreadPoolExecutor} with the given initial
-
* parameters and default thread factory and rejected execution handler.
-
* It may be more convenient to use one of the {@link Executors} factory
-
* methods instead of this general purpose constructor.
-
*
-
* @param corePoolSize the number of threads to keep in the pool, even
-
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
-
* @param maximumPoolSize the maximum number of threads to allow in the
-
* pool
-
* @param keepAliveTime when the number of threads is greater than
-
* the core, this is the maximum time that excess idle threads
-
* will wait for new tasks before terminating.
-
* @param unit the time unit for the {@code keepAliveTime} argument
-
* @param workQueue the queue to use for holding tasks before they are
-
* executed. This queue will hold only the {@code Runnable}
-
* tasks submitted by the {@code execute} method.
-
* @throws IllegalArgumentException if one of the following holds:<br>
-
* {@code corePoolSize < 0}<br>
-
* {@code keepAliveTime < 0}<br>
-
* {@code maximumPoolSize <= 0}<br>
-
* {@code maximumPoolSize < corePoolSize}
-
* @throws NullPointerException if {@code workQueue} is null
-
*/
-
public ThreadPoolExecutor(int corePoolSize,
-
int maximumPoolSize,
-
long keepAliveTime,
-
TimeUnit unit,
-
BlockingQueue<Runnable> workQueue) {
-
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
-
Executors.defaultThreadFactory(), defaultHandler);
-
}
當然,上面的註釋都對引數進行了介紹,我們用自己的語言進行歸納一下:
- corePoolSize : 表示執行緒池核心執行緒數,當初始化執行緒池時,會建立核心執行緒進入等待狀態,即使它是空閒的,核心執行緒也不會被摧毀,從而降低了任務一來時要建立新執行緒的時間和效能開銷。
- maximumPoolSize : 表示最大執行緒數,意味著核心執行緒數都被用完了,那隻能重新建立新的執行緒來執行任務,但是前提是不能超過最大執行緒數量,否則該任務只能進入阻塞佇列進行排隊等候,直到有執行緒空閒了,才能繼續執行任務。
- keepAliveTime : 表示執行緒存活時間,除了核心執行緒外,那些被新創建出來的執行緒可以存活多久。意味著,這些新的執行緒一但完成任務,而後面都是空閒狀態時,就會在一定時間後被摧毀。
- unit : 存活時間單位,沒什麼好解釋的,一看就懂。
- workQueue : 表示任務的阻塞佇列,由於任務可能會有很多,而執行緒就那麼幾個,所以那麼還未被執行的任務就進入佇列中排隊,佇列我們知道是 FIFO 的,等到執行緒空閒了,就以這種方式取出任務。這個一般不需要我們去實現。
還有一個注意點就是它這裡的規定,可能會丟擲這樣的異常情況。這下面寫的很明白了,就不要再介紹了:
-
* @throws IllegalArgumentException if one of the following holds:<br>
-
* {@code corePoolSize < 0 }
-
* {@code keepAliveTime < 0 }
-
* {@code maximumPoolSize <= 0 }
-
* {@code maximumPoolSize < corePoolSize }
-
* @throws NullPointerException if {@code workQueue} is null
好了,以上重點幾個引數內容我們介紹完了,現在來看看幾種執行緒池的比較和表現吧!
2、執行緒池的比較
(1)newSingleThreadPool,為單核心執行緒池,最大執行緒也只有一個,這裡的時間為 0 意味著無限的生命,就不會被摧毀了。它的建立方式原始碼如下:
-
public static ExecutorService newSingleThreadExecutor() {
-
return new FinalizableDelegatedExecutorService
-
(new ThreadPoolExecutor(1, 1,
-
0L, TimeUnit.MILLISECONDS,
-
new LinkedBlockingQueue<Runnable>()));
-
}
最形象的就是拿我們下載那個例子,為了便於測試,我當然添加了一個 全部下載的功能, newSingleThreadPool 測試結果如下:
由於我們的執行緒池中使用的從始至終都是單個執行緒,所以這裡的執行緒名字都是相同的,而且下載任務都是一個一個的來,直到有空閒執行緒時,才會繼續執行任務,否則都是等待狀態。
(2)newFixedThreadPool,我們需要傳入一個固定的核心執行緒數,並且核心執行緒數等於最大執行緒數,而且它們的執行緒數存活時間都是無限的,看它的建立方式:
-
public static ExecutorService newFixedThreadPool(int nThreads) {
-
return new ThreadPoolExecutor(nThreads, nThreads,
-
0L, TimeUnit.MILLISECONDS,
-
new LinkedBlockingQueue<Runnable>());
-
}
對比 newSingleThreadPool,其實改變的也就是可以根據我們來自定義執行緒數的操作,比較相似。我們通過newFixedThreadPool(2)給它傳入了 2 個核心執行緒數,看看下載效果如何:
顯然,它就可以做到併發的下載,我們兩個下載任務可以同時進行,並且所用的執行緒始終都只有兩個,因為它的最大執行緒數等於核心執行緒數,不會再去建立新的執行緒了,所以這個方式也可以,但最好還是運用下面一種執行緒池。
(3)newCachedThreadPool,可以進行快取的執行緒池,意味著它的執行緒數是最大的,無限的。但是核心執行緒數為 0,這沒關係。這裡要考慮執行緒的摧毀,因為不能夠無限的建立新的執行緒,所以在一定時間內要摧毀空閒的執行緒。看看建立的原始碼:
-
public static ExecutorService newCachedThreadPool() {
-
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
-
60L, TimeUnit.SECONDS,
-
new SynchronousQueue<Runnable>());
-
}
沒有核心執行緒數,但是我們的最大執行緒數沒有限制,所以一點全部開始下載,就會創建出 5 條新的執行緒同時執行任務,從上圖的例子看出,每天執行緒都不一樣。看不出這個執行緒池的效果,下面我們通過修改這個邏輯。
首先,我們點開始下載,只會下載前面三個,為了證明執行緒的複用效果,我這裡又添加了一個按鈕,在這個按鈕中繼續新增後面兩個下載任務。
那麼,當執行緒下載完畢時,空閒執行緒就會複用,結果顯示如下,複用執行緒池的空閒執行緒:
另一種情況,當執行緒池中沒有空閒執行緒時,這時又加了新的任務,它就會創建出新的執行緒來執行任務,結果如下:
這下算是搞清楚這種執行緒池的作用了吧,但是由於這種執行緒池建立時初始化的都是無界的值,一個是最大執行緒數,一個是任務的阻塞佇列,都沒有設定它的界限,這可能會出現問題。
這裡可以參考我的一篇文章: AsyncTask 原始碼 分析,或者這個 單利模式 解讀的文章,裡面有提到如何建立自定義的執行緒池,參考的是 AsyncTask 的原始碼執行緒池建立程式碼。
(4)newScheduledThreadPool,這個表示的是有計劃性的執行緒池,就是在給定的延遲之後執行,或週期性地執行。很好理解,大家應該用過 Timer 定時器類吧,這兩個差不多的意思。它的建構函式如下:
-
public ScheduledThreadPoolExecutor(int corePoolSize) {
-
super(corePoolSize, Integer.MAX_VALUE,
-
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
-
new DelayedWorkQueue());
-
}
內部有一個延時的阻塞佇列來維護任務的進行,延時也就是在這裡進行的。我們把建立 newScheduledThreadPool 的程式碼放出來,這樣對比效果圖的話,顯得更加直觀。
-
//引數2:延時的時長
-
scheduledExecutorService.schedule(th_all_1, 3000, TimeUnit.MILLISECONDS);
-
scheduledExecutorService.schedule(th_all_2, 2000, TimeUnit.MILLISECONDS);
-
scheduledExecutorService.schedule(th_all_3, 1000, TimeUnit.MILLISECONDS);
-
scheduledExecutorService.schedule(th_all_4, 1500, TimeUnit.MILLISECONDS);
-
scheduledExecutorService.schedule(th_all_5, 500, TimeUnit.MILLISECONDS);
這個執行緒池好像不是很常用,做個瞭解就好了。
(5)newWorkStealingPool,這個是 JDK1.8 版本加入的一種執行緒池,stealing 翻譯為搶斷、竊取的意思,它實現的一個執行緒池和上面4種都不一樣,用的是 ForkJoinPool 類,建構函式程式碼如下:
-
/**
-
* Creates a thread pool that maintains enough threads to support
-
* the given parallelism level, and may use multiple queues to
-
* reduce contention. The parallelism level corresponds to the
-
* maximum number of threads actively engaged in, or available to
-
* engage in, task processing. The actual number of threads may
-
* grow and shrink dynamically. A work-stealing pool makes no
-
* guarantees about the order in which submitted tasks are
-
* executed.
-
*
-
* @param parallelism the targeted parallelism level
-
* @return the newly created thread pool
-
* @throws IllegalArgumentException if {@code parallelism <= 0}
-
* @since 1.8
-
*/
-
public static ExecutorService newWorkStealingPool(int parallelism) {
-
return new ForkJoinPool
-
(parallelism,
-
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
-
null, true);
-
}
從上面程式碼的介紹,最明顯的用意就是它是一個並行的執行緒池,引數中傳入的是一個執行緒併發的數量,這裡和之前就有很明顯的區別,前面4種執行緒池都有核心執行緒數、最大執行緒數等等,而這就使用了一個併發執行緒數解決問題。從介紹中,還說明這個執行緒池不會保證任務的順序執行,也就是 WorkStealing 的意思,搶佔式的工作。
如下圖,任務的執行是無序的,哪個執行緒搶到任務,就由它執行:
對比了以上 5 種執行緒池,我們看到每個執行緒池都有自己的特點,這也是為我們封裝好的一些比較常用的執行緒池。當然,我建議你在使用(3)可快取的執行緒池時,儘量的不要用預設的那個來建立,因為預設值都是無界的,可能會出現一些問題,這時我們可以參考原始碼中的執行緒池初始化引數的設定,可以儘可能的避免錯誤發生。
通過這個案例,我們把執行緒池學習了一遍,總結一下執行緒池在哪些地方用到,比如網路請求、下載、I/O操作等多執行緒場景,我們可以引入執行緒池,一個對效能有提升,另一個就是可以讓管理執行緒變得更簡單。