Java併發(基礎知識)—— Executor框架及執行緒池
在Java併發(基礎知識)—— 建立、執行以及停止一個執行緒中講解了兩種建立執行緒的方式:直接繼承Thread類以及實現Runnable介面並賦給Thread,這兩種建立執行緒的方式線上程比較少的時候是沒有問題的,但是當需要建立大量執行緒時就會出現問題,因為這種使用方法把執行緒建立語句隨意地散落在程式碼中,無法統一管理執行緒,我們將無法管理建立執行緒的數量,而過量的執行緒建立將直接使系統崩潰。
從高內聚角度講,我們應該建立一個統一的建立以及執行介面,為我們管理這些執行緒,這個統一的建立與執行介面就是JDK 5的Executor框架。
Executor框架
在Java類庫中,任務執行的主要抽象不是Thread,而是Executor,該介面定義如下:
public interface Executor {
void execute(Runnable command);
}
雖然Executor是一個簡單的介面,但它卻為靈活且強大的非同步任務執行框架提供了基礎,該框架能夠支援多種不同型別的任務執行策略,它提供了一種標準的方法將任務的提交過程與執行過程解耦開來。
Executor基於生產者-消費者模式,提交任務的操作相當於生產者(生成待完成的工作單元),執行任務的執行緒則相當於消費者(執行工作單元)。如果要在一個程式中實現一個生產者-消費者模式,那麼最簡單的方式就是使用Executor。 Executor介面定義了提交任務的方法,但卻沒有定義關閉的方法,ExecutorService介面擴充套件了Executor介面,添加了一些用於生命週期管理的方法:
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
...
}
ExecutorService的生命週期有3種狀態:執行、關閉和已終止。ExecutorService在初始建立時處於執行狀態。shutdown方法將執行平緩的關閉過程:不再接受新的任務,同時等待已經提交的任務執行完成——包括那些還未開始執行的任務。shutdownNow方法將執行粗暴的關閉過程:它將嘗試取消所有執行中的任務,並且不再啟動佇列中尚未開始執行的任務。
在所有任務都完成後,ExecutorService將轉入終止狀態。可以呼叫awaitTermination來等待ExecutorService到達終止狀態,或者通過呼叫isTerminated來輪詢ExecutorService是否已經終止。通常在呼叫awaitTermination之後會立即呼叫shutdown,從而產生同步地關閉ExecutorService的效果。
執行緒池
Executor框架的核心是執行緒池。執行緒池是指管理一組同構工作執行緒的資源池,在"執行緒池中執行任務"比"為每個任務分配一個執行緒"優勢更多。通過重用現有的執行緒而不是建立新執行緒,可以在處理多個請求時分攤線上程建立和銷燬過程中產生的巨大開銷。另一個額外的好處是,當請求到達時,工作執行緒通常已經存在,因此不會由於等待建立執行緒而延遲任務的執行,從而提高了響應性。通過適當調整執行緒池大小,可以建立足夠多的執行緒以便使處理器保持忙碌,同時還可以防止過多執行緒相互競爭資源而使應用程式耗盡記憶體而失敗。
ThreadPoolExecutor定義了一個執行緒池,該類的宣告如下:
public class ThreadPoolExecutor extends AbstractExecutorService { … }
public abstract class AbstractExecutorService implements ExecutorService { … }
可以看到,ThreadPoolExecutor繼承自AbstractExecutorService,AbstractExecutorService實現了ExecutorService介面,所以ThreadPoolExecutor也間接實現了ExecutorService介面。
ThreadPoolExecutor定義了很多建構函式,以下程式碼給出了該類最重要的建構函式:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... }
corePoolSize、maximumPoolSize、keepAliveTime以及unit這幾個引數分別定義了執行緒池的基本大小、最大大小以及存活時間。corePoolSize定義了執行緒池的基本大小,也就是執行緒池的目標大小,即在沒有任務執行時執行緒池的大小,並且只有在工作佇列滿了的情況下才會建立超出這個數量的執行緒。maximumPoolSize定義了執行緒池的最大大小,表示執行緒池可同時活動執行緒數量上限。keepAliveTime和unit共同定義了執行緒的存活時間,如果某個執行緒的空閒時間超過了存活時間,那麼將被標記為可回收的,並且當執行緒池的當前大小超過基本大小時,這個執行緒將被終止。
workQueue引數包含Runnable的阻塞佇列,當執行緒池達到基本大小時,新提交的任務將放入這個阻塞佇列中,阻塞佇列的實現包含三種:無界佇列、有界佇列以及同步移交佇列。
threadFactory引數用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字,方便定位問題。
handler引數定義了執行緒池飽和策略。當有界佇列被填滿後,並且執行緒池活動執行緒達到最大執行緒數,飽和策略開始生效。JDK提供了幾種不同的RejectedExecutionHandler實現,分別是AbortPolicy、DiscardPolicy、DiscardOldestPolicy以及CallerRunsPolicy。AbortPolicy是預設的飽和策略,該策略將丟擲未檢查的RejectedExecutionException。DiscardPolicy策略會把新提交的任務直接拋棄,而DiscardOldestPolicy策略會拋棄佇列首部最老的任務。CallerRunsPolicy策略實現了一種調節機制,該策略既不會拋棄任務,也不會丟擲異常,而是將某些任務回退到呼叫者,從而降低新任務的流量,它不會線上程池中的某個執行緒中執行新提交的任務,而是在一個呼叫了execute的執行緒中執行該任務。
Executors靜態工廠方法
從上一節內容看出,ThreadPoolExecutor的新建需要傳入很多引數,使用起來極不方便。為了便於使用,Executors為我們提供了幾個靜態工廠方法,大大簡化了執行緒池的建立,它們分別是:
newFixedThreadPool:newFixedThreadPool將建立一個固定大小的執行緒池,每當提交一個任務就建立一個執行緒,直到達到執行緒池的最大數量,這時執行緒池的規模將不再變化;
newCachedThreadPool:newCachedThreadPool將建立一個可快取的執行緒池,如果執行緒池的當前規模超過了處理需求,那麼將回收空閒執行緒;而當需求增加時,可以新增新的執行緒,執行緒池的規模不存在任何限制。
newSingleThreadExecutor:newSingleThreadExecutor是一個單執行緒的Executor,它建立單個工作者執行緒執行任務,如果這個執行緒異常結束,會建立另一個執行緒代替。
以newCachedThreadPool為例,我們可以看看這些靜態工廠方法的內部實現:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到,這些靜態工廠方法最終還是呼叫的ThreadPoolExecutor的建構函式,指定了執行緒池基本大小為0,最大大小為Integer值上限,執行緒存活時間為60s,阻塞佇列是一個SynchronousQueue。從這些引數可以知道,當執行緒提交newCachedThreadPool的執行緒池時,由於基本大小為0,所以肯定大於基本大小,然後任務會進入阻塞佇列,而SynchronousQueue內部沒有任何容量,且當前執行緒數未達到最大執行緒數,所以任務將立即執行。任務執行完有60s的超時時間,如果在這段時間內有新任務呼叫,那麼新任務將直接在這個執行緒上執行。