從CompletableFuture到非同步程式設計設計
從CompletableFuture到非同步程式設計設計,筆者就分為2部分來分享CompletableFuture非同步程式設計設計,前半部分總結下CompletableFuture使用實踐,後半部分分享下CompletableFuture實現原理和非同步程式設計設計機制。
(ps:本文內容較多,請耐心閱讀。如果讀者瞭解CompletableFuture使用的話,可以直接看後半部分內容;如果熟悉CompletableFuture及非同步程式設計設計的話,可以直接翻到文件末尾點個“推薦”就好了,因為你已經掌握了Java非同步設計精髓了 :) ,若有不正確地方,感謝評論區指正交流~ )
Java8新增了CompletableFuture類,該類實現了CompletionStage和Future介面,簡化了Java非同步程式設計能力,該類方法較多,其實套路只有一個,那就是任務執行完成之後執行“回撥”。
CompletableFuture使用實踐
Java8新增的CompletableFuture 提供對非同步計算的支援,可以通過回撥的方式處理計算結果。CompletableFuture 類實現了CompletionStage和Future介面,所以還可以像之前使用Future那樣使用CompletableFuture ,儘管已不再推薦這樣用了。
CompletableFuture的建立
// 建立一個帶result的CompletableFuture CompletableFuture<String> future = CompletableFuture.completedFuture("result"); future.get(); // 預設建立的CompletableFuture是沒有result的,這時呼叫future.get()會一直阻塞下去知道有result或者出現異常 future = new CompletableFuture<>(); try { future.get(1, TimeUnit.SECONDS); }catch (Exception e) { // no care } // 給future填充一個result future.complete("result"); assert "result".equals(future.get()); // 給future填充一個異常 future = new CompletableFuture<>(); future.completeExceptionally(new RuntimeException("exception")); try { future.get(); } catch (Exception e) { assert "exception".equals(e.getCause().getMessage()); }
上面的示例是自己設定future的result,一般情況下我們都是讓其他執行緒或者執行緒池來執行future這些非同步任務。除了直接建立CompletableFuture 物件外(不推薦這樣使用),還可以使用如下4個方法建立CompletableFuture 物件:
// runAsync是Runnable任務,不帶返回值的,如果入參有executor,則使用executor來執行非同步任務 public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) // supplyAsync是待返回結果的非同步任務 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) // 使用示例 CompletableFuture.runAsync(() -> { System.out.println("hello world"); }, executor); CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; });
如果入參不帶executor,則預設使用ForkJoinPool.commonPool()作為執行非同步任務的執行緒池;否則使用executor執行任務。
CompletableFuture的完成動作
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn) // 使用示例 CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).whenCompleteAsync((result, e) -> { System.out.println(result + " " + e); }).exceptionally((e) -> { System.out.println("exception " + e); return "exception"; });
action是Action型別,從上面可以看出它既可以處理正常返回值也可以處理異常,whenComplete會在任務執行完成後直接在當前執行緒內執行action動作,字尾帶Async的方法是交給其他執行緒執行action(如果是執行緒池,執行action的可能和之前執行非同步任務的是同一個執行緒),入參帶executor的交給executor執行緒池來執行action動作,當發生異常時,會在當前執行緒內執行exceptionally方法。
除了用上面的whenComplete來執行完成動作之外,還可以使用handle方法,該方法可以返回一個新的CompletableFuture的返回型別。
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor) // handle方法示例: CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }); CompletableFuture<Integer> f2 = f1.handle((r, e) -> { System.out.println("handle"); return 1; });
除了使用handle方法來執行CompletableFuture返回型別轉換之外,還可以使用thenApply方法,二者不同的是前者會處理正常返回值和異常,因此可以遮蔽異常,避免繼續丟擲;而後者只能處理正常返回值,一旦有異常就會丟擲。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) // thenApply方法示例: CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenApply((r) -> { System.out.println(r); return "aaa"; }).thenApply((r) -> { System.out.println(r); return 1; });
注意,上面的handle、thenApply都是返回新的CompletableFuture型別,如果只是為了在CompletableFuture完成之後執行某些消費動作,而不返回新的CompletableFuture型別,則可以使用thenAccept方法。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) // thenAccept方法示例: CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenAccept(r -> { System.out.println(r); }).thenAccept(r -> { // 這裡的r為Void(null)了 System.out.println(r); });
上面的handle、thenApply和thenAppept都是對上一個CompletableFuture執行完的結果進行某些操作。那麼可不可以同時對2個CompletableFuture執行結果執行某些操作呢?其實也是可以的,使用thenAppeptBoth方法即可。注意,thenAppeptBoth和handle/thenApply/thenAppep的流程是一樣的,只不過thenAppeptBoth中包含了另一個CompletableFuture物件(注意,這裡另一個CompletableFuture物件的執行可並不是上一個CompletableFuture執行結束才開始執行的)。
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) // thenAcceptBoth方法示例: CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenAcceptBoth(CompletableFuture.completedFuture("result2"), (r1, r2) -> { System.out.println(r1 + "-" + r2); });
注意,thenAcceptBoth方法是沒有返回值的(CompletableFuture<Void>),如果想用thenAcceptBoth這樣的功能並且還帶有返回值的CompletableFuture,那麼thenCombine方法就該上場了。
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) // thenCombine方法示例 CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenCombine(CompletableFuture.completedFuture("result2"), (r1, r2) -> { System.out.println(r1 + "-" + r2); return r1 + "-" + r2; });
thenAcceptBoth和runAfterBoth是當兩個CompletableFuture都計算完成,而下面的方法是當任意一個CompletableFuture計算完成的時候就會執行。
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
如果當想在多個CompletableFuture都計算完成或者多個CompletableFuture中的一個計算完成後執行某個動作,可使用方法 allOf 和 anyOf。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
如果當任務完成時並不想用CompletableFuture的結果,可以使用thenRun方法來執行一個Runnable。
public CompletableFuture<Void> thenRun(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
以上方法都是在方法中返回一個值(或者不返回值),其實還可以返回一個CompletableFuture,是不是很像類的組合一樣。
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor) // thenCompose方法示例: CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenCompose(r -> { System.out.println(r); return CompletableFuture.supplyAsync(() -> { System.out.println(r + " result2"); return r + " result2"; }); }); // 上面的程式碼和下面的程式碼效果是一樣的 CompletableFuture.supplyAsync(() -> { System.out.println("hello world"); return "result"; }).thenApply(r -> { System.out.println(r); return r; }).thenApplyAsync(r -> { System.out.println(r + " result2"); return r + " result2"; });
CompletableFuture實現機制
先拋開 CompletableFuture 不談,如果程式中使用了執行緒池,如何才能在某個任務執行完成之後執行某些動作呢?其實Java執行緒池本身已經提供了任務執行前後的hook方法(beforeExecute和afterExecute),如下:
public class ThreadPoolExecutor extends AbstractExecutorService { // ... protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } // ... }
我們只需要自定義執行緒池繼承ThreadPoolExecutor ,然後重寫beforeExecute和afterExecute方法即可,在afterExecute裡可以執行一些動作。關於重寫ThreadPoolExecutor 的一個示例可點選ListenableThreadPoolExecutor檢視。
那麼CompletableFuture 的實現機制是怎樣的呢?其實,和上面的所說的“afterExecute機制”是類似的(本質是一樣的,回撥機制),也是在任務執行完成後執行某些動作,如下程式碼:
CompletableFuture.supplyAsync(() -> { // callable任務 System.out.println("hello world"); return "result"; }).thenApply(r -> { // 任務完成之後的動作(回撥方法),類似於ThreadPoolExecutor.afterExecute方法 System.out.println(r); return r; });
上面的示例程式碼其實主要完成了3個步驟,這3個步驟其實也是CompletableFuture的實現流程:
- 執行任務
- 新增任務完成之後的動作(回撥方法)
- 執行回撥
下面筆者就以上面的示例程式碼,按照這3個步驟依次進行分析,此時建議讀者開啟idea,寫個demo進行debug,這裡篇幅有限,筆者就只講解主要流程程式碼,其他程式碼自行閱讀即可 :)
1、執行任務
執行任務的主要邏輯就是 AsyncSupply.run 方法:
public void run() { CompletableFuture<T> d; Supplier<T> f; // dep是當前CompletableFuture,fn是任務執行邏輯 if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { // 1 任務執行 & result cas設定 d.completeValue(f.get()); } catch (Throwable ex) { // 1.1 result cas異常設定 d.completeThrowable(ex); } } // 2 任務完成,可能涉及到回撥的執行 d.postComplete(); } }
2、添加回調
添加回調方法的流程是從 thenApply 開始的:
public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } private <V> CompletableFuture<V> uniApplyStage( Executor e, Function<? super T,? extends V> f) { if (f == null) throw new NullPointerException(); CompletableFuture<V> d = new CompletableFuture<V>(); if (e != null || !d.uniApply(this, f, null)) { // 當上一個CompletableFuture未完成時,將該CompletableFuture新增 // 到上一個CompletableFuture的statck中 UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); push(c); c.tryFire(SYNC); } return d; }
CompletableFuture.statck 是 UniCompletion 型別的,該型別如下:
UniCompletion<T,V> { volatile Completion next; // Treiber stack link Executor executor; // executor to use (null if none) CompletableFuture<V> dep; // the dependent to complete CompletableFuture<T> src; // source for action }
3、執行回撥
執行回撥是從CompletableFuture.postComplete 開始的:
final void postComplete() { /* * On each step, variable f holds current dependents to pop * and run. It is extended along only one path at a time, * pushing others to avoid unbounded recursion. */ CompletableFuture<?> f = this; Completion h; while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t; // cas設定h.next到當前CompletableFuture.statck if (f.casStack(h, t = h.next)) { if (t != null) { if (f != this) { pushStack(h); continue; } h.next = null; // detach } f = (d = h.tryFire(NESTED)) == null ? this : d; } } } // UniAccept final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this)) // 執行回撥 return null; dep = null; src = null; fn = null; // 返回當前CompletableFuture 或者 遞迴呼叫postComplete return d.postFire(a, mode); }
看完上面3個步驟,是不是還不太清楚多個CompletableFuture之間的執行流程呢,說實話筆者第一次看的時候也是這樣的 :(,下面我們換個例子並給出圖示來看:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello world f1"); sleep(1); // TimeUnit.SECONDS.sleep(1) return "result f1"; }); CompletableFuture<String> f2 = f1.thenApply(r -> { System.out.println(r); sleep(1); return "f2"; }); CompletableFuture<String> f3 = f2.thenApply(r -> { System.out.println(r); sleep(1); return "f2"; }); CompletableFuture<String> f4 = f1.thenApply(r -> { System.out.println(r); sleep(1); return "f2"; }); CompletableFuture<String> f5 = f4.thenApply(r -> { System.out.println(r); sleep(1); return "f2"; }); CompletableFuture<String> f6 = f5.thenApply(r -> { System.out.println(r); sleep(1); return "f2"; });
上面程式碼對應的CompletableFuture及其Completion關係如下圖:
結合上圖和postComplete流程,可以看出執行回撥的順序是:f1 -> f4 -> f5 -> f6 -> f2 -> f3。(如果這裡沒看懂,可以回過頭再看下postComplete方法的原始碼~)
非同步程式設計設計
分析完了CompletableFuture,相信大家都已經對CompletableFuture的設計與實現有了進一步的理解。那麼對於非同步程式設計有哪些實際應用場景,其本質到底是什麼呢?
非同步處理的本質其實就是回撥(系統層藉助於指標來實現,準確來說是函式指標),使用者提供一個回撥方法,回撥函式不是由該函式的實現方直接呼叫,而是在特定的事件或條件發生時由另外的一方呼叫的,用於對該事件或條件進行響應。從“巨集觀”來看,CompletableFuture的實現其實很簡單,就是回撥,即在任務執行完成之後進行回撥,回撥中可能涉及到其他操作,比如下一個回撥或者執行下一個任務。
非同步程式設計在應用場景較多,很多語言,比如Node.js,採用回撥的方式實現非同步程式設計。Java的一些框架,比如Netty,自己擴充套件了Java的 Future
介面,提供了addListener
等多個擴充套件方法:
ServerBootstrap boot = new ServerBootstrap(); boot.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(8080) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoHandler()); } });
dubbo中consumer對於RPC response的處理是基於回撥機制的,Google guava也提供了通用的擴充套件Future:ListenableFuture、SettableFuture 以及輔助類Futures等,方便非同步程式設計。
final String name = ...; inFlight.add(name); ListenableFuture<Result> future = service.query(name); future.addListener(new Runnable() { public void run() { processedCount.incrementAndGet(); inFlight.remove(name); lastProcessed.set(name); logger.info("Done with {0}", name); } }, executor);
參考資料:
1、Java CompletableFuture 詳解
2、https://www.cnblogs.com/aniao/p/aniao_cf.html
3、https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html