1. 程式人生 > 其它 >Netty之Future-netty學習筆記(12)-20210823

Netty之Future-netty學習筆記(12)-20210823

Netty內部的io.netty.util.concurrent.Future<V> 繼承自java.util.concurrent.Future<V>,而Promise<V>是前者的一個特殊實現。

(一)jdk中future和netty中future的比較

jdk中future:

// 取消非同步操作
boolean cancel(boolean mayInterruptIfRunning);
// 非同步操作是否取消
boolean isCancelled();
// 非同步操作是否完成,正常終止、異常、取消都是完成
boolean isDone();
// 阻塞直到取得非同步操作結果
V get() throws InterruptedException, ExecutionException; // 同上,但最長阻塞時間為timeout V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,TimeoutException;

jdk中future的特點:
1.無論結果是成功還是失敗還是取消,返回的都是isdone();
2.而且我們在非同步操作觸發和結束的時候比較關心其他的一些操作,在jdk的future中無法進行補充。所以netty對future做了擴充套件。

netty中future(以下為擴充套件內容):

// 非同步操作完成且正常終止
boolean isSuccess();
// 非同步操作是否可以取消
boolean isCancellable();
// 非同步操作失敗的原因
Throwable cause();

// 新增一個監聽者,非同步操作完成時回撥,類比javascript的回撥函式
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 增長多個回撥方法
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

// 刪除回撥方法
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 刪除多個回撥方法
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

// 阻塞等待,且若是失敗丟擲異常
Future<V> sync() throws InterruptedException;
// 同上,區別是不可中斷阻塞等待過程
Future<V> syncUninterruptibly();

// 阻塞等待
Future<V> await() throws InterruptedException;
// 同上,區別是不可中斷阻塞等待過程
Future<V> awaitUninterruptibly();
V getNow();

netty中future的特點:
1.操作結果分為success,fail,canceled三種;
2.並且通過addlisteners()方法可以添加回調操作,即觸發或者完成時需要進行的操作;
3.await()和sync(),可以以阻塞的方式等待非同步完成;getnow()可以獲得非同步操作的結果,如果還未完成則返回Null;

綜合以上的分析,給出一張表示future的狀態圖來增強對future的理解:

註釋:future只有兩種狀態,unconpleted和conpleted.

completedfuture表示已經完成非同步操做,該類在非同步操做結束時建立,使用者使用addlistener()方法提供非同步操做方法。

(二)abstractfuture

abstractfuture類實現future介面,在其中主要實現了get()方法,以阻塞的方式來取得非同步操作的結果,其功能與jdk中future的get()方法類似。

abstractfuture原始碼:

@Override
public V get() throws InterruptedException, ExecutionException {
    //阻塞直到非同步任務完成
    await();
    
    Throwable cause = cause();
    if (cause == null) {
    //獲得非同步操作結果
        return getNow();
    }
    //操作失敗則丟擲異常
    if (cause instanceof CancellationException) {
        throw (CancellationException) cause;
    }
    throw new ExecutionException(cause);
}

該類中get(long timeout, TimeUnit unit) 實現方式與get()大致相同。

(三)completefuture

completedfuture表示已經完成非同步操作,該類在非同步操作結束時建立,使用者使用addlistener()方法提供非同步操作方法。

completefuture的狀態:

@Override
public boolean isDone() {
    return true;
}

completefuture表示已經完成非同步操作,所以isdone()方法返回true;並且sync()方法和await()方法會立即返回。

@Override
public Future<V> sync() throws InterruptedException {
    return this;
}

觸發操作的執行者(eventexecutor)

private final EventExecutor executor;

protected CompleteFuture(EventExecutor executor) {
    this.executor = executor;
}

protected EventExecutor executor() {
    return executor;
}

completefuture中維護了一個eventexecutor例項,用來執行listener中的任務。

觸發操作的執行過程:

第一步addlistener():

#completefuture類
 public Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listener == null) {
        throw new NullPointerException("listener");
    }
    DefaultPromise.notifyListener(executor(), this, listener);
    return this;

呼叫addlistener()方法後,DefaultPromise會呼叫靜態方法notifyListener(),來執行listener中的操作。

#DefaultPromise類
protected static void notifyListener(
        EventExecutor eventExecutor, final Future<?> future, final GenericFutureListener<?> listener) {
    //省略
    ……
    notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener);
}

然後再看看notifyListenerWithStackOverFlowProtection()

#DefaultPromise類
private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
                                                              final Future<?> future,
                                                              final GenericFutureListener<?> listener) {
    if (executor.inEventLoop()) {
        //省略,這段程式碼表示在本執行緒中執行
        ……
    }
    //在外部觸發,則將其封裝成runnable任務
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListener0(future, listener);
        }
    });
}

接下來的safeexecute()和notifylistener0()就很簡單了,

#DefaultPromise類
private static void safeExecute(EventExecutor executor, Runnable task) {
    try {
    //將任務新增到任務佇列中等待執行
        executor.execute(task);
    } 
    //省略
    ……
}

private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
    //可以清楚的看到,執行到了listener中的operationcomplete(future)方法
        l.operationComplete(future);
    } catch (Throwable t) {
        logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
    }
}

completefuture類總結:
1.conpletefuture中儲存了eventexecutor的資訊,用來執行listener中的任務。
2.呼叫了future的addlistener()方法後,將listener中的操作封裝成runnble任務扔到eventexecutor中的任務佇列中等待執行
3.completefuture表示已經完成非同步操作,狀態是isdone。

(四)channelfuture

這是一個繼承future的介面,顧名思義,該介面與通道操作有關,所以在channelfuture介面中,除了覆蓋future的功能外,只提供了一個channel()抽象方法。

Channel channel();

(五)completechannelfuture

completechannelfuture類實現channelfuture介面,繼承completefuture類。

abstract class CompleteChannelFuture extends CompleteFuture<Void> implements ChannelFuture 

非同步操作結果的獲取

尖括號中的泛型表示返回結果的型別,此處是void,表示不關心返回的結果。

@Override
public Void getNow() {
    return null;
}

getnow()方法的返回結果為null,結合前面對abstractfuture類中get()方法的分析,可以得知在completechannelfuture跟get相關的方法返回的結果都是null.

completechannelfuture的初始化

類中的channel欄位:

private final Channel channel;

類的初始化:

 protected CompleteChannelFuture(Channel channel, EventExecutor executor) {
    super(executor);
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;
}

eventexecutor的管理:

@Override
protected EventExecutor executor() {
    EventExecutor e = super.executor();
    if (e == null) {
        return channel().eventLoop();
    } else {
        return e;
    }
}

如果父類中eventexecutor不為空,則返回父類中的eventexecutor,否則返回channel中儲存的eventexecutor。

completechannelfuture中的addlistener()和await(),sync()等方法的特點和completefuture類相似。

(六)Succeededchannelfuture/FailedChannelFuture

這兩個類繼承自completechannelfuture,一個表示的狀態是success,另一個表示操作失敗。

#FailedChannelFuture
@Override
public Throwable cause() {
    return cause;
}

#Succeededchannelfuture
@Override
public boolean isSuccess() {
    return true;
}

參考:

https://segmentfault.com/a/1190000012979865

https://blog.csdn.net/moneywenxue/article/details/116905669