Java多執行緒總結(6)— 執行緒池的基本使用和執行流程分析
1 執行緒池的實現原理及基本類結構
合理利用執行緒池能夠帶來三個好處。
- 降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
- 提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。
- 提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。
Executor執行緒池框架的最大優點是把任務的提交和執行解耦。客戶端將要執行的任務封裝成Task,然後提交即可。而Task如何執行客戶端則是透明的。具體點講,提交一個Callable物件給ExecutorService(如最常用的執行緒池ThreadPoolExecutor),將得到一個Future物件,呼叫Future物件的get方法等待執行結果。
下圖是執行緒池所涉及到的所有類的結構圖(右鍵檢視大圖),先從整體把握下:
圖1 執行緒池實現原理類結構圖
上面這個圖是很複雜的,涉及到了執行緒池內部實現原理的所有類,不利於我們理解執行緒池如何使用。我們先從客戶端的角度出發,看看客戶端使用執行緒池所涉及到的類結構圖:
圖2 執行緒池使用的基本類結構圖
從圖一可知,實際的執行緒池類是實現ExecutorService介面的類,有ThreadPoolExecutor、ForkJoinPool和ScheduledThreadPoolExecutor。下面以常用的ThreadPoolExecutor為例講解。
2 執行緒池實現步驟
2.1 執行緒池的建立
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
引數說明:
- corePoolSize(執行緒池的基本執行緒數): the number of threads to keep in the pool, even if they are idle, unless
allowCoreThreadTimeOut
is set.當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads
方法,執行緒池會提前建立並啟動所有基本執行緒。 - maximumPoolSize(執行緒池最大執行緒數): the maximum number of threads to allow in the pool.執行緒池允許建立的最大執行緒數。如果任務佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是如果使用了無界的任務佇列這個引數就沒什麼效果。
- keepAliveTime(執行緒活動保持時間):when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.執行緒池的工作執行緒空閒後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高執行緒的利用率。
- TimeUnit(執行緒活動保持時間的單位):the time unit for the keepAliveTime argument. 可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
- workQueue(任務佇列):the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.用於儲存等待執行的任務的阻塞佇列。 可以選擇以下幾個阻塞佇列。
ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序。
LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。
SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。 - ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。
- RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。以下是JDK1.5提供的四種策略。
AbortPolicy:直接丟擲異常。
CallerRunsPolicy:只用呼叫者所線上程來執行任務。
DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
DiscardPolicy:不處理,丟棄掉。
當然也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。
由此可見,建立一個執行緒所需的引數很多,執行緒池為我們提供了類Executors的靜態工廠方法以建立不同型別的執行緒池。
- newFixedThreadPool
可以生成固定大小的執行緒池;
- newCachedThreadPool
可以生成一個無界、可以自動回收的執行緒池;
- newSingleThreadScheduledExecutor
可以生成一個單個執行緒的執行緒池;
- newScheduledThreadPool
還可以生成支援週期任務的執行緒池。
2.2 向執行緒池提交任務
有兩種方式提交任務:
1.使用void execute(Runnable command)
方法提交任務
execute方法返回型別為void,所以沒有辦法判斷任務是否被執行緒池執行成功。
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("Task is running by " + Thread.currentThread().getName());
System.out.println("執行緒池正在執行的執行緒數:" + threadPoolExecutor.getActiveCount());
}
};
threadPool.execute(task);
2.使用submit
方法提交任務
Future<?> submit(Runnable task);
、<T> Future<T> submit(Runnable task, T result);
和 Future<T> submit(Callable<T> task);
會返回一個Future,可以通過這個future來判斷任務是否執行成功,通過future的get方法來獲取返回值,get方法會阻塞直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後立即返回,這時有可能任務沒有執行完。
Future<?> future = threadPool.submit(task);
try {
Object result = future.get();
System.out.println("任務是否完成:" + future.isDone());
System.out.println("返回的結果為:" + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
// 關閉執行緒池
threadPool.shutdown();
}
2.3 執行緒池關閉
shutdown()
方法:這個方法會平滑地關閉ExecutorService,當我們呼叫這個方法時,ExecutorService停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢後將會關閉ExecutorService。awaitTermination(long timeout, TimeUnit unit)
方法:這個方法有兩個引數,一個是timeout即超時時間,另一個是unit即時間單位。這個方法會使當前關閉執行緒池的執行緒等待timeout時長,當超過timeout時間後,則去監測ExecutorService是否已經關閉,若關閉則返回true,否則返回false。一般情況下會和shutdown方法組合使用。shutdownNow()
方法:這個方法會強制關閉ExecutorService,它將取消所有執行中的任務和在工作佇列中等待的任務,這個方法返回一個List列表,列表中返回的是等待在工作佇列中的任務。
// 4. 關閉執行緒池
threadPool.shutdown();
// hreadPool.shutdownNow();
System.out.println("執行緒池是否關閉:" + threadPool.isShutdown());
try {
//當前執行緒阻塞10ms後,去檢測執行緒池是否終止,終止則返回true
while(!threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
System.out.println("檢測執行緒池是否終止:" + threadPool.isTerminated());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執行緒池是否終止:" + threadPool.isTerminated());
完整案例:
package com.markliu.concurrent.threadpool;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo1 {
public static void main(String[] args) {
/*
* 1. 建立執行緒池
*
* 建立一個固定執行緒數目的執行緒池。corePoolSize = maximumPoolSize = 5
* 即執行緒池的基本執行緒數和最大執行緒數相等。
* 相當於:
* new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
*/
ExecutorService threadPool = Executors.newFixedThreadPool(5);
/*
* 2. 封裝任務並提交給執行緒池
*/
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) threadPool;
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("Task is running by " + Thread.currentThread().getName());
System.out.println("執行緒池正在執行的執行緒數:" + threadPoolExecutor.getActiveCount());
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
/*
* Starts all core threads, causing them to idly wait for work.
* This overrides the default policy of starting core threads
* only when new tasks are executed.
*/
int count = threadPoolExecutor.prestartAllCoreThreads();
System.out.println("開啟的所有core執行緒數:" + count);
System.out.println("執行緒池當前執行緒數:" + threadPoolExecutor.getPoolSize());
System.out.println("執行緒池的core number of threads:" + threadPoolExecutor.getCorePoolSize());
System.out.println("執行緒池中的最大執行緒數:" + threadPoolExecutor.getLargestPoolSize());
// 3. 執行,獲取返回結果
/**
* execute方式提交任務
*/
// threadPool.execute(task);
/**
* submit方式提交任務
*/
Future<?> future = threadPool.submit(task);
try {
// 阻塞,等待執行緒執行完成,並獲得結果
Object result = future.get();
System.out.println("任務是否完成:" + future.isDone());
System.out.println("返回的結果為:" + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
System.out.println("執行緒池中已經執行完的任務數:" + threadPoolExecutor.getCompletedTaskCount());
// 4. 關閉執行緒池
/*
* shutdown方法平滑地關閉執行緒池,將執行緒池的狀態設為:SHUTDOWN
* 停止接受任何新的任務且等待已經提交的任務執行完成,當所有已經
* 提交的任務執行完畢後將會關閉執行緒池
*/
threadPool.shutdown();
/*
* shutdownNow方法強制關閉執行緒池,將執行緒池狀態設定為:STOP
* 取消所有執行中的任務和在工作佇列中等待的任務,並返回所有未執行的任務List
*/
// hreadPool.shutdownNow();
System.out.println("執行緒池是否關閉:" + threadPool.isShutdown());
try {
//當前執行緒阻塞10ms後,去檢測執行緒池是否終止,終止則返回true
while(!threadPool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
System.out.println("檢測執行緒池是否終止:" + threadPool.isTerminated());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執行緒池是否終止:" + threadPool.isTerminated());
}
}
}
3 執行緒池的執行流程分析
執行緒池的主要工作流程如下圖:
當提交一個新任務到執行緒池時,執行緒池的處理流程如下:
- 首先執行緒池判斷“基本執行緒池”(corePoolSize)是否已滿?沒滿,建立一個工作執行緒來執行任務。滿了,則進入下個流程。
- 其次執行緒池判斷工作佇列(workQueue)是否已滿?沒滿,則將新提交的任務儲存在工作佇列裡。滿了,則進入下個流程。
- 最後執行緒池判斷整個執行緒池的執行緒數是否已超過maximumPoolSize?沒滿,則建立一個新的工作執行緒來執行任務,滿了,則交給拒絕策略來處理這個任務。
(我的理解:提交任務—>如果執行緒數未達到corePoolSize,則建立執行緒執行任務—>如果達到corePoolSize,仍讓提交了任務,則會有任務等待,所以將任務儲存在任務佇列中,直到任務佇列workQueue已滿—>如果workQueue已滿,仍然有任務提交,但未達到最大執行緒數,則繼續建立執行緒執行任務,直到執行緒數達到maximumPoolSize,如果達到了maximumPoolSize,則根據飽和策略拒絕該任務。這也就解釋了為什麼有了corePoolSize還有maximumPoolSize的原因。)
關於執行緒池的工作原理後期從原始碼分析。
4 執行緒池的監控
通過執行緒池提供的引數進行監控:
- taskCount:執行緒池需要執行的任務數量。
- completedTaskCount:執行緒池在執行過程中已完成的任務數量。小於或等於taskCount。
- largestPoolSize:執行緒池曾經建立過的最大執行緒數量。通過這個資料可以知道執行緒池是否滿過。如等於執行緒池的最大大小,則表示執行緒池曾經滿了。
- getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,池裡的執行緒不會自動銷燬,所以這個大小隻增不減
- getActiveCount:獲取活動的執行緒數。
通過繼承執行緒池並重寫執行緒池的beforeExecute,afterExecute和terminated方法,可以在任務執行前,執行後和執行緒池關閉前幹一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法線上程池裡是空方法。
/**
* 給定的Thread執行Runnable之前呼叫此方法
*
* @param t the thread that will run task {@code r}
* @param r the task that will be executed
*/
protected void beforeExecute(Thread t, Runnable r) { }
/**
* 給定的Runnable完成後執行此方法
* This method is invoked by the thread that executed the task.
* @param r the runnable that has completed
* @param t the exception that caused termination, or null if
* execution completed normally
*/
protected void afterExecute(Runnable r, Throwable t) { }
/**
* 當Executor終止時呼叫此方法.
* 注意:方法中子類應呼叫super.terminated()
*/
protected void terminated() { }
例如:
class ExtendedExecutor extends ThreadPoolExecutor {
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}