Netty 中的非同步程式設計 Future 和 Promise
Netty 中大量 I/O 操作都是非同步執行,本篇博文來聊聊 Netty 中的非同步程式設計。
Java Future 提供的非同步模型
JDK 5 引入了 Future 模式。Future 介面是 Java 多執行緒 Future 模式的實現,在 java.util.concurrent
包中,可以來進行非同步計算。
對於非同步程式設計,我們想要的實現是:提交一個任務,在任務執行期間提交者可以做別的事情,這個任務是在非同步執行的,當任務執行完畢通知提交者任務完成獲取結果。
那麼在 Future 中是怎麼實現的呢?我們先看介面定義:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
我們看一個示例:
public class FutureTest { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); System.out.println("start"); Future<Integer> submit = executorService.submit(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return 1; }); Integer value = null; try { value = submit.get(); } catch (Exception e) { e.printStackTrace(); } System.out.println(value); System.out.println("end"); } }
Futrue 的使用方式是:投遞一個任務到 Future 中執行,操作完之後呼叫 Future#get()
或者 Future#isDone()
方法判斷是否執行完畢。從這個邏輯上看, Future 提供的功能是:使用者執行緒需要主動輪詢 Future 執行緒是否完成當前任務,如果不通過輪詢是否完成而是同步等待獲取則會阻塞直到執行完畢為止。所以從這裡看,Future並不是真正的非同步,因為它少了一個回撥,充其量只能算是一個同步非阻塞模式。
executorService.submit()
方法獲取帶返回值的 Future 結果有兩種方式:
- 一種是通過實現
Callable
介面; - 第二種是中間變數返回。繼承 Future 的子類: FutureTask,通過 FutureTask 返回非同步結果而不是在主執行緒中獲取(FutureTask 本質也是使用
Callable
上面兩種方式的程式碼就變為這樣:
public class FutureTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
System.out.println("start");
//方式1 通過 executorService 提交一個非同步執行緒
//Future<Integer> submit = executorService.submit(new NewCallableTask());
//方式2 通過 FutureTask 包裝非同步執行緒的返回,返回結果在 FutureTask 中獲取而不是 在提交執行緒中
FutureTask<Integer> task = new FutureTask<>(new NewCallableTask());
executorService.submit(task);
//-------------方式2--------------
Integer value = null;
try {
value = task.get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(value);
System.out.println("end");
}
/**
* 通過實現 Callable 介面
*/
static class NewCallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
}
}
一般在使用執行緒池建立執行緒執行任務的時候會有兩種方式,要麼實現 Runnable 介面,要麼實現 Callable 介面,它們的區別在於:
- Callable 可以在任務結束的時候提供一個返回值,Runnable 無法提供這個功能;
- Callable 的 call 方法分可以丟擲異常,而 Runnable 的 run 方法不能丟擲異常。
而我們的非同步返回自然是使用 Callable 方式。那麼 Callable 是如何實現的呢?
從 Callable 被提交的地方入手:executorService.submit(task)
, ExecutorService 是一個介面,他的預設實現類是:AbstractExecutorService,我們看這裡的 submit()
實現方式:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
可以看到將 Callable 又包裝成了 RunnableFuture。而這個 RunnableFuture
就比較神奇,它同時繼承了 Runnable 和 Future ,既有執行緒的能力又有可攜帶返回值的功能。
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
所以再看 submit()
方法,其實是將 RunnableFuture 執行緒送入執行緒池執行,執行是一個新執行緒,只是這個執行的物件提供了 get()
方法來獲取執行結果。
那麼 Callable 優勢如何變為 RunnableFuture 的呢?我們看 newTaskFor(task)
方法:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
將 Callable 包裝為 FutureTask 物件,看到這裡又關聯到 FutureTask , :
public class FutureTask<V> implements RunnableFuture<V> {
}
可以看到 FutureTask 是 RunnableFuture 的子類,這也就解釋了上面的示例為什麼線上程池中可以提交 FutureTask 例項。
更詳細的執行過程這裡就不再分析,重點剖析 Future 的實現過程,它並不是真正的非同步,沒有實現回撥。所以在Java8 中又新增了一個真正的非同步函式:CompletableFuture。
CompletableFuture 非阻塞非同步程式設計模型
Java 8 中新增加了一個類:CompletableFuture,它提供了非常強大的 Future 的擴充套件功能,最重要的是實現了回撥的功能。
使用示例:
public class CallableFutureTest {
public static void main(String[] args) {
System.out.println("start");
/**
* 非同步非阻塞
*/
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("sleep done");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("done");
}
}
CompletableFuture.runAsync()
方法提供了非同步執行無返回值任務的功能。
ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// do something
return "result";
}, executorService);
CompletableFuture.supplyAsync()
方法提供了非同步執行有返回值任務的功能。
CompletableFuture原始碼中有四個靜態方法用來執行非同步任務:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor){..}
前面兩個可以看到是帶返回值的方法,後面兩個是不帶返回值的方法。同時支援傳入自定義的執行緒池,如果不傳入執行緒池的話預設是使用 ForkJoinPool.commonPool()
作為它的執行緒池執行非同步程式碼。
合併兩個非同步任務
如果有兩個任務需要非同步執行,且後面需要對這兩個任務的結果進行合併處理,CompletableFuture 也支援這種處理:
ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Task1";
}, executorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Task2";
}, executorService);
CompletableFuture<String> future = future1.thenCombineAsync(future2, (task1, task2) -> {
return task1 + task2; // return "Task1Task2" String
});
通過 CompletableFuture.thenCombineAsync()
方法獲取兩個任務的結果然後進行相應的操作。
下一個依賴上一個的結果
如果第二個任務依賴第一個任務的結果:
ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Task1";
}, executorService);
CompletableFuture<String> future = future1.thenComposeAsync(task1 -> {
return CompletableFuture.supplyAsync(() -> {
return task1 + "Task2"; // return "Task1Task2" String
});
}, executorService);
CompletableFuture.thenComposeAsync()
支援將第一個任務的結果傳入第二個任務中。
常用 API 介紹
- 拿到上一個任務的結果做後續操作,上一個任務完成後的動作
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
上面四個方法表示在當前階段任務完成之後下一步要做什麼。whenComplete 表示在當前執行緒內繼續做下一步,帶 Async 字尾的表示使用新執行緒去執行。
-
拿到上一個任務的結果做後續操作,使用 handler 來處理邏輯,可以返回與第一階段處理的返回型別不一樣的返回型別。
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
Handler 與 whenComplete 的區別是 handler 是可以返回一個新的 CompletableFuture 型別的。
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { return "hahaha"; }).handle((r, e) -> { return 1; });
-
拿到上一個任務的結果做後續操作, thenApply方法
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
注意到 thenApply 方法的引數中是沒有 Throwable,這就意味著如有有異常就會立即失敗,不能在處理邏輯內處理。且 thenApply 返回的也是新的 CompletableFuture。 這就是它與前面兩個的區別。
-
拿到上一個任務的結果做後續操作,可以不返回任何值,thenAccept方法
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
看這裡的示例:
CompletableFuture.supplyAsync(() -> { return "result"; }).thenAccept(r -> { System.out.println(r); }).thenAccept(r -> { System.out.println(r); });
執行完畢是不會返回任何值的。
CompletableFuture 的特性提現在執行完 runAsync 或者 supplyAsync 之後的操作上。CompletableFuture 能夠將回調放到與任務不同的執行緒中執行,也能將回調作為繼續執行的同步函式,在與任務相同的執行緒中執行。它避免了傳統回撥最大的問題,那就是能夠將控制流分離到不同的事件處理器中。
另外當你依賴 CompletableFuture 的計算結果才能進行下一步的時候,無需手動判斷當前計算是否完成,可以通過 CompletableFuture 的事件監聽自動去完成。
Netty 中的非同步程式設計
說 Netty 中的非同步程式設計之前先說一個非同步程式設計模型:Future/Promise非同步模型。
future和promise起源於函數語言程式設計和相關範例(如邏輯程式設計 ),目的是將值(future)與其計算方式(promise)分離,從而允許更靈活地進行計算,特別是通過並行化。
Future 表示目標計算的返回值,Promise 表示計算的方式,這個模型將返回結果和計算邏輯分離,目的是為了讓計算邏輯不影響返回結果,從而抽象出一套非同步程式設計模型。那計算邏輯如何與結果關聯呢?它們之間的紐帶就是 callback。
引用自:https://zh.wikipedia.org/wiki/Future%E4%B8%8Epromise
在 Netty 中的非同步程式設計就是基於該模型來實現。Netty 中非常多的非同步呼叫,最簡單的例子就是我們 Server 和 Client 端啟動的例子:
Server:
Client:
Netty 中使用了一個 ChannelFuture 來實現非同步操作,看似與 Java 中的 Future 相似,我們看一下程式碼:
public interface ChannelFuture extends Future<Void> {
}
這裡 ChannelFuture 繼承了一個 Future,這是 Java 中的 Future 嗎?跟下去發現並不是 JDK 的,而是 Netty 自己實現的。該類位於:io.netty.util.concurrent
包中:
public interface Future<V> extends java.util.concurrent.Future<V> {
// 只有IO操作完成時才返回true
boolean isSuccess();
// 只有當cancel(boolean)成功取消時才返回true
boolean isCancellable();
// IO操作發生異常時,返回導致IO操作以此的原因,如果沒有異常,返回null
Throwable cause();
// 向Future新增事件,future完成時,會執行這些事件,如果add時future已經完成,會立即執行監聽事件
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 移除監聽事件,future完成時,不會觸發
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待future done
Future<V> sync() throws InterruptedException;
// 等待future done,不可打斷
Future<V> syncUninterruptibly();
// 等待future完成
Future<V> await() throws InterruptedException;
// 等待future 完成,不可打斷
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// 立刻獲得結果,如果沒有完成,返回null
V getNow();
// 如果成功取消,future會失敗,導致CancellationException
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
Netty 自己實現的 Future 繼承了 JDK 的 Future,新增了 sync()
和 await()
用於阻塞等待,還加了 Listeners,只要任務結束去回撥 Listener 就可以了,那麼我們就不一定要主動呼叫 isDone()
來獲取狀態,或通過 get()
阻塞方法來獲取值。
Netty的 Future 與 Java 的 Future 雖然類名相同,但功能上略有不同,Netty 中引入了 Promise 機制。在 Java 的 Future 中,業務邏輯為一個 Callable 或 Runnable 實現類,該類的 call()
或 run()
執行完畢意味著業務邏輯的完結,在 Promise 機制中,可以在業務邏輯中人工設定業務邏輯的成功與失敗,這樣更加方便的監控自己的業務邏輯。
public interface Promise<V> extends Future<V> {
// 設定future執行結果為成功
Promise<V> setSuccess(V result);
// 嘗試設定future執行結果為成功,返回是否設定成功
boolean trySuccess(V result);
// 設定失敗
Promise<V> setFailure(Throwable cause);
// 嘗試設定future執行結果為失敗,返回是否設定成功
boolean tryFailure(Throwable cause);
// 設定為不能取消
boolean setUncancellable();
// 原始碼中,以下為覆蓋了Future的方法,例如;
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
}
Promise 介面繼承自 Future 介面,重點添加了上述幾個方法,可以人工設定 future 的執行成功與失敗,並通知所有監聽的 listener。
從 Future 和 Promise 提供的方法來看,Future 都是 get 型別的方法,主要用來判斷當前任務的狀態。而 Promise 中是 set 型別的方法,主要來對任務的狀態來進行操作。這裡就體現出來將 結果和操作過程分離的設計。
Promise 實現類是DefaultPromise類,該類十分重要,Future 的 listener 機制也是由它實現的,所以我們先來分析一下該類。先來看一下它的重要屬性:
// 可以巢狀的Listener的最大層數,可見最大值為8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// result欄位由使用RESULT_UPDATER更新
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER;
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");
// 非同步操作不可取消
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
// 非同步操作失敗時儲存異常原因
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));
第一個套 listener,是指在 listener 的 operationComplete()
方法中,可以再次使用 future.addListener()
繼續新增 listener,Netty 限制的最大層數是8,使用者可使用系統變數io.netty.defaultPromise.maxListenerStackDepth
設定。
為了更好的說明,先寫了一個示例,Netty 中的 Future/Promise模型是可以單獨拿出來使用的。
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.TimeUnit;
/**
* @author rickiyang
* @date 2020-04-19
* @Desc TODO
*/
public class PromiseTest {
public static void main(String[] args) {
PromiseTest testPromise = new PromiseTest();
Promise<String> promise = testPromise.doSomething("哈哈");
promise.addListener(future -> System.out.println(promise.get()+", something is done"));
}
/**
* 建立一個DefaultPromise並返回,將業務邏輯放入執行緒池中執行
* @param value
* @return
*/
private Promise<String> doSomething(String value) {
NioEventLoopGroup loop = new NioEventLoopGroup();
DefaultPromise<String> promise = new DefaultPromise<>(loop.next());
loop.schedule(() -> {
try {
Thread.sleep(1000);
promise.setSuccess("執行成功。" + value);
return promise;
} catch (InterruptedException ignored) {
promise.setFailure(ignored);
}
return promise;
}, 0, TimeUnit.SECONDS);
return promise;
}
}
通過這個例子可以看到,Promise 能夠在業務邏輯執行緒中通知 Future 成功或失敗,由於 Promise 繼承了 Netty 的 Future,因此可以加入監聽事件。而 Future 和 Promise 的好處在於,獲取到 Promise 物件後可以為其設定非同步呼叫完成後的操作,然後立即繼續去做其他任務。
來看一下 addListener() 方法:
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
//併發控制,保證多執行緒情況下只有一個執行緒執行新增操作
synchronized (this) {
addListener0(listener);
}
// 操作完成,通知監聽者
if (isDone()) {
notifyListeners();
}
return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
// 如果當前Promise例項持有listeners的是DefaultFutureListeners型別,則呼叫它的add()方法進行新增
((DefaultFutureListeners) listeners).add(listener);
} else {
// 步入這裡說明當前Promise例項持有listeners為單個GenericFutureListener例項,需要轉換為DefaultFutureListeners例項
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}
這裡看到有一個全域性變數 listeners
,我們看到他的定義:
private Object listeners;
為啥會是一個 Object 型別的物件呢,不是應該是 List 或者是陣列才對嘛。Netty之所以這樣設計,是因為大多數情況下 listener 只有一個,用集合和陣列都會造成浪費。當只有一個 listener 時,該欄位為一個 GenericFutureListener 物件;當多於一個 listener 時,該欄位為 DefaultFutureListeners ,可以儲存多個 listener。
我們再來看 notifyListeners()
方法:
private void notifyListeners() {
EventExecutor executor = executor();
//當前EventLoop執行緒需要檢查listener巢狀
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
//這裡是當前listener的巢狀層數
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
//外部執行緒直接提交給新執行緒執行
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
這裡有個疑問就是為什麼要設定當前的呼叫棧深度+1。
接著看真正執行通知的方法:
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
// 正在通知或已沒有監聽者(外部執行緒刪除)直接返回
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
//只有一個listener
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
//有多個listener
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// 執行完畢且外部執行緒沒有再新增監聽者
notifyingListeners = false;
return;
}
//外部執行緒添加了新的監聽者繼續執行
listeners = this.listeners;
this.listeners = null;
}
}
}
Netty 中 DefalutPromise 是一個非常常用的類,這是 Promise 實現的基礎。DefaultChannelPromise DefalutPromise 的子類,加入了 channel 這個屬性。
Promise 目前支援兩種型別的監聽器:
- GenericFutureListener:支援泛型的 Future ;
- GenericProgressiveFutureListener:它是
GenericFutureListener
的子類,支援進度表示和支援泛型的Future 監聽器(有些場景需要多個步驟實現,類似於進度條那樣)。
為了讓 Promise 支援多個監聽器,Netty 添加了一個預設修飾符修飾的DefaultFutureListeners
類用於儲存監聽器例項陣列:
final class DefaultFutureListeners {
private GenericFutureListener<? extends Future<?>>[] listeners;
private int size;
private int progressiveSize; // the number of progressive listeners
// 這個構造相對特別,是為了讓Promise中的listeners(Object型別)例項由單個GenericFutureListener例項轉換為DefaultFutureListeners型別
@SuppressWarnings("unchecked")
DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void add(GenericFutureListener<? extends Future<?>> l) {
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
final int size = this.size;
// 注意這裡,每次擴容陣列長度是原來的2倍
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
// 把當前的GenericFutureListener加入陣列中
listeners[size] = l;
// 監聽器總數量加1
this.size = size + 1;
// 如果為GenericProgressiveFutureListener,則帶進度指示的監聽器總數量加1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void remove(GenericFutureListener<? extends Future<?>> l) {
final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
int size = this.size;
for (int i = 0; i < size; i ++) {
if (listeners[i] == l) {
// 計算需要需要移動的監聽器的下標
int listenersToMove = size - i - 1;
if (listenersToMove > 0) {
// listenersToMove後面的元素全部移動到陣列的前端
System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
}
// 當前監聽器總量的最後一個位置設定為null,數量減1
listeners[-- size] = null;
this.size = size;
// 如果監聽器是GenericProgressiveFutureListener,則帶進度指示的監聽器總數量減1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize --;
}
return;
}
}
}
// 返回監聽器例項陣列
public GenericFutureListener<? extends Future<?>>[] listeners() {
return listeners;
}
// 返回監聽器總數量
public int size() {
return size;
}
// 返回帶進度指示的監聽器總數量
public int progressiveSize() {
return progressiveSize;
}
}
以上就是關於 Promise 和監聽器相關的實現分析,再回到之前的啟動類,是不是還有一個 sync() 方法:
@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
public Promise<V> await() throws InterruptedException {
// 非同步操作已經完成,直接返回
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死鎖檢測
checkDeadLock();
// 同步使修改waiters的執行緒只有一個
synchronized (this) {
while (!isDone()) { // 等待直到非同步操作完成
incWaiters(); // ++waiters;
try {
wait(); // JDK方法
} finally {
decWaiters(); // --waiters
}
}
}
return this;
}
這裡其實就是一個同步檢測當前事件是否完成的過程。
以上就是 Netty 中實現的 Future/Promise 非同步回撥機制。實現並不是很難懂,程式碼很值得學習。除了 Netty 中實現了 Future/Promise模型,在Guava中也有相關的實現,大家日常使用可以看習慣引用相關的包。
Guava實現:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(5);
return 100;
}
});
Futures.addCallback(future, new FutureCallback<Integer>() {
public void onSuccess(Integer result) {
System.out.println("success:" + result);
}
public void onFailure(Throwable throwable) {
System.out.println("fail, e = " + throwable);
}
});
Thread.currentThread().join();