併發程式設計(三)Promise, Future 和 Callback
併發程式設計(三)Promise, Future 和 Callback
在併發程式設計中,我們通常會用到一組非阻塞的模型:Promise,Future 和 Callback。其中的 Future 表示一個可能還沒有實際完成的非同步任務的結果,針對這個結果可以新增 Callback 以便在任務執行成功或失敗後做出對應的操作,而 Promise 交由任務執行者,任務執行者通過 Promise 可以標記任務完成或者失敗。 可以說這一套模型是很多非同步非阻塞架構的基礎。
這一套經典的模型在 Scala、C# 中得到了原生的支援,但 JDK 中暫時還只有無 Callback 的 Future 出現,當然也並非在 Java 界就沒有發展了,比如 Guava 就提供了 ListenableFuture 介面,而 Netty 4+ 更是提供了完整的 Promise、Future 和 Listener 機制。
一、Future 模式 - 將來式(JDK)
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(() -> {
TimeUnit.SECONDS.sleep(5);
return 5;
});
Integer result = future.get();
二、Future 模式--回撥式(Guava)
Future 模式的第二種用法便是回撥。很不幸的事,JDK 實現的 Future 並沒有實現 callback, addListener 這樣的方法,想要在 JAVA 中體驗到 callback 的特性,得引入一些額外的框架。
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); ListenableFuture<Integer> future = service.submit(new Callable<Integer>() { public Integer call() throws Exception { TimeUnit.SECONDS.sleep(5); return 100; } }); Futures.addCallback(future, new FutureCallback<Integer>() { public void onSuccess(Integer result) { System.out.println("success:" + result); } public void onFailure(Throwable throwable) { System.out.println("fail, e = " + throwable); } }); Thread.currentThread().join();
三、Future 模式--回撥式(Netty4)
Netty 除了是一個高效能的網路通訊框架之外,還對 jdk 的Future 做了擴充套件,引入 Netty 的 maven 依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.22.Final</version>
</dependency>
EventExecutorGroup group = new DefaultEventExecutorGroup(4); // 4 threads
io.netty.util.concurrent.Future<Integer> f = group.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(5);
return 100;
}
});
f.addListener(new FutureListener<Object>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Object> objectFuture) throws Exception {
System.out.println("計算結果::"+objectFuture.get());
}
});
四、由 Callback Hell 引出 Promise 模式
同樣的如果你對 ES6 有所接觸,就不會對 Promise 這個模式感到陌生,如果你對前端不熟悉,也不要緊,我們先來看看回調地獄(Callback Hell)是個什麼概念。
回撥是一種我們推崇的非同步呼叫方式,但也會遇到問題,也就是回撥的巢狀。當需要多個非同步回撥一起書寫時,就會出現下面的程式碼(以 js 為例):
asyncFunc1(opt, (...args1) => {
asyncFunc2(opt, (...args2) => {
asyncFunc3(opt, (...args3) => {
asyncFunc4(opt, (...args4) => {
// some operation
});
});
});
});
雖然在 Java 業務程式碼中很少出現回撥的多層巢狀,這樣的程式碼不易讀,巢狀太深修改也麻煩。於是 ES6 提出了 Promise 模式來解決回撥地獄的問題。可能就會有人想問:Java 中存在 Promise 模式嗎?答案是肯定的。
前面提到了 Netty 和 Guava 的擴充套件都提供了 addListener 這樣的介面,用於處理 Callback 呼叫,但其實 jdk1.8 已經提供了一種更為高階的回撥方式:CompletableFuture。首先嚐試用 CompletableFuture 來解決回撥的問題。
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
TimeUnit.SECONDS.sleep(5);
return 100;
});
completableFuture.whenComplete((result, e) -> {
System.out.println("結果:" + result);
});
Thread.currentThread().join();
五、Netty 中的 Promise 模式
Netty 文件說明 Netty 的網路操作都是非同步的, 在原始碼上大量使用了 Future/Promise 模型,在 Netty 裡面也是這樣定義的:
- Future 介面定義了 isSuccess(), isCancellable(), cause() 這些判斷非同步執行狀態的方法。(read-only)
- Promise 介面在 extends future 的基礎上增加了 setSuccess(), setFailure() 這些方法。(writable)
public interface Future<V> {
// 取消非同步操作
boolean cancel(boolean mayInterruptIfRunning);
// 非同步操作是否取消
boolean isCancelled();
// 非同步操作是否完成,正常終止、異常、取消都是完成
boolean isDone();
// 阻塞直到取得非同步操作結果
V get() throws InterruptedException, ExecutionException;
// 同上,但最長阻塞時間為timeout
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Netty 對 JDK 的 Future 進行了擴充套件
public interface Future<V> extends java.util.concurrent.Future<V> {
// 非同步操作完成且正常終止
boolean isSuccess();
// 非同步操作是否可以取消
boolean isCancellable();
// 非同步操作失敗的原因
Throwable cause();
// 新增一個監聽者,非同步操作完成時回撥,類比javascript的回撥函式
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 阻塞直到非同步操作完成
Future<V> await() throws InterruptedException;
// 同上,但非同步操作失敗時丟擲異常
Future<V> sync() throws InterruptedException;
// 非阻塞地返回非同步結果,如果尚未完成返回null
V getNow();
}
Netty 的 Promise 對又對 Future 進行了擴充套件
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V result);
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
boolean setUncancellable();
}
DefaultChannelPromise 是 ChannelPromise 的實現類,它是實際執行時的 Promoise 例項。
參考:
- 《併發程式設計 Promise, Future 和 Callback》:https://ifeve.com/promise-future-callback/
每天用心記錄一點點。內容也許不重要,但習慣很重要!