CompletableFuture詳解
在JDK1.5已經提供了Future和Callable的實現,可以用於阻塞式獲取結果,如果想要非同步獲取結果,通常都會以輪詢的方式去獲取結果,如下:
//定義一個非同步任務
Future<String> future = executor.submit(()->{
Thread.sleep(2000);
return "hello world";
});
//輪詢獲取結果
while (true){
if(future.isDone()) {
System.out.println(future.get());
break;
}
}
從上面的形式看來輪詢的方式會耗費無謂的CPU資源,而且也不能及時地得到計算結果.所以要實現真正的非同步,上述這樣是完全不夠的,在Netty中,我們隨處可見非同步程式設計
ChannelFuture f = serverBootstrap.bind(port).sync();
f.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
System.out.println("complete");
}
});
而JDK1.8中的CompletableFuture
就為我們提供了非同步函數語言程式設計,CompletableFuture
提供了非常強大的Future
的擴充套件功能,可以幫助我們簡化非同步程式設計的複雜性,提供了函數語言程式設計的能力,可以通過回撥的方式處理計算結果,並且提供了轉換和組合CompletableFuture
的方法。
1. 建立CompletableFuture物件
CompletableFuture
提供了四個靜態方法用來建立CompletableFuture物件:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Asynsc
表示非同步,而supplyAsync
與runAsync
不同在與前者非同步返回一個結果,後者是void.第二個函式第二個引數表示是用我們自己建立的執行緒池,否則採用預設的ForkJoinPool.commonPool()
作為它的執行緒池.其中Supplier
是一個函式式介面,代表是一個生成者的意思,傳入0個引數,返回一個結果.(更詳細的可以看我另一篇文章)
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
return "hello world";
});
System.out.println(future.get()); //阻塞的獲取結果 ''helllo world"
2. 主動計算
以下4個方法用於獲取結果
//同步獲取結果
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()
getNow
有點特殊,如果結果已經計算完則返回結果或者丟擲異常,否則返回給定的valueIfAbsent值。join()
與get()
區別在於join()
返回計算的結果或者丟擲一個unchecked異常(CompletionException),而get()
返回一個具體的異常.
- 主動觸發計算.
public boolean complete(T value)
public boolean completeExceptionally(Throwable ex)
上面方法表示當呼叫CompletableFuture.get()
被阻塞的時候,那麼這個方法就是結束阻塞,並且get()
獲取設定的value.
public static CompletableFuture<Integer> compute() {
final CompletableFuture<Integer> future = new CompletableFuture<>();
return future;
}
public static void main(String[] args) throws Exception {
final CompletableFuture<Integer> f = compute();
class Client extends Thread {
CompletableFuture<Integer> f;
Client(String threadName, CompletableFuture<Integer> f) {
super(threadName);
this.f = f;
}
@Override
public void run() {
try {
System.out.println(this.getName() + ": " + f.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
new Client("Client1", f).start();
new Client("Client2", f).start();
System.out.println("waiting");
//設定Future.get()獲取到的值
f.complete(100);
//以異常的形式觸發計算
//f.completeExceptionally(new Exception());
Thread.sleep(1000);
}
3. 計算結果完成時的處理
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
上面4個方法是當計算階段結束的時候觸發,BiConsumer
有兩個入參,分別代表計算返回值,另外一個是異常.無返回值.方法不以Async結尾,意味著Action使用相同的執行緒執行,而Async可能會使用其它的執行緒去執行(如果使用相同的執行緒池,也可能會被同一個執行緒選中執行)。
future.whenCompleteAsync((v,e)->{
System.out.println("return value:"+v+" exception:"+e);
});
- handle()
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
與whenComplete()
不同的是這個函式返回CompletableFuture
並不是原始的CompletableFuture
返回的值,而是BiFunction
返回的值.
4. CompletableFuture的組合
- thenApply
當計算結算完成之後,後面可以接繼續一系列的thenApply,來完成值的轉化.
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
它們與handle方法的區別在於handle方法會處理正常計算值和異常,因此它可以遮蔽異常,避免異常繼續丟擲。而thenApply方法只是用來處理正常值,因此一旦有異常就會丟擲。
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
return "hello world";
});
CompletableFuture<String> future3 = future.thenApply((element)->{
return element+" addPart";
}).thenApply((element)->{
return element+" addTwoPart";
});
System.out.println(future3.get());//hello world addPart addTwoPart
5. CompletableFuture的Consumer
只對CompletableFuture
的結果進行消費,無返回值,也就是最後的CompletableFuture
是void.
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
//入參為原始的CompletableFuture的結果.
CompletableFuture future4 = future.thenAccept((e)->{
System.out.println("without return value");
});
future4.get();
- thenAcceptBoth
這個方法用來組合兩個CompletableFuture
,其中一個CompletableFuture
等待另一個CompletableFuture
的結果.
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
return "hello world";
});
CompletableFuture future5 = future.thenAcceptBoth(CompletableFuture.completedFuture("compose"),
(x, y) -> System.out.println(x+y));//hello world compose
6. Either和ALL
thenAcceptBoth
是當兩個CompletableFuture
都計算完成,而我們下面要了解的方法applyToEither
是當任意一個CompletableFuture計算完成的時候就會執行。
Random rand = new Random();
CompletableFuture<Integer> future9 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
});
CompletableFuture<Integer> future10 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return 200;
});
//兩個中任意一個計算完成,那麼觸發Runnable的執行
CompletableFuture<String> f = future10.applyToEither(future9,i -> i.toString());
//兩個都計算完成,那麼觸發Runnable的執行
CompletableFuture f1 = future10.acceptEither(future9,(e)->{
System.out.println(e);
});
System.out.println(f.get());
如果想組合超過2個以上的CompletableFuture
,allOf
和anyOf
可能會滿足你的要求.allOf
方法是當所有的CompletableFuture
都執行完後執行計算。anyOf
方法是當任意一個CompletableFuture
執行完後就會執行計算,計算的結果相同。
總結
有了CompletableFuture
之後,我們自己實現非同步程式設計變得輕鬆很多,這個類也提供了許多方法來組合CompletableFuture
.結合Lambada表示式來用,變得很輕鬆.