Java8新特性8--使用CompletableFuture構建非同步應用
轉自:https://www.jianshu.com/p/4897ccdcb278
使用CompletableFuture構建非同步應用
Future 介面的侷限性
future介面可以構建非同步應用,但依然有其侷限性。它很難直接表述多個Future 結果之間的依賴性。實際開發中,我們經常需要達成以下目的:
- 將兩個非同步計算合併為一個——這兩個非同步計算之間相互獨立,同時第二個又依賴於第
一個的結果。 - 等待 Future 集合中的所有任務都完成。
- 僅等待 Future 集合中最快結束的任務完成(有可能因為它們試圖通過不同的方式計算同
一個值),並返回它的結果。 - 通過程式設計方式完成一個 Future 任務的執行(即以手工設定非同步操作結果的方式)。
- 應對 Future 的完成事件(即當 Future 的完成事件發生時會收到通知,並能使用 Future
計算的結果進行下一步的操作,不只是簡單地阻塞等待操作的結果)
新的CompletableFuture將使得這些成為可能。
CompletableFuture
非同步執行
首先,CompletableFuture實現了Future介面,因此你可以像Future那樣使用它。
其次,CompletableFuture並非一定要交給執行緒池執行才能實現非同步,你可以像下面這樣實現非同步執行。
public static void test1() throws Exception{ CompletableFuture<String> completableFuture=new CompletableFuture(); new Thread(new Runnable() { @Override public void run() { //模擬執行耗時任務 System.out.println("task doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //告訴completableFuture任務已經完成 completableFuture.complete("result"); } }).start(); //獲取任務結果,如果沒有完成會一直阻塞等待 String result=completableFuture.get(); System.out.println("計算結果:"+result); }
錯誤處理
如果沒有意外,上面發的程式碼工作得很正常。但是,如果任務執行過程中產生了異常會怎樣呢?
非常不幸,這種情況下你會得到一個相當糟糕的結果:異常會被限制在執行任務的執行緒的範圍內,最終會殺死該執行緒,而這會導致等待 get 方法返回結果的執行緒永久地被阻塞。
客戶端可以使用過載版本的 get 方法,它使用一個超時引數來避免發生這樣的情況。這是一種值得推薦的做法,你應該儘量在你的程式碼中新增超時判斷的邏輯,避免發生類似的問題。
使用這種方法至少能防止程式永久地等待下去,超時發生時,程式會得到通知發生了 Timeout-Exception 。不過,也因為如此,你不能指定執行任務的執行緒內到底發生了什麼問題。
為了能獲取任務執行緒內發生的異常,你需要使用
CompletableFuture 的completeExceptionally方法將導致CompletableFuture 內發生問題的異常丟擲。這樣,當執行任務發生異常時,呼叫get()方法的執行緒將會收到一個 ExecutionException 異常,該異常接收了一個包含失敗原因的Exception 引數。
public static void test2() throws Exception{
CompletableFuture<String> completableFuture=new CompletableFuture();
new Thread(new Runnable() {
@Override
public void run() {
try {
//模擬執行耗時任務
System.out.println("task doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException("拋異常了");
}catch (Exception e) {
//告訴completableFuture任務發生異常了
completableFuture.completeExceptionally(e);
}
}
}).start();
//獲取任務結果,如果沒有完成會一直阻塞等待
String result=completableFuture.get();
System.out.println("計算結果:"+result);
}
工廠方法
前面我們通過程式設計自己建立 CompletableFuture 物件以及如何獲取返回值,雖然看起來這些操作已經比較方便,但還有進一步提升的空間.
CompletableFuture 類自身提供了大量精巧的工廠方法,使用這些方法能更容易地完成整個流程,還不用擔心實現的細節。
supplyAsync 方法接受一個生產者(Supplier)作為引數,返回一個 CompletableFuture
物件。生產者方法會交由 ForkJoinPool池中的某個執行執行緒( Executor )執行,但是你也可以使用 supplyAsync 方法的過載版本,傳遞第二個引數指定執行緒池執行器執行生產者方法。
public static void test3() throws Exception {
//supplyAsync內部使用ForkJoinPool執行緒池執行任務
CompletableFuture<String> completableFuture=CompletableFuture.supplyAsync(()->{
//模擬執行耗時任務
System.out.println("task doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結果
return "result";
});
System.out.println("計算結果:"+completableFuture.get());
}
allOf 工廠方法接收一個由CompletableFuture 構成的陣列,陣列中的所有 Completable-Future 物件執行完成之後,它返回一個 CompletableFuture<Void> 物件。這意味著,如果你需要等待多個 CompletableFuture 物件執行完畢,對 allOf 方法返回的
CompletableFuture 執行 join 操作可以等待CompletableFuture執行完成。
或者你可能希望只要 CompletableFuture 物件陣列中有任何一個執行完畢就不再等待,在這種情況下,你可以使用一個類似的工廠方法 anyOf 。
該方法接收一個 CompletableFuture 物件構成的陣列,返回由第一個執行完畢的 CompletableFuture 物件的返回值構成的 CompletableFuture<Object> 。
public static void test4() throws Exception {
CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{
//模擬執行耗時任務
System.out.println("task1 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結果
return "result1";
});
CompletableFuture<String> completableFuture2=CompletableFuture.supplyAsync(()->{
//模擬執行耗時任務
System.out.println("task2 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結果
return "result2";
});
CompletableFuture<Object> anyResult=CompletableFuture.anyOf(completableFuture1,completableFuture2);
System.out.println("第一個完成的任務結果:"+anyResult.get());
CompletableFuture<Void> allResult=CompletableFuture.allOf(completableFuture1,completableFuture2);
//阻塞等待所有任務執行完成
allResult.join();
System.out.println("所有任務執行完成");
}
將兩個CompletableFuture建立聯絡
通常,我們會有多個需要獨立執行但又有所依賴的的任務。比如先等用於的訂單處理完畢然後才傳送郵件通知客戶。
thenCompose 方法允許你對兩個非同步操作進行流水線,第一個操作完成時,將其結果作為引數傳遞給第二個操作。你可以建立兩個CompletableFutures 物件,對第一個 CompletableFuture 物件呼叫thenCompose ,並向其傳遞一個函式。當第一個CompletableFuture 執行完畢後,它的結果將作為該函式的引數,這個函式的返回值是以第一個 CompletableFuture 的返回做輸入計算出的第二個 CompletableFuture 物件。
public static void test5() throws Exception {
CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{
//模擬執行耗時任務
System.out.println("task1 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結果
return "result1";
});
//等第一個任務完成後,將任務結果傳給引數result,執行後面的任務並返回一個代表任務的completableFuture
CompletableFuture<String> completableFuture2= completableFuture1.thenCompose(result->CompletableFuture.supplyAsync(()->{
//模擬執行耗時任務
System.out.println("task2 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結果
return "result2";
}));
System.out.println(completableFuture2.get());
}
另一種比較常見的情況是,你需要將兩個完
全不相干的 CompletableFuture 物件的結果整合起來,而且你也不希望等到第一個任務完全結
束才開始第二項任務。
這種情況,你應該使用 thenCombine 方法,它接收名為 BiFunction 的第二引數,這個引數
定義了當兩個 CompletableFuture 物件完成計算後,結果如何合併。
public static void test6() throws Exception {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
//模擬執行耗時任務
System.out.println("task1 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結果
return 100;
});
//將第一個任務與第二個任務組合一起執行,都執行完成後,將兩個任務的結果合併
CompletableFuture<Integer> completableFuture2 = completableFuture1.thenCombine(
//第二個任務
CompletableFuture.supplyAsync(() -> {
//模擬執行耗時任務
System.out.println("task2 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結果
return 2000;
}),
//合併函式
(result1, result2) -> result1 + result2);
System.out.println(completableFuture2.get());
}
響應 CompletableFuture 的 completion 事件
我們可以在每個CompletableFuture 上註冊一個操作,該操作會在 CompletableFuture 完成執行後呼叫它。CompletableFuture 通過 thenAccept 方法提供了這一功能,它接收
CompletableFuture 執行完畢後的返回值做引數。
public static void test7() throws Exception {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
//模擬執行耗時任務
System.out.println("task1 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結果
return 100;
});
//註冊完成事件
completableFuture1.thenAccept(result->System.out.println("task1 done,result:"+result));
CompletableFuture<Integer> completableFuture2=
//第二個任務
CompletableFuture.supplyAsync(() -> {
//模擬執行耗時任務
System.out.println("task2 doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回結果
return 2000;
});
//註冊完成事件
completableFuture2.thenAccept(result->System.out.println("task2 done,result:"+result));
//將第一個任務與第二個任務組合一起執行,都執行完成後,將兩個任務的結果合併
CompletableFuture<Integer> completableFuture3 = completableFuture1.thenCombine(completableFuture2,
//合併函式
(result1, result2) -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result1 + result2;
});
System.out.println(completableFuture3.get());
}
慢慢消化吧,重在實踐!