1. 程式人生 > >Java非同步之《我call(),Future在哪裡》

Java非同步之《我call(),Future在哪裡》

我們大家都知道,在 Java 中建立執行緒主要有三種方式: * 繼承 Thread 類; * 實現 Runnable 介面; * 實現 Callable 介面。 而後兩者的區別在於 Callable 介面中的 `call()` 方法可以非同步地返回一個計算結果 Future,並且一般需要配合ExecutorService 來執行。這一套操作在程式碼實現上似乎也並不難,可是對於call()方法具體怎麼(被ExecutorService)執行的,以及 Future 這個結果是怎麼獲取的,卻又不是很清楚了。 那麼本篇文章,我們就一起來學習下 Callable 介面以及 Future 的使用,主要面向兩個問題: * 承載著具體任務的 call() 方法如何被執行的? * 任務的執行結果如何得到? 你可能會說,這兩個難道不是一個問題嗎?任務執行了就會有返回結果,而返回結果也一定是任務執行了才返回的,難道還能返回一個其他任務的結果麼??不要著急,耐心的看下去,你就會發現,這兩個還真的就是一個問題。 本文將分為兩個部分,第一部分分別介紹 `任務`、`執行`、以及`結果`這三個概念在 Java API 中的實體和各自的繼承關係,第二部分通過一個簡單的例子回顧他們的用法,再理解下這兩個問題的答案。 ## Callable、Executor 與 Future 既然是一個任務被執行並返回結果,那麼我們先來看看具體的任務,也就是 Callable 介面。 ### 任務:Callable 非常簡單,只包含一個有泛型**返回值**的 call() 方法,需要在最後返回定義型別的結果。如果任務沒有需要返回的結果,那麼將泛型 V 設為 void 並`return null;`就可以了。對比的是 Runnable,另一個明顯的區別則是 Callable可以丟擲異常。 ```java public interface Callable { V call() throws Exception; } public interface Runnable { public abstract void run(); } ``` ### 執行:ExecutorService 說到執行緒就少不了執行緒池,而說到執行緒池肯定離不開 `Executor` 介面。下面這幅圖是 Executor 的框架,我們常用的是其中的兩個具體實現類 ThreadPoolExecutor 以及 ScheduledThreadPoolExecutor,在 Executors 類中通過靜態方法獲取。Executors 中包含了執行緒池以及執行緒工廠的構造,與 Executor 介面的關係類似於 Collection 介面和 Collections 類的關係。 ![](https://img2020.cnblogs.com/blog/1515111/202007/1515111-20200731112004089-1302424294.png) 那麼我們自頂向下,從原始碼上了解一下 Executor 框架,學習學習任務是如何被執行的。首先是 Executor 介面,其中只定義了 execute() 方法。 ```java public interface Executor { void execute(Runnable command); } ``` ExecutorService 介面繼承了 Executor 介面,主要擴充套件了一系列的 submit() 方法以及對 executor 的終止和判斷狀態。以第一個` Future submit(Callable task);`為例,其中 task 為使用者定義的執行的非同步任務,Future 表示了任務的執行結果,泛型 T 代表任務結果的型別。 ```java public interface ExecutorService extends Executor { void shutdown(); // 現有任務完成後停止執行緒池 List shutdownNow(); // 立即停止執行緒池 boolean isShutdown(); // 判斷是否已停止 boolean isTerminated(); Future submit(Callable task); // 提交Callale任務 Future submit(Runnable task, T result); Future submit(Runnable task); // 針對Callable集合的invokeAll()等方法 } ``` 抽象類`AbstractExecutorService` 是 `ThreadPoolExecutor` 的基類,在下面的程式碼中,它實現了ExecutorService 介面中的 submit() 方法。註釋中是對應的 newTaskFor() 方法的程式碼,非常簡單,就是將傳入的Callable 或 Runnable 引數封裝成一個 FutureTask 物件。 ```java // 1.第一個過載方法,引數為Callable public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); // return new FutureTask(callable); execute(ftask); return ftask; } // 2.第二個過載方法,引數為Runnable public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, null); // return new FutureTask(task, null); execute(ftask); return ftask; } // 3.第三個過載方法,引數為Runnable + 返回物件 public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); // return new FutureTask(task, result); execute(ftask); return ftask; } ``` 那麼也就是說,無論傳入的是 Callable 還是 Runnable,submit() 方法其實就做了三件事 ![](https://img2020.cnblogs.com/blog/1515111/202007/1515111-20200731112025047-253642856.png) 具體來說,submit() 中首先生成了一個 RunnableFuture 引用的 FutureTask 例項,然後呼叫 execute() 方法來執行它,那麼我們可以推測 FutureTask 繼承自 RunnableFuture,而 RunnableFuture 又實現了 Runnable,因為execute() 的引數應為 Runnable 型別。上面還涉及到了 FutureTask 的建構函式,也來看一下。 ```java public FutureTask(Callable callable) { this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); // 通過介面卡將runnable在call()中執行並返回result this.state = NEW; } ``` FutureTask 共有兩個構造方法。第一個構造方法比較簡單,對應上面的第一個 submit(),採用組合的方式封裝Callable 並將狀態設為`NEW`;而第二個構造方法對應上面的後兩個 submit() 過載,不同之處是首先使用了`Executors.callable`來將 Runnable 和 result 組合成 Callable,這裡採用了介面卡`RunnableAdapter implements Callable`,巧妙地在 call() 中執行 Runnable 並返回結果。 ```java static final class RunnableAdapter implements Callable { final Runnable task; final T result; // 返回的結果;顯然:需要在run()中賦值 RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } } ``` 在介面卡設計模式中,通常包含**目標介面 Target、介面卡 Adapter 和被適配者 Adaptee **三類角色,其中目標介面代表客戶端(當前業務系統)所需要的功能,通常為藉口或抽象類;被適配者為現存的不能滿足使用需求的類;介面卡是一個轉換器,也稱 wrapper,用於給被適配者新增目標功能,使得客戶端可以按照目標介面的格式正確訪問。對於 RunnableAdapter 來說,Callable 是其目標介面,而 Runnable 則是被適配者。RunnableAdapter 通過覆蓋 call() 方法使其可按照 Callable 的要求來使用,同時其構造方法中接收被適配者和目標物件,滿足了 call() 方法有返回值的要求。 ![](https://img2020.cnblogs.com/blog/1515111/202007/1515111-20200731112035195-1527394683.png) 那麼總結一下 submit() 方法執行的流程,就是:**Callable 被封裝在 Runnable 的子類中傳入 execute() 得以執行**。 ### 結果:Future 要說 Future 就是非同步任務的執行結果其實並不準確,因為它代表了一個任務的執行過程,有狀態、可以被取消,而 get() 方法的返回值才是任務的結果。 ```java public interface Future { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } ``` 我們在上面中還提到了 RuunableFuture 和 FutureTask。從官方的註釋來看,RuunableFuture 就是一個可以 run的 future,實現了 Runnable 和 Future 兩個介面,在 run() 方法中執行完計算時應該將結果儲存起來以便通過 get()獲取。 ```java public interface RunnableFuture extends Runnable, Future { /** * Sets this Future to the result of its computation unless it has been cancelled. */ void run(); } ``` FutureTask 直接實現了 RunnableFuture 介面,作為執行過程,共有下面這幾種狀態,其中 COMPLETING 為一個暫時狀態,表示正在設定結果或異常,對應的,設定完成後狀態變為 NORMAL 或 EXCEPTIONAL;CANCELLED、INTERRUPTED 表示任務被取消或中斷。在上面的構造方法中,將 state 初始化為 NEW。 ```java private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; ``` 然後是 FutureTask 的主要內容,主要是 run() 和 get()。注意 outcome 的註釋,無論是否發生異常返回的都是這個 outcome,因為在執行中如果執行成功就將結果設定給了它(`set()`),而發生異常時將異常賦給了他(`setException()`),而在獲取結果時也都返回了 outcome(通過`report()`)。 ```java public class FutureTask implements RunnableFuture { private Callable callable; // target,待執行的任務 /** 儲存執行結果或異常,在get()方法中返回/丟擲 */ private Object outcome; // 非volatile,通過CAS保證執行緒安全 public void run() { ...... Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); // 呼叫call()執行使用者任務並獲取結果 ran = true; // 執行完成,ran置為true } catch (Throwable ex) { // 呼叫call()出現異常,而run()方法繼續執行 result = null; ran = false; setException(ex); // setException(Throwable t): compareAndSwapInt(NEW, COMPLETING); outcome = t; } if (ran) set(result); // set(V v): compareAndSwapInt(NEW, COMPLETING); outcome = v; } } public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); // 加入佇列等待COMPLETING完成,可響應超時、中斷 return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { // 超時等待 } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) // 將outcome作為執行結果返回 return (V)x; if (s >
= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); // 將outcome作為捕獲的返回 } } ``` FutureTask 實現了 RunnableFuture 介面,所以有兩方面的作用。 * 第一,作為 Runnable 傳入 execute() 方法來執行,同時封裝 Callable 物件並在 run() 中呼叫其 call() 方法; * 第二,作為 Future 管理任務的執行狀態,將 call() 的返回值儲存在 outcome 中以通過 get() 獲取。這似乎就能回答開頭的兩個問題,並且渾然天成,就好像是一個問題,除非發生異常的時候返回的不是任務的結果而是異常物件。 總結一下繼承關係: ![](https://img2020.cnblogs.com/blog/1515111/202007/1515111-20200731112046948-180445086.png) ## 二、使用舉例 文章的標題有點唬人,說到底還是講 Callable 的用法。現在我們知道了 Future 代表了任務執行的過程和結果,作為 call() 方法的返回值來獲取執行結果;而 FutureTask 是一個 Runnable 的 Future,既是任務執行的過程和結果,又是 call 方法最終執行的載體。下面通過一個例子看看他們在使用上的區別。 首先建立一個任務,即定義一個任務類實現 Callable 介面,在 call() 方法裡新增我們的操作,這裡用耗時三秒然後返回 100 模擬計算過程。 ```java class MyTask implements Callable { @Override public Integer call() throws Exception { System.out.println("子執行緒開始計算..."); for (int i=0;i<3;++i){ Thread.sleep(1000); System.out.println("子執行緒計算中,用時 "+(i+1)+" 秒"); } System.out.println("子執行緒計算完成,返回:100"); return 100; } } ``` 然後呢,建立一個執行緒池,並例項化一個 MyTask 備用。 ```java ExecutorService executor = Executors.newCachedThreadPool(); MyTask task = new MyTask(); ``` 現在,分別使用 Future 和 FutureTask 來獲取執行結果,看看他們有什麼區別。 ### 使用Future Future 一般作為 submit() 的返回值使用,並在主執行緒中以阻塞的方式獲取非同步任務的執行結果。 ```java System.out.println("主執行緒啟動執行緒池"); Future future = executor.submit(task); System.out.println("主執行緒得到返回結果:"+future.get()); executor.shutdown(); ``` 看看輸出結果: ```java 主執行緒啟動執行緒池 子執行緒開始計算... 子執行緒計算中,用時 1 秒 子執行緒計算中,用時 2 秒 子執行緒計算中,用時 3 秒 子執行緒計算完成,返回:100 主執行緒得到返回結果:100 ``` 由於 get() 方法阻塞獲取結果,所以輸出順序為子執行緒計算完成後主執行緒輸出結果。 ### 使用FutureTask 由於 FutureTask 集**任務與結果**於一身,所以我們可以使用 FutureTask 自身而非返回值來管理任務,這需要首先利用 Callable 物件來構造 FutureTask,並呼叫不同的`submit()`過載方法。 ```java System.out.println("主執行緒啟動執行緒池"); FutureTask futureTask = new FutureTask<>(task); executor.submit(futureTask); // 作為Ruunable傳入submit()中 System.out.println("主執行緒得到返回結果:"+futureTask.get()); // 作為Future獲取結果 executor.shutdown(); ``` 這段程式的輸出與上面中完全相同,其實兩者在實際執行中的區別也不大,雖然前者呼叫了`submit(Callable task)`而後者呼叫了`submit(Runnable task)`,但最終都通過`execute(futuretask)`來把任務加入執行緒池中。 ## 總結 上面大費周章其實只是儘可能細緻地講清楚了 Callable 中的任務是如何執行的,總結起來就是: * 執行緒池中,submit() 方法實際上將 Callable 封裝在 FutureTask 中,將其作為 Runnable 的子類傳給 execute()真正執行; * FutureTask 在 run() 中呼叫 Callable 物件的 call() 方法並接收返回值或捕獲異常儲存在`Object outcome`中,同時管理執行過程中的狀態`state`; * FutureTask 同時作為 Future 的子類,通過 get() 返回任務的執行結果,若未執行完成則通過等待佇列進行阻塞等待完成; FutureTask 作為一個 Runnable 的 Future,其中最重要的兩個方法如下。 ![](https://img2020.cnblogs.com/blog/1515111/202007/1515111-20200731112056994-197673983.png) ![](https://img2020.cnblogs.com/blog/1515111/202007/1515111-20200731111949585-5150842