java併發程式設計之執行緒池
前言
本文介紹幾種java常用的執行緒池如:FixedThreadPool,ScheduledThreadPool,CachedThreadPool等執行緒池,並分析介紹Executor框架,做到“知其然”:會用執行緒池,正確使用執行緒池。並且“知其所以然”:瞭解背後的基本原理。
1.Executor
Executor是java SE5的java.util.concurrent包下的執行器,用於管理Thread物件,它幫程式設計師簡化了併發程式設計。與客戶端直接執行任務不同,Executor作為客戶端和任務執行之間的中介,將任務的提交和任務的執行策略解耦開來,Executor允許我們管理非同步任務的執行。
Executor的實現思想基於生產者-消費者模型,提交任務的執行緒相當於生產者,執行任務的工作執行緒相當於消費者,這裡所說的任務即我們實現Runnable介面的類。
Executor關鍵類圖如下:
根據這個類圖,詳細分析一下其中的門路:
- Executor:該介面定義了一個 void execute(Runnable command)方法,用來接受任務。
- ExecutorService:繼承了Executor並對其進行了豐富的拓展,提供了任務生命週期管理相關的方法,如shutdown(),shutdownNow()等方法,並提供了追蹤一個或者多個非同步任務執行狀況並返回Future的方法,如submit(),invokeAll()方法
- AbstractExecutorService:ExecutorService的預設實現類,該類是一個抽象類。
- ThreadPoolExecutor:繼承了AbstractExecutorService並實現了其方法,可以通過Executors提供的靜態工廠方法建立執行緒池並返回ExecutorService例項
- ScheduledExecutorService:提供定時排程任務的介面
- ScheduledThreadPoolExecutor:ScheduledExecutorService的實現類,提供可定時排程任務的執行緒池。
2.ThreadPoolExecutor
根據上面的類圖,我們詳細看下ThreadPoolExecutor提供的構造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//略 ……
}
引數介紹:
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
引數說明:
- corePoolSize:核心執行緒數/基本執行緒數,沒有任務執行時執行緒池的大小(在建立ThreadPoolExecutor時,執行緒不會立即啟動,有任務提交時才會啟動)
- maximumPoolSize:最大執行緒數,允許的建立的執行緒數
- keepAliveTime:執行緒存活時間,當某個執行緒的空閒時間超過了存活時間,將被標記為可回收的,如果此時執行緒池的大小超過了corePoolSize,執行緒將被終止。
- TimeUnit:keepAliveTime的單位,可選毫秒,秒等
- workQueue:儲存任務的阻塞佇列,主要有三種:有界佇列,無界佇列和同步移交佇列(Synchronous Handoff)。下面將詳細說明
- threadFactory(可選):在Executor建立新執行緒的時候使用,預設使用defaultThreadFactory建立執行緒
- handler:定義“飽和策略”,這裡的飽和根據引數說明是指執行緒池容量(workQueue也滿了)滿了的時候,會使用飽和策略進行任務的拒絕。預設的策略是“Abort”即中止策略,該策略丟擲RejectExecutorException異常,其他的策略有“拋棄(Discard)、拋棄最舊(Discard-Oldest)”等。
很多人有疑問這些引數的含義到底是什麼,網上的解釋也五花八門,這裡我通俗的解釋一下,當提交一個任務的時候,會先檢查當前執行的執行緒數,如果當前執行的執行緒數小於基本執行緒數(corePoolSize),則直接建立執行緒執行任務,且這個執行緒屬於core執行緒。如果當前執行的執行緒數大於等於基本執行緒數,該任務會被放到阻塞佇列(workQueue)中,在阻塞佇列裡的任務會複用core執行緒,即阻塞佇列裡的任務會等待core執行緒提取執行。而當阻塞佇列是有界佇列時,在阻塞佇列滿了的時候,會建立新的執行緒來執行任務,這些新的執行緒是非core執行緒,滿足keepAliveTime的時候會被銷燬,而已經進入佇列裡的任務會繼續由已有的全部執行緒來執行。超過最大執行緒時(maximumPoolSize),會使用我們定義的“飽和策略”來處理。
這裡面其實牽扯了另一個問題,即如何實現執行緒複用,簡單來說就是執行緒在執行任務時,執行完後會去佇列裡面take新的任務,而take方法是阻塞的,因此執行緒並不會被銷燬,只會不停的執行任務,沒有任務時要麼根據我們的邏輯銷燬要麼阻塞等待任務。
3.Executors
有時候使用ThreadPoolExecutor自己建立執行緒池時由於不清楚如何設定執行緒池大小,存活時間等問題會導致資源浪費,而ThreadPoolExecutor是一個靈活的、穩定的執行緒池,允許各種定製。Executors提供了一系列的靜態工程方法幫我們建立各種執行緒池。
newFixedThreadPool
newFixedThreadPool建立可重用且執行緒數量固定的執行緒池,當執行緒池所有執行緒都在執行任務時,新的任務會在阻塞佇列中等待,當某個執行緒因為異常而結束,執行緒池會建立新的執行緒進行補充。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool使用基於FIFO(先進先出)策略的LinkedBlockingQueue作為阻塞佇列。
newSingleThreadPool
newSingleThreadPool使用單執行緒的Executor,其中corePoolSize和maximumPoolSize都為1,固定執行緒池大小為1。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newScheduledThreadPool
newScheduledThreadPool建立可以延遲執行或者定時執行任務的執行緒池,使用延時工作佇列DelayedWorkQueue。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
newCachedThreadPool
newCachedThreadPool是一個可快取的執行緒池,執行緒池中的執行緒如果60s內未被使用將被移除。使用同步佇列SynchronousQueue。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
關於同步佇列SynchronousQueue
回想一下,newSingleThreadPool,newFixedThreadPool預設使用無界佇列LinkedBlockingQueue,當任務執行速度遠遠低於新任務到來的速度時,該佇列將無限增加,如果我們使用有界佇列例如ArrayBlockingQueue,當佇列填滿後則需要完善的飽和策略,這裡需要根據需求選取折中之道。
這裡就引出了SynchronousQueue,對於非當大的或者無界的執行緒池,使用SynchronousQueue可以避免排隊,它可以講任務直接從生產中交給工作者執行緒,SynchronousQueue本質上不是一個佇列,而是一種同步移交機制,要將一個任務放入SynchronousQueue時必須要有一個執行緒在等待接受,如果沒有執行緒在等待且當前執行緒數大於最大值,將啟用飽和策略拒絕任務。
總結:newSingleThreadPool使用單執行緒的Executor,newFixedThreadPool設定執行緒池的基本大小和最大大小為指定的值,而且建立的執行緒池不會超時。newCachedThreadPool將執行緒池的最大大小設定為Integer.MAX_VALUE(即2的31次方減1),基本大小為0,超時時間為1分鐘,該執行緒池可以無限拓展,且需求降低時會自動收縮。newScheduledThreadPool用於執行定時任務的執行緒池。其他形式的執行緒池大家可參考其他資料。
關於執行緒池大小的問題
執行緒池的大小取決於提交的任務的型別和系統的效能。
- 如果執行緒池過大,大量執行緒將在相對有限的cpu和記憶體資源上競爭,將導致記憶體使用量過高,耗盡資源
- 如果執行緒池太小,空想的處理器資源無法有效利用,降低了吞吐率。
為了正確設定執行緒池大小需要考慮系統的CPU數,記憶體容量,任務型別(是計算密集型還是io密集型或是二者皆可),任務是否需要稀缺資源(如jdbc連線)
對於計算密集型任務,在擁有N個處理器的系統上,最佳執行緒池的大小為N+1。這個額外的執行緒保證了突發情況下CPU時鐘週期不會浪費。對於包含IO操作或者其他阻塞操作的任務,由於執行緒不會一直執行,執行緒池需要更大。
執行緒池的大小設定最優公式如下:
引數定義:
N = number of CPUs (cpu數量)
U = target CPU utilization,0<= U <= 1 (cpu利用率)
W/C = ratio of wait time to compute time (任務等待時間和計算時間的比值)
那麼最優執行緒池大小計算公式為:
S = N*U*(1+W/C)
其中cpu數可以通過Runtime.getRuntime().availableProcessors()獲得
CPU週期只是影響執行緒池大小的一個主要引數,其他的因素也很重要,需要綜合實踐。
參考《thinking in java》、《java併發程式設計實戰》