Netty之Future-netty學習筆記(12)-20210823
Netty內部的io.netty.util.concurrent.Future<V> 繼承自java.util.concurrent.Future<V>,而Promise<V>是前者的一個特殊實現。
(一)jdk中future和netty中future的比較
jdk中future:
// 取消非同步操作 boolean cancel(boolean mayInterruptIfRunning); // 非同步操作是否取消 boolean isCancelled(); // 非同步操作是否完成,正常終止、異常、取消都是完成 boolean isDone(); // 阻塞直到取得非同步操作結果V get() throws InterruptedException, ExecutionException; // 同上,但最長阻塞時間為timeout V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,TimeoutException;
jdk中future的特點:
1.無論結果是成功還是失敗還是取消,返回的都是isdone();
2.而且我們在非同步操作觸發和結束的時候比較關心其他的一些操作,在jdk的future中無法進行補充。所以netty對future做了擴充套件。
netty中future(以下為擴充套件內容):
// 非同步操作完成且正常終止
boolean isSuccess();
// 非同步操作是否可以取消
boolean isCancellable();
// 非同步操作失敗的原因
Throwable cause();
// 新增一個監聽者,非同步操作完成時回撥,類比javascript的回撥函式
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 增長多個回撥方法
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 刪除回撥方法
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 刪除多個回撥方法
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 阻塞等待,且若是失敗丟擲異常
Future<V> sync() throws InterruptedException;
// 同上,區別是不可中斷阻塞等待過程
Future<V> syncUninterruptibly();
// 阻塞等待
Future<V> await() throws InterruptedException;
// 同上,區別是不可中斷阻塞等待過程
Future<V> awaitUninterruptibly();
V getNow();
netty中future的特點:
1.操作結果分為success,fail,canceled三種;
2.並且通過addlisteners()方法可以添加回調操作,即觸發或者完成時需要進行的操作;
3.await()和sync(),可以以阻塞的方式等待非同步完成;getnow()可以獲得非同步操作的結果,如果還未完成則返回Null;
綜合以上的分析,給出一張表示future的狀態圖來增強對future的理解:
註釋:future只有兩種狀態,unconpleted和conpleted.
completedfuture表示已經完成非同步操做,該類在非同步操做結束時建立,使用者使用addlistener()方法提供非同步操做方法。
(二)abstractfuture
abstractfuture類實現future介面,在其中主要實現了get()方法,以阻塞的方式來取得非同步操作的結果,其功能與jdk中future的get()方法類似。
abstractfuture原始碼:
@Override public V get() throws InterruptedException, ExecutionException { //阻塞直到非同步任務完成 await(); Throwable cause = cause(); if (cause == null) { //獲得非同步操作結果 return getNow(); } //操作失敗則丟擲異常 if (cause instanceof CancellationException) { throw (CancellationException) cause; } throw new ExecutionException(cause);
}
該類中get(long timeout, TimeUnit unit) 實現方式與get()大致相同。
(三)completefuture
completedfuture表示已經完成非同步操作,該類在非同步操作結束時建立,使用者使用addlistener()方法提供非同步操作方法。
completefuture的狀態:
@Override public boolean isDone() { return true; }
completefuture表示已經完成非同步操作,所以isdone()方法返回true;並且sync()方法和await()方法會立即返回。
@Override public Future<V> sync() throws InterruptedException { return this; }
觸發操作的執行者(eventexecutor)
private final EventExecutor executor; protected CompleteFuture(EventExecutor executor) { this.executor = executor; } protected EventExecutor executor() { return executor; }
completefuture中維護了一個eventexecutor例項,用來執行listener中的任務。
觸發操作的執行過程:
第一步addlistener():
#completefuture類 public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { if (listener == null) { throw new NullPointerException("listener"); } DefaultPromise.notifyListener(executor(), this, listener); return this;
呼叫addlistener()方法後,DefaultPromise會呼叫靜態方法notifyListener(),來執行listener中的操作。
#DefaultPromise類 protected static void notifyListener( EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) { //省略 …… notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener); }
然後再看看notifyListenerWithStackOverFlowProtection()
#DefaultPromise類 private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor, final Future<?> future, final GenericFutureListener<?> listener) { if (executor.inEventLoop()) { //省略,這段程式碼表示在本執行緒中執行 …… } //在外部觸發,則將其封裝成runnable任務 safeExecute(executor, new Runnable() { @Override public void run() { notifyListener0(future, listener); } }); }
接下來的safeexecute()和notifylistener0()就很簡單了,
#DefaultPromise類 private static void safeExecute(EventExecutor executor, Runnable task) { try { //將任務新增到任務佇列中等待執行 executor.execute(task); } //省略 …… } private static void notifyListener0(Future future, GenericFutureListener l) { try { //可以清楚的看到,執行到了listener中的operationcomplete(future)方法 l.operationComplete(future); } catch (Throwable t) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } }
completefuture類總結:
1.conpletefuture中儲存了eventexecutor的資訊,用來執行listener中的任務。
2.呼叫了future的addlistener()方法後,將listener中的操作封裝成runnble任務扔到eventexecutor中的任務佇列中等待執行
3.completefuture表示已經完成非同步操作,狀態是isdone。
(四)channelfuture
這是一個繼承future的介面,顧名思義,該介面與通道操作有關,所以在channelfuture介面中,除了覆蓋future的功能外,只提供了一個channel()抽象方法。
Channel channel();
(五)completechannelfuture
completechannelfuture類實現channelfuture介面,繼承completefuture類。
abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture
非同步操作結果的獲取
尖括號中的泛型表示返回結果的型別,此處是void,表示不關心返回的結果。
@Override public Void getNow() { return null; }
getnow()方法的返回結果為null,結合前面對abstractfuture類中get()方法的分析,可以得知在completechannelfuture跟get相關的方法返回的結果都是null.
completechannelfuture的初始化
類中的channel欄位:
private final Channel channel;
類的初始化:
protected CompleteChannelFuture(Channel channel, EventExecutor executor) { super(executor); if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; }
eventexecutor的管理:
@Override protected EventExecutor executor() { EventExecutor e = super.executor(); if (e == null) { return channel().eventLoop(); } else { return e; } }
如果父類中eventexecutor不為空,則返回父類中的eventexecutor,否則返回channel中儲存的eventexecutor。
completechannelfuture中的addlistener()和await(),sync()等方法的特點和completefuture類相似。
(六)Succeededchannelfuture/FailedChannelFuture
這兩個類繼承自completechannelfuture,一個表示的狀態是success,另一個表示操作失敗。
#FailedChannelFuture @Override public Throwable cause() { return cause; } #Succeededchannelfuture @Override public boolean isSuccess() { return true; }
參考:
https://segmentfault.com/a/1190000012979865
https://blog.csdn.net/moneywenxue/article/details/116905669