1. 程式人生 > >Java併發框架:Executor簡析

Java併發框架:Executor簡析

一、簡介

很多剛剛接觸多執行緒的同學喜歡通過new Thread().start()來建立並啟動一個執行緒,在複雜的應用中這是一個很不好的習慣,這樣創建出來的執行緒往往缺乏有效的管理,容易造成各種各樣的問題並難以解決。當你為此感到困惑的時候,你就應該好好學習並使用這個框架來構建多執行緒應用程式了。

Executor框架是java.util.concurrent中的一個執行緒管理工具,它是一個靈活強大的非同步任務執行框架。

首先我們應該先了解幾個概念:

(1)執行緒池
從字面意義上來看,是指管理一組同構工作執行緒的資源池。簡單來說,執行緒池的作用就是限制系統中執行執行緒的數量。
相比直接建立Thread物件並執行,執行緒池有很多優點:
1、可以根據系統具體的配置,自定義一個合適的執行緒數量,防止因為建立過多的執行緒物件耗費過多的系統記憶體或造成程式的崩潰。
2、減少了建立和銷燬執行緒的次數,執行緒池中的執行緒可以反覆利用,執行提交的任務。
3、該框架基於生產者-消費者模式,可以很好的將任務的提交和任務的實際執行解耦。
4、可以方便地對其中的執行緒進行各種管理。
(2)工作佇列


工作佇列由Executor管理,其中工作佇列儲存了所有等待執行的任務。它的任務非常簡單:從工作佇列中獲取一個任務,執行任務,然後返回執行緒池等待下一個任務,它不會像執行緒那樣去競爭CPU資源。工作佇列排列方法有三種:有界佇列、無界佇列、同步移交。
可以看成這樣:
這裡寫圖片描述
圖片轉自https://blog.csdn.net/yanyan19880509/article/details/52562039

下面是Executor框架常用的介面及其子類:

二、配置執行緒池

1、建立執行緒池

(1)工廠方法構建執行緒池

我們可以通過Executors中的靜態方法來建立一個ThreadPoolExecutor:
newFixedThreadPool(int nThread)


建立一個固定長度為nThread的執行緒池,每當提交一個任務就建立一個新的執行緒,直到達到最大數量,這個時候執行緒池的規模將不再變化。如果執行緒丟擲一個未預期的異常,那麼該執行緒池也會重新建立一個新的執行緒
newCacheThreadPool()
建立一個可快取的執行緒池,該執行緒池的規模不存在限制。如果該執行緒池的規模超過了處理需求時,那麼會自動回收空閒的執行緒,處理需求增加時也會建立一個新的執行緒池。
newSingleThreadExecutor()
建立一個工作者執行緒來執行當前任務,如果這個執行緒異常結束那麼會重新新建一個執行緒,這將會保證以序列方式執行任務(無需保證其執行任務的執行緒安全性)
示例程式碼:

ExecutorService exec = Executors.newFixedThreadPool(16);
(2)自定義執行緒池

ThreadPoolExecutor建構函式如下:

public ThreadPoolExecutor(int corePoolSize,     //執行緒池基本大小
                          int maximumPoolSize,  //執行緒池最大大小
                          long keepAliveTime,   //存活時間
                          TimeUnit unit,        
                          BlockingQueue<Runnable> workQueue, //任務佇列
                          ThreadFactory threadFactory, //定義執行緒工廠
                          RejectedExecutionHandler handler) //飽和策略
                          //後面兩個引數為可選引數

執行緒池基本大小、最大大小以及存活時間分別共同負責執行緒的建立和銷燬。基本大小是沒有任務執行時執行緒池的大小,只有在工作佇列已滿的狀態下才會建立超過這個數量的執行緒。最大大小表示執行緒數量的上限。
ThreadPoolExecutor允許提供一個BlockingQueue來儲存等待執行的任務。newFixedThreadPool(int nThread)和newSingleThreadExecutor()預設採用了無界佇列。如果任務到達速度超過了執行緒處理它們的速度,那麼佇列的長度會無限增加,有可能會因此而導致系統的崩潰。
一種更穩妥的方式是採用ArrayBlockingQueue或有界的LinkedBlockingQueue、PriorityBlockingQueue。可以有效地防止資源耗盡的情況發生。那麼新的問題來了:佇列填滿後新的任務怎麼辦?這裡我們就要提到飽和策略了
飽和策略
ThreadPoolExecutor的飽和策略可以通過呼叫setRejectedExecutionHandle來修改。Java提供了四種飽和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。
AbortPolicy(中止策略)是預設的飽和策略,該策略將丟擲未檢查的RejectedExecutionException異常。呼叫者可捕獲該異常並根據自己的需求編寫處理的程式碼。
CallerRunsPolicy(呼叫者執行策略):該策略既不會拋棄任務,也不會丟擲異常,而是將任務回退給呼叫者(比如回退給main執行緒,由main執行緒自行處理)。
DiscardPolicy(拋棄策略):當工作佇列已滿並無法新增,拋棄策略會悄悄地扔掉該任務。
DiscardOldestPolicy(拋棄最舊策略):該策略會拋棄下一個執行的任務並嘗試重新提交新的任務,如果工作者佇列是優先佇列,那麼將拋棄優先順序最高的佇列(所以不要和優先佇列同時使用)。
執行緒工廠
每當執行緒池建立一個新的執行緒時,都是通過執行緒工廠方法來完成的。通過制定一個自定義的ThreadFactory,可以定製執行緒池的配置資訊,例如給執行緒指定一個UncaughtExceptionHandler,或是取個名字,或是例項化一個定製的Thread用來執行除錯資訊。
ThreadFactory介面只定義了一個newThread方法,每當執行緒池建立執行緒都會呼叫這個方法。
具體方法可以參照這篇部落格:https://blog.csdn.net/wxwzy738/article/details/8520630

2、構造完成後修改執行緒池配置

執行緒構造完成後仍然可以通過setter函式來修改大多數傳遞給它的建構函式引數,如執行緒池基本大小、最大大小、存活時間、執行緒工廠及RejectedExecutionHandle(拒絕執行處理器)
如果不希望構造完成後被修改,可以通過unconfigurableExecutorService進行包裝。

3、擴充套件執行緒池

ThreadPoolExecution提供了幾個protected方法:beforeExecute、afterExecute、terminated,可以在子類對這些方法進行重寫以擴充套件
afterExecute
無論任務是正常返回還是丟擲異常而返回,afterExecute都會呼叫
beforeExecute
與afterExecute相反,該方法在人物執行前執行,如果丟擲RuntimeException則不會執行任務
terminated
關閉執行緒池後自動呼叫該方法
具體可參照這篇部落格:https://blog.csdn.net/zhongxiangbo/article/details/70882309

三、提交任務

Executor介面提供了Runnable任務的方法,Executor介面擴充套件了ExecutorService,提供更廣泛的執行緒管理方法:包含關閉其中管理的執行緒,提交Runnable、Callable任務、設定任務時限、判斷該執行緒池是否關閉等方法。

public interface ExecutorService extends Executor 
{
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit) throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                                  throws InterruptedException, ExecutionException, TimeoutException;
}
提交Runnable任務方法

建立一個任務類並繼承Runnable介面,然後通過呼叫ExecutorService介面的execute(Runnable command)提交
例如:

private ExecutorService exec = Executors.newFixedThreadPool(16);
private class TestTask implements Runnable{
    public void run(){}
}
private void submitTask(){
    exec.execute(new TestTask());
}
提交Callable任務和接收返回值

提交方法有兩種
(1)建立一個任務類並繼承Callable介面,然後通過呼叫ExecutorService介面的submit(Callable<T> task)提交。
(2)採用invokeAll方法:將所有任務新增到一個List集合中,然後呼叫invokeAll方法提交,該方法會返回一個封裝有Future的List集合。
由於Callable任務具有返回值,我們可以通過BlockingQueue來儲存Future<T>物件,並在提交完成後呼叫get方法來獲取返回值。
這裡只給出第一種提交方法:

private ExecutorService service = Executors.newFixedThreadPool(16);
private BlockingQueue<Future<Integer>> queue = new ArrayBlockingQueue<>(100);
private class CalcTask implements Callable<Integer>{
        public Integer call() {
            int sum = calc(); //calc()為某個計算方法
            return sum;
        }
}
private void submitTask(){
    Future<Integer> future = service.submit(new CalcTask());
    Integer result = future.get();
    System.out.println(result);
}

四、關閉執行緒池

Executor是以非同步方式來執行任務的,因此在任何時刻,之前提交的任務不是立即可見的。有些任務已經完成,有些任務可能正在執行,也有些任務可能在工作佇列中等待執行。既然Executor是為應用程式服務的,那麼它們應該是可關閉的
為了解決生命週期問題,我們可以使用ExecutorService介面中定義的幾個管理方法。

Executor的生命週期有三種狀態:執行、關閉、終止。

下面簡單介紹幾個關閉Executor的方法:

shutdown()
shutdown方法會採用一種平緩地關閉Executor的方法:不再接收新的任務,同時等待已經提交的任務執行完成。
shutdownNow()
採用一種粗暴的關閉方式:嘗試取消正在執行的任務,包括已提交還未開始執行的任務。執行該方法會返回所有尚未啟動的任務清單。如果希望獲取正在執行但被強制關閉的任務,可以在任務run()方法中使用一個finally,當檢測到isTerminated為真時將該任務新增至一個指定的集合。
awaitTermination(long timeOut, TimeUnit unit)
該方法與shutdown方法類似,但是awaitTermination會使呼叫該方法執行緒的阻塞直到執行緒池終止或者時間超限。該方法會返回boolean,代表是否停止。

記住,為了保證良好的封裝性,不要直接通過Thread來終止執行緒,應使用執行緒池來終止執行緒。

如果關閉ExecutorService後繼續提交任務,那麼將由RejectedExecutionHandle(拒絕執行處理器)來處理。它會拋棄任務並丟擲一個未檢查的RejectedExecutionException異常。所有任務完成後,ExecutorService轉入終止狀態。可以呼叫isTerminated()方法來檢查是否已經終止。