1. 程式人生 > >Netty中的ChannelFuture和ChannelPromise

Netty中的ChannelFuture和ChannelPromise

在Netty使用ChannelFuture和ChannelPromise進行非同步操作的處理

這是官方給出的ChannelFutur描述

 1  *                                      | Completed successfully    |
 2  *                                      +---------------------------+
 3  *                                 +---->      isDone() = true      |
 4  * +--------------------------+    |    |   isSuccess() = true      |
 5  * |        Uncompleted       |    |    +===========================+
 6  * +--------------------------+    |    | Completed with failure    |
 7  * |      isDone() = false    |    |    +---------------------------+
 8  * |   isSuccess() = false    |----+---->      isDone() = true      |
 9  * | isCancelled() = false    |    |    |       cause() = non-null  |
10  * |       cause() = null     |    |    +===========================+
11  * +--------------------------+    |    | Completed by cancellation |
12  *                                 |    +---------------------------+
13  *                                 +---->      isDone() = true      |
14  *                                      | isCancelled() = true      |
15  *                                      +---------------------------+

由圖可以知道ChannelFutur有四種狀態:Uncompleted、Completed successfully、Completed with failure、Completed by cancellation,這幾種狀態是由isDone、isSuccess、isCancelled、cause這四種方法的返回值決定的。

 

ChannelFutur介面的定義如下:

 1 public interface ChannelFuture extends Future<Void> {
 2     Channel channel();
 3 
 4     ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1);
 5 
 6     ChannelFuture addListeners(GenericFutureListener... var1);
 7 
 8     ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1);
 9 
10     ChannelFuture removeListeners(GenericFutureListener... var1);
11 
12     ChannelFuture sync() throws InterruptedException;
13 
14     ChannelFuture syncUninterruptibly();
15 
16     ChannelFuture await() throws InterruptedException;
17 
18     ChannelFuture awaitUninterruptibly();
19 
20     boolean isVoid();
21 }

繼承自Netty的Future:

 1 public interface Future<V> extends java.util.concurrent.Future<V> {
 2     boolean isSuccess();
 3 
 4     boolean isCancellable();
 5 
 6     Throwable cause();
 7 
 8     Future<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
 9 
10     Future<V> addListeners(GenericFutureListener... var1);
11 
12     Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
13 
14     Future<V> removeListeners(GenericFutureListener... var1);
15 
16     Future<V> sync() throws InterruptedException;
17 
18     Future<V> syncUninterruptibly();
19 
20     Future<V> await() throws InterruptedException;
21 
22     Future<V> awaitUninterruptibly();
23 
24     boolean await(long var1, TimeUnit var3) throws InterruptedException;
25 
26     boolean await(long var1) throws InterruptedException;
27 
28     boolean awaitUninterruptibly(long var1, TimeUnit var3);
29 
30     boolean awaitUninterruptibly(long var1);
31 
32     V getNow();
33 
34     boolean cancel(boolean var1);
35 }

 

Netty的Future又繼承自JDK的Future:

 1 public interface Future<V> {
 2 
 3     boolean cancel(boolean mayInterruptIfRunning);
 4     
 5     boolean isCancelled();
 6 
 7     boolean isDone();
 8     
 9     V get() throws InterruptedException, ExecutionException;
10 
11     V get(long timeout, TimeUnit unit)
12         throws InterruptedException, ExecutionException, TimeoutException;
13 }


ChannelPromise繼承了ChannelFuture:

 1 public interface ChannelPromise extends ChannelFuture, Promise<Void> {
 2     Channel channel();
 3 
 4     ChannelPromise setSuccess(Void var1);
 5 
 6     ChannelPromise setSuccess();
 7 
 8     boolean trySuccess();
 9 
10     ChannelPromise setFailure(Throwable var1);
11 
12     ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> var1);
13 
14     ChannelPromise addListeners(GenericFutureListener... var1);
15 
16     ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> var1);
17 
18     ChannelPromise removeListeners(GenericFutureListener... var1);
19 
20     ChannelPromise sync() throws InterruptedException;
21 
22     ChannelPromise syncUninterruptibly();
23 
24     ChannelPromise await() throws InterruptedException;
25 
26     ChannelPromise awaitUninterruptibly();
27 
28     ChannelPromise unvoid();
29 }

其中Promise介面定義如下:

 1 public interface Promise<V> extends Future<V> {
 2     Promise<V> setSuccess(V var1);
 3 
 4     boolean trySuccess(V var1);
 5 
 6     Promise<V> setFailure(Throwable var1);
 7 
 8     boolean tryFailure(Throwable var1);
 9 
10     boolean setUncancellable();
11 
12     Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
13 
14     Promise<V> addListeners(GenericFutureListener... var1);
15 
16     Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
17 
18     Promise<V> removeListeners(GenericFutureListener... var1);
19 
20     Promise<V> await() throws InterruptedException;
21 
22     Promise<V> awaitUninterruptibly();
23 
24     Promise<V> sync() throws InterruptedException;
25 
26     Promise<V> syncUninterruptibly();
27 }


在Netty中,無論是服務端還是客戶端,在Channel註冊時都會為其繫結一個ChannelPromise,預設實現是DefaultChannelPromise

DefaultChannelPromise定義如下:

  1 public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
  2 
  3     private final Channel channel;
  4     private long checkpoint;
  5 
  6     public DefaultChannelPromise(Channel channel) {
  7         this.channel = checkNotNull(channel, "channel");
  8     }
  9     
 10     public DefaultChannelPromise(Channel channel, EventExecutor executor) {
 11         super(executor);
 12         this.channel = checkNotNull(channel, "channel");
 13     }
 14 
 15     @Override
 16     protected EventExecutor executor() {
 17         EventExecutor e = super.executor();
 18         if (e == null) {
 19             return channel().eventLoop();
 20         } else {
 21             return e;
 22         }
 23     }
 24 
 25     @Override
 26     public Channel channel() {
 27         return channel;
 28     }
 29 
 30     @Override
 31     public ChannelPromise setSuccess() {
 32         return setSuccess(null);
 33     }
 34 
 35     @Override
 36     public ChannelPromise setSuccess(Void result) {
 37         super.setSuccess(result);
 38         return this;
 39     }
 40 
 41     @Override
 42     public boolean trySuccess() {
 43         return trySuccess(null);
 44     }
 45 
 46     @Override
 47     public ChannelPromise setFailure(Throwable cause) {
 48         super.setFailure(cause);
 49         return this;
 50     }
 51 
 52     @Override
 53     public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
 54         super.addListener(listener);
 55         return this;
 56     }
 57 
 58     @Override
 59     public ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
 60         super.addListeners(listeners);
 61         return this;
 62     }
 63 
 64     @Override
 65     public ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
 66         super.removeListener(listener);
 67         return this;
 68     }
 69 
 70     @Override
 71     public ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
 72         super.removeListeners(listeners);
 73         return this;
 74     }
 75 
 76     @Override
 77     public ChannelPromise sync() throws InterruptedException {
 78         super.sync();
 79         return this;
 80     }
 81 
 82     @Override
 83     public ChannelPromise syncUninterruptibly() {
 84         super.syncUninterruptibly();
 85         return this;
 86     }
 87 
 88     @Override
 89     public ChannelPromise await() throws InterruptedException {
 90         super.await();
 91         return this;
 92     }
 93 
 94     @Override
 95     public ChannelPromise awaitUninterruptibly() {
 96         super.awaitUninterruptibly();
 97         return this;
 98     }
 99 
100     @Override
101     public long flushCheckpoint() {
102         return checkpoint;
103     }
104 
105     @Override
106     public void flushCheckpoint(long checkpoint) {
107         this.checkpoint = checkpoint;
108     }
109 
110     @Override
111     public ChannelPromise promise() {
112         return this;
113     }
114 
115     @Override
116     protected void checkDeadLock() {
117         if (channel().isRegistered()) {
118             super.checkDeadLock();
119         }
120     }
121 
122     @Override
123     public ChannelPromise unvoid() {
124         return this;
125     }
126 
127     @Override
128     public boolean isVoid() {
129         return false;
130     }
131 }

可以看到這個DefaultChannelPromise僅僅是將Channel封裝了,而且其基本上所有方法的實現都依賴於父類DefaultPromise

DefaultPromise中的實現是整個ChannelFuture和ChannelPromise的核心所在:

DefaultPromise中有如下幾個狀態量:

1 private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
2             SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
3 private static final Object SUCCESS = new Object();
4 private static final Object UNCANCELLABLE = new Object();
5 private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
6             new CancellationException(), DefaultPromise.class, "cancel(...)"));
7 private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
8             AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");         

MAX_LISTENER_STACK_DEPTH: 表示最多可執行listeners的數量,預設是8
SUCCESS :表示非同步操作正常完成
UNCANCELLABLE:表示非同步操作不可取消,並且尚未完成
CANCELLATION_CAUSE_HOLDER:表示非同步操作取消監聽,用於cancel操作,
而CauseHolder 的例項物件是用來表示非同步操作異常結束,同時儲存異常資訊:

1 private static final class CauseHolder {
2     final Throwable cause;
3     CauseHolder(Throwable cause) {
4         this.cause = cause;
5     }
6 }


RESULT_UPDATER:是一個原子更新器,通過CAS操作,原子化更新 DefaultPromise物件的名為result的成員,這個result成員是其非同步操作判斷的關鍵所在

DefaultPromise的成員及構造方法定義:

 1 public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
 2     private volatile Object result;
 3     private final EventExecutor executor;
 4     private Object listeners;
 5     private short waiters;
 6     private boolean notifyingListeners;
 7     
 8     public DefaultPromise(EventExecutor executor) {
 9         this.executor = checkNotNull(executor, "executor");
10     }
11 }

result:就是前面說的,判斷非同步操作狀態的關鍵
result的取值有:SUCCESS 、UNCANCELLABLE、CauseHolder以及null (其實還可以是泛型V型別的任意物件,這裡暫不考慮)
executor:就是Channel繫結的NioEventLoop,在我之前的部落格說過,Channel的非同步操作都是在NioEventLoop的執行緒中完成的([Netty中NioEventLoopGroup的建立原始碼分析](https://blog.csdn.net/Z_ChenChen/article/details/90567863))
listeners:通過一個Object儲存所有對非同步操作的監聽,用於非同步操作的回撥
waiters:記錄阻塞中的listeners的數量
notifyingListeners:是否需要喚醒的標誌

首先來看isDone方法,通過之前的圖可以知道,
isDone為false對應了Uncompleted狀態,即非同步操作尚未完成;
isDone為true則代表了非同步操作完成,但是還是有三種完成情況,需要結合別的判斷方法才能具體知道是哪種情況;

isDone方法:

1 @Override
2 public boolean isDone() {
3     return isDone0(result);
4 }

呼叫isDone0:

1 private static boolean isDone0(Object result) {
2     return result != null && result != UNCANCELLABLE;
3 }

有如下幾種情況:
result等於null,result沒有賦值,表示非同步操作尚未完成(從這裡就能想到非同步操作完成,需要呼叫某個set方法來改變result的狀態)
result是UNCANCELLABLE狀態,表示執行中的非同步操作不可取消,當然也就是非同步操作尚未完成
result不等於null,且不等於UNCANCELLABLE,就表示非同步操作完成(包括正常完成,以及異常結束,需要由cause方法進一步判斷)

isSuccess方法:

1 @Override
2 public boolean isSuccess() {
3     Object result = this.result;
4     return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
5 }

由這裡可以知道當且僅當result 為SUCCESS狀態時,才返回true(其餘除UNCANCELLABLE和null的值其實也可以,這裡暫不考慮)

isCancelled方法:

1 @Override
2 public boolean isCancelled() {
3     return isCancelled0(result);
4 }

呼叫isCancelled0方法:

1 private static boolean isCancelled0(Object result) {
2     return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
3 }

只有當result是CancellationException的例項時,表示取消非同步操作

 

接著來看cause方法:

1 @Override
2 public Throwable cause() {
3     Object result = this.result;
4     return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
5 }

和上面同理,通過判別resul是否是CauseHolder的實現類,若是,將CauseHolder儲存的異常返回。

幾種狀態的判別說完了,下面看一下如何設定這幾種狀態的:
setSuccess方法:

1 @Override
2 public Promise<V> setSuccess(V result) {
3     if (setSuccess0(result)) {
4         notifyListeners();
5         return this;
6     }
7     throw new IllegalStateException("complete already: " + this);
8 }

首先呼叫setSuccess0方法,其中result的泛型通過DefaultChannelPromise可知是Void,在DefaultChannelPromise中所有的set和try操作引數都是null,這裡的result也就不去考慮:

1 private boolean setSuccess0(V result) {
2     return setValue0(result == null ? SUCCESS : result);
3 }

繼續呼叫setValue0方法:

1 private boolean setValue0(Object objResult) {
2     if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
3         RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
4         checkNotifyWaiters();
5         return true;
6     }
7     return false;
8 }

通過CAS操作,將result狀態變為SUCCESS

其中checkNotifyWaiters方法:

1 private synchronized void checkNotifyWaiters() {
2     if (waiters > 0) {
3         notifyAll();
4     }
5 }

檢查waiters的個數,喚醒所有阻塞中的this,sync方法會引起阻塞

 

回到setSuccess方法中,setSuccess0通過CAS操作,將result狀態更新為SUCCESS成功後,呼叫
notifyListeners方法,喚醒所有listener完成對非同步操作的回撥

listeners是通過addListener方法新增的,用來對非同步操作進行偵聽:
看到addListener方法:

 1 @Override
 2 public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
 3     checkNotNull(listener, "listener");
 4     
 5     synchronized (this) {
 6     addListener0(listener);
 7     }
 8     
 9     if (isDone()) {
10     notifyListeners();
11     }
12     
13     return this;
14 }
15 
16 @Override
17 public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
18     checkNotNull(listeners, "listeners");
19 
20     synchronized (this) {
21         for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
22             if (listener == null) {
23                 break;
24             }
25             addListener0(listener);
26         }
27     }
28 
29     if (isDone()) {
30         notifyListeners();
31     }
32 
33     return this;
34 }

其中GenericFutureListener介面定義如下:

1 public interface GenericFutureListener<F extends Future<?>> extends EventListener {
2      /**
3      * Invoked when the operation associated with the {@link Future} has been completed.
4      *
5      * @param future  the source {@link Future} which called this callback
6      */
7     void operationComplete(F future) throws Exception;
8 }

可以看到listener其實就是通過operationComplete方法,來完成回撥,對Future物件進行處理,由註釋可知operationComplete方法是在future操作完成時呼叫

addListeners方法的實現比較簡單,實現核心是在addListener0中:

1 private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
2     if (listeners == null) {
3         listeners = listener;
4     } else if (listeners instanceof DefaultFutureListeners) {
5         ((DefaultFutureListeners) listeners).add(listener);
6     } else {
7         listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
8     }
9 }

其中DefaultFutureListeners是將GenericFutureListener物件封裝的一個數組:

 1 final class DefaultFutureListeners {
 2 
 3     private GenericFutureListener<? extends Future<?>>[] listeners;
 4     private int size;
 5     private int progressiveSize;
 6 
 7     @SuppressWarnings("unchecked")
 8     DefaultFutureListeners(
 9             GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
10         listeners = new GenericFutureListener[2];
11         listeners[0] = first;
12         listeners[1] = second;
13         size = 2;
14         if (first instanceof GenericProgressiveFutureListener) {
15             progressiveSize ++;
16         }
17         if (second instanceof GenericProgressiveFutureListener) {
18             progressiveSize ++;
19         }
20     }
21 
22     public void add(GenericFutureListener<? extends Future<?>> l) {
23         GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
24         final int size = this.size;
25         if (size == listeners.length) {
26             this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
27         }
28         listeners[size] = l;
29         this.size = size + 1;
30 
31         if (l instanceof GenericProgressiveFutureListener) {
32             progressiveSize ++;
33         }
34     }
35 
36     public void remove(GenericFutureListener<? extends Future<?>> l) {
37         final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
38         int size = this.size;
39         for (int i = 0; i < size; i ++) {
40             if (listeners[i] == l) {
41                 int listenersToMove = size - i - 1;
42                 if (listenersToMove > 0) {
43                     System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
44                 }
45                 listeners[-- size] = null;
46                 this.size = size;
47 
48                 if (l instanceof GenericProgressiveFutureListener) {
49                     progressiveSize --;
50                 }
51                 return;
52             }
53         }
54     }
55 
56     public GenericFutureListener<? extends Future<?>>[] listeners() {
57         return listeners;
58     }
59 
60     public int size() {
61         return size;
62     }
63 
64     public int progressiveSize() {
65         return progressiveSize;
66     }
67 }

size:記錄listeners的個數
progressiveSize:記錄GenericProgressiveFutureListener型別的listeners的個數
DefaultFutureListeners 中對陣列的操作比較簡單,
add方法,當size達到陣列長度時,進行二倍擴容,

其中GenericProgressiveFutureListener繼承自GenericFutureListener:

 1 public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
 2     /**
 3      * Invoked when the operation has progressed.
 4      *
 5      * @param progress the progress of the operation so far (cumulative)
 6      * @param total the number that signifies the end of the operation when {@code progress} reaches at it.
 7      *              {@code -1} if the end of operation is unknown.
 8      */
 9     void operationProgressed(F future, long progress, long total) throws Exception;
10 }

由註釋可知operationProgressed是在future操作進行時呼叫,這裡不對GenericProgressiveFutureListener過多討論

回到addListener0方法,由DefaultFutureListeners就可以知道,實際上通過DefaultFutureListeners管理的一維陣列來儲存listeners

在addListener方法完成對listener的新增後,還會呼叫isDone方法檢查當前非同步操作是否完成,若是完成需要呼叫notifyListeners,直接喚醒所有listeners完後對非同步操作的回撥

有add就有remove,removeListener方法:

 1 @Override
 2 public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
 3     checkNotNull(listener, "listener");
 4 
 5     synchronized (this) {
 6         removeListener0(listener);
 7     }
 8 
 9     return this;
10 }
11 
12 @Override
13 public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
14     checkNotNull(listeners, "listeners");
15 
16     synchronized (this) {
17         for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
18             if (listener == null) {
19                 break;
20             }
21             removeListener0(listener);
22         }
23     }
24 
25     return this;
26 }

還是由removeListener0來實現:

1 private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
2     if (listeners instanceof DefaultFutureListeners) {
3         ((DefaultFutureListeners) listeners).remove(listener);
4     } else if (listeners == listener) {
5         listeners = null;
6     }
7 }

看過之前的內容在看這裡就比較簡單了,通過DefaultFutureListeners去刪除

notifyListeners方法:

 1 private void notifyListeners() {
 2     EventExecutor executor = executor();
 3     if (executor.inEventLoop()) {
 4         final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
 5         final int stackDepth = threadLocals.futureListenerStackDepth();
 6         if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
 7             threadLocals.setFutureListenerStackDepth(stackDepth + 1);
 8             try {
 9                 notifyListenersNow();
10             } finally {
11                 threadLocals.setFutureListenerStackDepth(stackDepth);
12             }
13             return;
14         }
15     }
16 
17     safeExecute(executor, new Runnable() {
18         @Override
19         public void run() {
20             notifyListenersNow();
21         }
22     });
23 }

其中executor方法:

1 protected EventExecutor executor() {
2     return executor;
3 }

用來獲取executor輪詢執行緒物件

判斷executor是否處於輪詢,否則需要通過safeExecute方法處理listeners的偵聽,
safeExecute方法:

1 private static void safeExecute(EventExecutor executor, Runnable task) {
2     try {
3         executor.execute(task);
4     } catch (Throwable t) {
5         rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
6     }
7 }

這裡保證了listeners的偵聽回撥是非同步執行

InternalThreadLocalMap在我之前的部落格中說過,是Netty使用的ThreadLocal (Netty中FastThreadLocal原始碼分析)

去執行緒本地變數中找futureListenerStackDepth(預設為0),判斷stackDepth是否小於MAX_LISTENER_STACK_DEPTH,否則也要通過safeExecute方法處理listeners的偵聽
核心都是呼叫notifyListenersNow方法:

 1 private void notifyListenersNow() {
 2     Object listeners;
 3     synchronized (this) {
 4         // Only proceed if there are listeners to notify and we are not already notifying listeners.
 5         if (notifyingListeners || this.listeners == null) {
 6             return;
 7         }
 8         notifyingListeners = true;
 9         listeners = this.listeners;
10         this.listeners = null;
11     }
12     for (;;) {
13         if (listeners instanceof DefaultFutureListeners) {
14             notifyListeners0((DefaultFutureListeners) listeners);
15         } else {
16             notifyListener0(this, (GenericFutureListener<?>) listeners);
17         }
18         synchronized (this) {
19             if (this.listeners == null) {
20                 // Nothing can throw from within this method, so setting notifyingListeners back to false does not
21                 // need to be in a finally block.
22                 notifyingListeners = false;
23                 return;
24             }
25             listeners = this.listeners;
26             this.listeners = null;
27         }
28     }
29 }

先檢查是否需要監聽,滿足條件後,判斷listeners是否是DefaultFutureListeners,即包裝後的陣列
notifyListeners0方法:

1 private void notifyListeners0(DefaultFutureListeners listeners) {
2    GenericFutureListener<?>[] a = listeners.listeners();
3    int size = listeners.size();
4    for (int i = 0; i < size; i ++) {
5        notifyListener0(this, a[i]);
6    }
7 }

遍歷這個陣列,實則呼叫notifyListener0方法:

1 private static void notifyListener0(Future future, GenericFutureListener l) {
2     try {
3         l.operationComplete(future);
4     } catch (Throwable t) {
5         if (logger.isWarnEnabled()) {
6             logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
7         }
8     }
9 }

這裡就可以看到,完成了對operationComplete的回撥,處理future

 

setSuccess結束,再來看trySuccess方法:

1 @Override
2 public boolean trySuccess(V result) {
3     if (setSuccess0(result)) {
4         notifyListeners();
5         return true;
6     }
7     return false;
8 }

對比setSuccess來看,只有返回值不一樣

setFailure方法:

 1 @Override
 2 public Promise<V> setFailure(Throwable cause) {
 3     if (setFailure0(cause)) {
 4         notifyListeners();
 5         return this;
 6     }
 7     throw new IllegalStateException("complete already: " + this, cause);
 8 }
 9 
10 private boolean setFailure0(Throwable cause) {
11     return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
12 }
13 
14 private boolean setValue0(Object objResult) {
15     if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
16         RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
17         checkNotifyWaiters();
18         return true;
19     }
20     return false;
21 }

和setSuccess邏輯一樣,只不過CAS操作將狀態變為了CauseHolder物件,成功後喚醒listeners對非同步操作的回撥

tryFailure方法:

1 @Override
2 public boolean tryFailure(Throwable cause) {
3     if (setFailure0(cause)) {
4         notifyListeners();
5         return true;
6     }
7     return false;
8 }

也都是一個邏輯

還有一個setUncancellable方法:

1 @Override
2 public boolean setUncancellable() {
3     if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
4         return true;
5     }
6     Object result = this.result;
7     return !isDone0(result) || !isCancelled0(result);
8 }

若是result狀態為null,非同步操作尚未結束,直接通過CAS操作將狀態變為UNCANCELLABLE
否則若是根據狀態來判斷


下來看到cancel方法:

 1 /**
 2  * {@inheritDoc}
 3  *
 4  * @param mayInterruptIfRunning this value has no effect in this implementation.
 5  */
 6 @Override
 7 public boolean cancel(boolean mayInterruptIfRunning) {
 8     if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
 9         checkNotifyWaiters();
10         notifyListeners();
11         return true;
12     }
13     return false;
14 }

mayInterruptIfRunning正如註釋中所說,在這裡沒有什麼作用
還是通過CAS操作,將狀態變為CANCELLATION_CAUSE_HOLDER,呼叫checkNotifyWaiters喚醒因sync阻塞的執行緒,notifyListeners方法回撥listeners的偵聽


最後看到sync方法:

1 @Override
2 public Promise<V> sync() throws InterruptedException {
3     await();
4     rethrowIfFailed();
5     return this;
6 }

先呼叫await方法:

 1 @Override
 2 public Promise<V> await() throws InterruptedException {
 3     if (isDone()) {
 4         return this;
 5     }
 6 
 7     if (Thread.interrupted()) {
 8         throw new InterruptedException(toString());
 9     }
10 
11     checkDeadLock();
12 
13     synchronized (this) {
14         while (!isDone()) {
15             incWaiters();
16             try {
17                 wait();
18             } finally {
19                 decWaiters();
20             }
21         }
22     }
23     return this;
24 }

先判斷能否執行(非同步操作尚未結束,當前執行緒沒有被中斷),然後呼叫checkDeadLock方法:

1 protected void checkDeadLock() {
2     EventExecutor e = executor();
3     if (e != null && e.inEventLoop()) {
4         throw new BlockingOperationException(toString());
5     }
6 }

檢查輪詢執行緒是否在工作

在synchronized塊中以自身為鎖,自旋等待非同步操作的完成,若是沒完成,呼叫incWaiters方法:

1 private void incWaiters() {
2     if (waiters == Short.MAX_VALUE) {
3         throw new IllegalStateException("too many waiters: " + this);
4     }
5     ++waiters;
6 }

在小於Short.MAX_VALUE的情況下,對waiters自增,
然後使用wait將自身阻塞,等待被喚醒
所以在之前setValue0時,checkNotifyWaiters操作會notifyAll,
由此可以知道sync方法的作用:在某一執行緒中呼叫sync方法會使得當前執行緒被阻塞,只有當非同步操作執完畢,通過上面的set方法改變狀態後,才會呼叫checkNotifyWaiters方法喚醒當前執行緒。

當從阻塞中被喚醒後呼叫decWaiters方法:

1 private void decWaiters() {
2     --waiters;
3 }

使得waiters自減
通過這樣一種自旋方式,一直等到isDone成立,結束自旋,進而結束await方法,然後呼叫rethrowIfFailed方法:

1 private void rethrowIfFailed() {
2     Throwable cause = cause();
3     if (cause == null) {
4         return;
5     }
6 
7     PlatformDependent.throwException(cause);
8 }

根據非同步操作是否有異常,進而使用PlatformDependent丟擲異常。


至此Netty中的ChannelFuture和ChannelPromise分析到此全部結