Netty中的ChannelFuture和ChannelPromise
在Netty使用ChannelFuture和ChannelPromise進行非同步操作的處理
這是官方給出的ChannelFutur描述
1 * | Completed successfully | 2 * +---------------------------+ 3 * +----> isDone() = true | 4 * +--------------------------+ | | isSuccess() = true | 5 * | Uncompleted | | +===========================+ 6 * +--------------------------+ | | Completed with failure | 7 * | isDone() = false | | +---------------------------+ 8 * | isSuccess() = false |----+----> isDone() = true | 9 * | isCancelled() = false | | | cause() = non-null | 10 * | cause() = null | | +===========================+ 11 * +--------------------------+ | | Completed by cancellation | 12 * | +---------------------------+ 13 * +----> isDone() = true | 14 * | isCancelled() = true | 15 * +---------------------------+
由圖可以知道ChannelFutur有四種狀態:Uncompleted、Completed successfully、Completed with failure、Completed by cancellation,這幾種狀態是由isDone、isSuccess、isCancelled、cause這四種方法的返回值決定的。
ChannelFutur介面的定義如下:
1 public interface ChannelFuture extends Future<Void> { 2 Channel channel(); 3 4 ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1); 5 6 ChannelFuture addListeners(GenericFutureListener... var1); 7 8 ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1); 9 10 ChannelFuture removeListeners(GenericFutureListener... var1); 11 12 ChannelFuture sync() throws InterruptedException; 13 14 ChannelFuture syncUninterruptibly(); 15 16 ChannelFuture await() throws InterruptedException; 17 18 ChannelFuture awaitUninterruptibly(); 19 20 boolean isVoid(); 21 }
繼承自Netty的Future:
1 public interface Future<V> extends java.util.concurrent.Future<V> { 2 boolean isSuccess(); 3 4 boolean isCancellable(); 5 6 Throwable cause(); 7 8 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> var1); 9 10 Future<V> addListeners(GenericFutureListener... var1); 11 12 Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1); 13 14 Future<V> removeListeners(GenericFutureListener... var1); 15 16 Future<V> sync() throws InterruptedException; 17 18 Future<V> syncUninterruptibly(); 19 20 Future<V> await() throws InterruptedException; 21 22 Future<V> awaitUninterruptibly(); 23 24 boolean await(long var1, TimeUnit var3) throws InterruptedException; 25 26 boolean await(long var1) throws InterruptedException; 27 28 boolean awaitUninterruptibly(long var1, TimeUnit var3); 29 30 boolean awaitUninterruptibly(long var1); 31 32 V getNow(); 33 34 boolean cancel(boolean var1); 35 }
Netty的Future又繼承自JDK的Future:
1 public interface Future<V> { 2 3 boolean cancel(boolean mayInterruptIfRunning); 4 5 boolean isCancelled(); 6 7 boolean isDone(); 8 9 V get() throws InterruptedException, ExecutionException; 10 11 V get(long timeout, TimeUnit unit) 12 throws InterruptedException, ExecutionException, TimeoutException; 13 }
ChannelPromise繼承了ChannelFuture:
1 public interface ChannelPromise extends ChannelFuture, Promise<Void> { 2 Channel channel(); 3 4 ChannelPromise setSuccess(Void var1); 5 6 ChannelPromise setSuccess(); 7 8 boolean trySuccess(); 9 10 ChannelPromise setFailure(Throwable var1); 11 12 ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> var1); 13 14 ChannelPromise addListeners(GenericFutureListener... var1); 15 16 ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> var1); 17 18 ChannelPromise removeListeners(GenericFutureListener... var1); 19 20 ChannelPromise sync() throws InterruptedException; 21 22 ChannelPromise syncUninterruptibly(); 23 24 ChannelPromise await() throws InterruptedException; 25 26 ChannelPromise awaitUninterruptibly(); 27 28 ChannelPromise unvoid(); 29 }
其中Promise介面定義如下:
1 public interface Promise<V> extends Future<V> { 2 Promise<V> setSuccess(V var1); 3 4 boolean trySuccess(V var1); 5 6 Promise<V> setFailure(Throwable var1); 7 8 boolean tryFailure(Throwable var1); 9 10 boolean setUncancellable(); 11 12 Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1); 13 14 Promise<V> addListeners(GenericFutureListener... var1); 15 16 Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1); 17 18 Promise<V> removeListeners(GenericFutureListener... var1); 19 20 Promise<V> await() throws InterruptedException; 21 22 Promise<V> awaitUninterruptibly(); 23 24 Promise<V> sync() throws InterruptedException; 25 26 Promise<V> syncUninterruptibly(); 27 }
在Netty中,無論是服務端還是客戶端,在Channel註冊時都會為其繫結一個ChannelPromise,預設實現是DefaultChannelPromise
DefaultChannelPromise定義如下:
1 public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint { 2 3 private final Channel channel; 4 private long checkpoint; 5 6 public DefaultChannelPromise(Channel channel) { 7 this.channel = checkNotNull(channel, "channel"); 8 } 9 10 public DefaultChannelPromise(Channel channel, EventExecutor executor) { 11 super(executor); 12 this.channel = checkNotNull(channel, "channel"); 13 } 14 15 @Override 16 protected EventExecutor executor() { 17 EventExecutor e = super.executor(); 18 if (e == null) { 19 return channel().eventLoop(); 20 } else { 21 return e; 22 } 23 } 24 25 @Override 26 public Channel channel() { 27 return channel; 28 } 29 30 @Override 31 public ChannelPromise setSuccess() { 32 return setSuccess(null); 33 } 34 35 @Override 36 public ChannelPromise setSuccess(Void result) { 37 super.setSuccess(result); 38 return this; 39 } 40 41 @Override 42 public boolean trySuccess() { 43 return trySuccess(null); 44 } 45 46 @Override 47 public ChannelPromise setFailure(Throwable cause) { 48 super.setFailure(cause); 49 return this; 50 } 51 52 @Override 53 public ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener) { 54 super.addListener(listener); 55 return this; 56 } 57 58 @Override 59 public ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { 60 super.addListeners(listeners); 61 return this; 62 } 63 64 @Override 65 public ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener) { 66 super.removeListener(listener); 67 return this; 68 } 69 70 @Override 71 public ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) { 72 super.removeListeners(listeners); 73 return this; 74 } 75 76 @Override 77 public ChannelPromise sync() throws InterruptedException { 78 super.sync(); 79 return this; 80 } 81 82 @Override 83 public ChannelPromise syncUninterruptibly() { 84 super.syncUninterruptibly(); 85 return this; 86 } 87 88 @Override 89 public ChannelPromise await() throws InterruptedException { 90 super.await(); 91 return this; 92 } 93 94 @Override 95 public ChannelPromise awaitUninterruptibly() { 96 super.awaitUninterruptibly(); 97 return this; 98 } 99 100 @Override 101 public long flushCheckpoint() { 102 return checkpoint; 103 } 104 105 @Override 106 public void flushCheckpoint(long checkpoint) { 107 this.checkpoint = checkpoint; 108 } 109 110 @Override 111 public ChannelPromise promise() { 112 return this; 113 } 114 115 @Override 116 protected void checkDeadLock() { 117 if (channel().isRegistered()) { 118 super.checkDeadLock(); 119 } 120 } 121 122 @Override 123 public ChannelPromise unvoid() { 124 return this; 125 } 126 127 @Override 128 public boolean isVoid() { 129 return false; 130 } 131 }
可以看到這個DefaultChannelPromise僅僅是將Channel封裝了,而且其基本上所有方法的實現都依賴於父類DefaultPromise
DefaultPromise中的實現是整個ChannelFuture和ChannelPromise的核心所在:
DefaultPromise中有如下幾個狀態量:
1 private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8, 2 SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8)); 3 private static final Object SUCCESS = new Object(); 4 private static final Object UNCANCELLABLE = new Object(); 5 private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace( 6 new CancellationException(), DefaultPromise.class, "cancel(...)")); 7 private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = 8 AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
MAX_LISTENER_STACK_DEPTH: 表示最多可執行listeners的數量,預設是8
SUCCESS :表示非同步操作正常完成
UNCANCELLABLE:表示非同步操作不可取消,並且尚未完成
CANCELLATION_CAUSE_HOLDER:表示非同步操作取消監聽,用於cancel操作,
而CauseHolder 的例項物件是用來表示非同步操作異常結束,同時儲存異常資訊:
1 private static final class CauseHolder { 2 final Throwable cause; 3 CauseHolder(Throwable cause) { 4 this.cause = cause; 5 } 6 }
RESULT_UPDATER:是一個原子更新器,通過CAS操作,原子化更新 DefaultPromise物件的名為result的成員,這個result成員是其非同步操作判斷的關鍵所在
DefaultPromise的成員及構造方法定義:
1 public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> { 2 private volatile Object result; 3 private final EventExecutor executor; 4 private Object listeners; 5 private short waiters; 6 private boolean notifyingListeners; 7 8 public DefaultPromise(EventExecutor executor) { 9 this.executor = checkNotNull(executor, "executor"); 10 } 11 }
result:就是前面說的,判斷非同步操作狀態的關鍵
result的取值有:SUCCESS 、UNCANCELLABLE、CauseHolder以及null (其實還可以是泛型V型別的任意物件,這裡暫不考慮)
executor:就是Channel繫結的NioEventLoop,在我之前的部落格說過,Channel的非同步操作都是在NioEventLoop的執行緒中完成的([Netty中NioEventLoopGroup的建立原始碼分析](https://blog.csdn.net/Z_ChenChen/article/details/90567863))
listeners:通過一個Object儲存所有對非同步操作的監聽,用於非同步操作的回撥
waiters:記錄阻塞中的listeners的數量
notifyingListeners:是否需要喚醒的標誌
首先來看isDone方法,通過之前的圖可以知道,
isDone為false對應了Uncompleted狀態,即非同步操作尚未完成;
isDone為true則代表了非同步操作完成,但是還是有三種完成情況,需要結合別的判斷方法才能具體知道是哪種情況;
isDone方法:
1 @Override 2 public boolean isDone() { 3 return isDone0(result); 4 }
呼叫isDone0:
1 private static boolean isDone0(Object result) { 2 return result != null && result != UNCANCELLABLE; 3 }
有如下幾種情況:
result等於null,result沒有賦值,表示非同步操作尚未完成(從這裡就能想到非同步操作完成,需要呼叫某個set方法來改變result的狀態)
result是UNCANCELLABLE狀態,表示執行中的非同步操作不可取消,當然也就是非同步操作尚未完成
result不等於null,且不等於UNCANCELLABLE,就表示非同步操作完成(包括正常完成,以及異常結束,需要由cause方法進一步判斷)
isSuccess方法:
1 @Override 2 public boolean isSuccess() { 3 Object result = this.result; 4 return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder); 5 }
由這裡可以知道當且僅當result 為SUCCESS狀態時,才返回true(其餘除UNCANCELLABLE和null的值其實也可以,這裡暫不考慮)
isCancelled方法:
1 @Override 2 public boolean isCancelled() { 3 return isCancelled0(result); 4 }
呼叫isCancelled0方法:
1 private static boolean isCancelled0(Object result) { 2 return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException; 3 }
只有當result是CancellationException的例項時,表示取消非同步操作
接著來看cause方法:
1 @Override 2 public Throwable cause() { 3 Object result = this.result; 4 return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null; 5 }
和上面同理,通過判別resul是否是CauseHolder的實現類,若是,將CauseHolder儲存的異常返回。
幾種狀態的判別說完了,下面看一下如何設定這幾種狀態的:
setSuccess方法:
1 @Override 2 public Promise<V> setSuccess(V result) { 3 if (setSuccess0(result)) { 4 notifyListeners(); 5 return this; 6 } 7 throw new IllegalStateException("complete already: " + this); 8 }
首先呼叫setSuccess0方法,其中result的泛型通過DefaultChannelPromise可知是Void,在DefaultChannelPromise中所有的set和try操作引數都是null,這裡的result也就不去考慮:
1 private boolean setSuccess0(V result) { 2 return setValue0(result == null ? SUCCESS : result); 3 }
繼續呼叫setValue0方法:
1 private boolean setValue0(Object objResult) { 2 if (RESULT_UPDATER.compareAndSet(this, null, objResult) || 3 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { 4 checkNotifyWaiters(); 5 return true; 6 } 7 return false; 8 }
通過CAS操作,將result狀態變為SUCCESS
其中checkNotifyWaiters方法:
1 private synchronized void checkNotifyWaiters() { 2 if (waiters > 0) { 3 notifyAll(); 4 } 5 }
檢查waiters的個數,喚醒所有阻塞中的this,sync方法會引起阻塞
回到setSuccess方法中,setSuccess0通過CAS操作,將result狀態更新為SUCCESS成功後,呼叫
notifyListeners方法,喚醒所有listener完成對非同步操作的回撥
listeners是通過addListener方法新增的,用來對非同步操作進行偵聽:
看到addListener方法:
1 @Override 2 public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { 3 checkNotNull(listener, "listener"); 4 5 synchronized (this) { 6 addListener0(listener); 7 } 8 9 if (isDone()) { 10 notifyListeners(); 11 } 12 13 return this; 14 } 15 16 @Override 17 public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { 18 checkNotNull(listeners, "listeners"); 19 20 synchronized (this) { 21 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) { 22 if (listener == null) { 23 break; 24 } 25 addListener0(listener); 26 } 27 } 28 29 if (isDone()) { 30 notifyListeners(); 31 } 32 33 return this; 34 }
其中GenericFutureListener介面定義如下:
1 public interface GenericFutureListener<F extends Future<?>> extends EventListener { 2 /** 3 * Invoked when the operation associated with the {@link Future} has been completed. 4 * 5 * @param future the source {@link Future} which called this callback 6 */ 7 void operationComplete(F future) throws Exception; 8 }
可以看到listener其實就是通過operationComplete方法,來完成回撥,對Future物件進行處理,由註釋可知operationComplete方法是在future操作完成時呼叫
addListeners方法的實現比較簡單,實現核心是在addListener0中:
1 private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { 2 if (listeners == null) { 3 listeners = listener; 4 } else if (listeners instanceof DefaultFutureListeners) { 5 ((DefaultFutureListeners) listeners).add(listener); 6 } else { 7 listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); 8 } 9 }
其中DefaultFutureListeners是將GenericFutureListener物件封裝的一個數組:
1 final class DefaultFutureListeners { 2 3 private GenericFutureListener<? extends Future<?>>[] listeners; 4 private int size; 5 private int progressiveSize; 6 7 @SuppressWarnings("unchecked") 8 DefaultFutureListeners( 9 GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) { 10 listeners = new GenericFutureListener[2]; 11 listeners[0] = first; 12 listeners[1] = second; 13 size = 2; 14 if (first instanceof GenericProgressiveFutureListener) { 15 progressiveSize ++; 16 } 17 if (second instanceof GenericProgressiveFutureListener) { 18 progressiveSize ++; 19 } 20 } 21 22 public void add(GenericFutureListener<? extends Future<?>> l) { 23 GenericFutureListener<? extends Future<?>>[] listeners = this.listeners; 24 final int size = this.size; 25 if (size == listeners.length) { 26 this.listeners = listeners = Arrays.copyOf(listeners, size << 1); 27 } 28 listeners[size] = l; 29 this.size = size + 1; 30 31 if (l instanceof GenericProgressiveFutureListener) { 32 progressiveSize ++; 33 } 34 } 35 36 public void remove(GenericFutureListener<? extends Future<?>> l) { 37 final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners; 38 int size = this.size; 39 for (int i = 0; i < size; i ++) { 40 if (listeners[i] == l) { 41 int listenersToMove = size - i - 1; 42 if (listenersToMove > 0) { 43 System.arraycopy(listeners, i + 1, listeners, i, listenersToMove); 44 } 45 listeners[-- size] = null; 46 this.size = size; 47 48 if (l instanceof GenericProgressiveFutureListener) { 49 progressiveSize --; 50 } 51 return; 52 } 53 } 54 } 55 56 public GenericFutureListener<? extends Future<?>>[] listeners() { 57 return listeners; 58 } 59 60 public int size() { 61 return size; 62 } 63 64 public int progressiveSize() { 65 return progressiveSize; 66 } 67 }
size:記錄listeners的個數
progressiveSize:記錄GenericProgressiveFutureListener型別的listeners的個數
DefaultFutureListeners 中對陣列的操作比較簡單,
add方法,當size達到陣列長度時,進行二倍擴容,
其中GenericProgressiveFutureListener繼承自GenericFutureListener:
1 public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> { 2 /** 3 * Invoked when the operation has progressed. 4 * 5 * @param progress the progress of the operation so far (cumulative) 6 * @param total the number that signifies the end of the operation when {@code progress} reaches at it. 7 * {@code -1} if the end of operation is unknown. 8 */ 9 void operationProgressed(F future, long progress, long total) throws Exception; 10 }
由註釋可知operationProgressed是在future操作進行時呼叫,這裡不對GenericProgressiveFutureListener過多討論
回到addListener0方法,由DefaultFutureListeners就可以知道,實際上通過DefaultFutureListeners管理的一維陣列來儲存listeners
在addListener方法完成對listener的新增後,還會呼叫isDone方法檢查當前非同步操作是否完成,若是完成需要呼叫notifyListeners,直接喚醒所有listeners完後對非同步操作的回撥
有add就有remove,removeListener方法:
1 @Override 2 public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) { 3 checkNotNull(listener, "listener"); 4 5 synchronized (this) { 6 removeListener0(listener); 7 } 8 9 return this; 10 } 11 12 @Override 13 public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) { 14 checkNotNull(listeners, "listeners"); 15 16 synchronized (this) { 17 for (GenericFutureListener<? extends Future<? super V>> listener : listeners) { 18 if (listener == null) { 19 break; 20 } 21 removeListener0(listener); 22 } 23 } 24 25 return this; 26 }
還是由removeListener0來實現:
1 private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) { 2 if (listeners instanceof DefaultFutureListeners) { 3 ((DefaultFutureListeners) listeners).remove(listener); 4 } else if (listeners == listener) { 5 listeners = null; 6 } 7 }
看過之前的內容在看這裡就比較簡單了,通過DefaultFutureListeners去刪除
notifyListeners方法:
1 private void notifyListeners() { 2 EventExecutor executor = executor(); 3 if (executor.inEventLoop()) { 4 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); 5 final int stackDepth = threadLocals.futureListenerStackDepth(); 6 if (stackDepth < MAX_LISTENER_STACK_DEPTH) { 7 threadLocals.setFutureListenerStackDepth(stackDepth + 1); 8 try { 9 notifyListenersNow(); 10 } finally { 11 threadLocals.setFutureListenerStackDepth(stackDepth); 12 } 13 return; 14 } 15 } 16 17 safeExecute(executor, new Runnable() { 18 @Override 19 public void run() { 20 notifyListenersNow(); 21 } 22 }); 23 }
其中executor方法:
1 protected EventExecutor executor() { 2 return executor; 3 }
用來獲取executor輪詢執行緒物件
判斷executor是否處於輪詢,否則需要通過safeExecute方法處理listeners的偵聽,
safeExecute方法:
1 private static void safeExecute(EventExecutor executor, Runnable task) { 2 try { 3 executor.execute(task); 4 } catch (Throwable t) { 5 rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t); 6 } 7 }
這裡保證了listeners的偵聽回撥是非同步執行
InternalThreadLocalMap在我之前的部落格中說過,是Netty使用的ThreadLocal (Netty中FastThreadLocal原始碼分析)
去執行緒本地變數中找futureListenerStackDepth(預設為0),判斷stackDepth是否小於MAX_LISTENER_STACK_DEPTH,否則也要通過safeExecute方法處理listeners的偵聽
核心都是呼叫notifyListenersNow方法:
1 private void notifyListenersNow() { 2 Object listeners; 3 synchronized (this) { 4 // Only proceed if there are listeners to notify and we are not already notifying listeners. 5 if (notifyingListeners || this.listeners == null) { 6 return; 7 } 8 notifyingListeners = true; 9 listeners = this.listeners; 10 this.listeners = null; 11 } 12 for (;;) { 13 if (listeners instanceof DefaultFutureListeners) { 14 notifyListeners0((DefaultFutureListeners) listeners); 15 } else { 16 notifyListener0(this, (GenericFutureListener<?>) listeners); 17 } 18 synchronized (this) { 19 if (this.listeners == null) { 20 // Nothing can throw from within this method, so setting notifyingListeners back to false does not 21 // need to be in a finally block. 22 notifyingListeners = false; 23 return; 24 } 25 listeners = this.listeners; 26 this.listeners = null; 27 } 28 } 29 }
先檢查是否需要監聽,滿足條件後,判斷listeners是否是DefaultFutureListeners,即包裝後的陣列
notifyListeners0方法:
1 private void notifyListeners0(DefaultFutureListeners listeners) { 2 GenericFutureListener<?>[] a = listeners.listeners(); 3 int size = listeners.size(); 4 for (int i = 0; i < size; i ++) { 5 notifyListener0(this, a[i]); 6 } 7 }
遍歷這個陣列,實則呼叫notifyListener0方法:
1 private static void notifyListener0(Future future, GenericFutureListener l) { 2 try { 3 l.operationComplete(future); 4 } catch (Throwable t) { 5 if (logger.isWarnEnabled()) { 6 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); 7 } 8 } 9 }
這裡就可以看到,完成了對operationComplete的回撥,處理future
setSuccess結束,再來看trySuccess方法:
1 @Override 2 public boolean trySuccess(V result) { 3 if (setSuccess0(result)) { 4 notifyListeners(); 5 return true; 6 } 7 return false; 8 }
對比setSuccess來看,只有返回值不一樣
setFailure方法:
1 @Override 2 public Promise<V> setFailure(Throwable cause) { 3 if (setFailure0(cause)) { 4 notifyListeners(); 5 return this; 6 } 7 throw new IllegalStateException("complete already: " + this, cause); 8 } 9 10 private boolean setFailure0(Throwable cause) { 11 return setValue0(new CauseHolder(checkNotNull(cause, "cause"))); 12 } 13 14 private boolean setValue0(Object objResult) { 15 if (RESULT_UPDATER.compareAndSet(this, null, objResult) || 16 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { 17 checkNotifyWaiters(); 18 return true; 19 } 20 return false; 21 }
和setSuccess邏輯一樣,只不過CAS操作將狀態變為了CauseHolder物件,成功後喚醒listeners對非同步操作的回撥
tryFailure方法:
1 @Override 2 public boolean tryFailure(Throwable cause) { 3 if (setFailure0(cause)) { 4 notifyListeners(); 5 return true; 6 } 7 return false; 8 }
也都是一個邏輯
還有一個setUncancellable方法:
1 @Override 2 public boolean setUncancellable() { 3 if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) { 4 return true; 5 } 6 Object result = this.result; 7 return !isDone0(result) || !isCancelled0(result); 8 }
若是result狀態為null,非同步操作尚未結束,直接通過CAS操作將狀態變為UNCANCELLABLE
否則若是根據狀態來判斷
下來看到cancel方法:
1 /** 2 * {@inheritDoc} 3 * 4 * @param mayInterruptIfRunning this value has no effect in this implementation. 5 */ 6 @Override 7 public boolean cancel(boolean mayInterruptIfRunning) { 8 if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { 9 checkNotifyWaiters(); 10 notifyListeners(); 11 return true; 12 } 13 return false; 14 }
mayInterruptIfRunning正如註釋中所說,在這裡沒有什麼作用
還是通過CAS操作,將狀態變為CANCELLATION_CAUSE_HOLDER,呼叫checkNotifyWaiters喚醒因sync阻塞的執行緒,notifyListeners方法回撥listeners的偵聽
最後看到sync方法:
1 @Override 2 public Promise<V> sync() throws InterruptedException { 3 await(); 4 rethrowIfFailed(); 5 return this; 6 }
先呼叫await方法:
1 @Override 2 public Promise<V> await() throws InterruptedException { 3 if (isDone()) { 4 return this; 5 } 6 7 if (Thread.interrupted()) { 8 throw new InterruptedException(toString()); 9 } 10 11 checkDeadLock(); 12 13 synchronized (this) { 14 while (!isDone()) { 15 incWaiters(); 16 try { 17 wait(); 18 } finally { 19 decWaiters(); 20 } 21 } 22 } 23 return this; 24 }
先判斷能否執行(非同步操作尚未結束,當前執行緒沒有被中斷),然後呼叫checkDeadLock方法:
1 protected void checkDeadLock() { 2 EventExecutor e = executor(); 3 if (e != null && e.inEventLoop()) { 4 throw new BlockingOperationException(toString()); 5 } 6 }
檢查輪詢執行緒是否在工作
在synchronized塊中以自身為鎖,自旋等待非同步操作的完成,若是沒完成,呼叫incWaiters方法:
1 private void incWaiters() { 2 if (waiters == Short.MAX_VALUE) { 3 throw new IllegalStateException("too many waiters: " + this); 4 } 5 ++waiters; 6 }
在小於Short.MAX_VALUE的情況下,對waiters自增,
然後使用wait將自身阻塞,等待被喚醒
所以在之前setValue0時,checkNotifyWaiters操作會notifyAll,
由此可以知道sync方法的作用:在某一執行緒中呼叫sync方法會使得當前執行緒被阻塞,只有當非同步操作執完畢,通過上面的set方法改變狀態後,才會呼叫checkNotifyWaiters方法喚醒當前執行緒。
當從阻塞中被喚醒後呼叫decWaiters方法:
1 private void decWaiters() { 2 --waiters; 3 }
使得waiters自減
通過這樣一種自旋方式,一直等到isDone成立,結束自旋,進而結束await方法,然後呼叫rethrowIfFailed方法:
1 private void rethrowIfFailed() { 2 Throwable cause = cause(); 3 if (cause == null) { 4 return; 5 } 6 7 PlatformDependent.throwException(cause); 8 }
根據非同步操作是否有異常,進而使用PlatformDependent丟擲異常。
至此Netty中的ChannelFuture和ChannelPromise分析到此全部結