從原始碼上理解Netty併發工具-Promise
前提
最近一直在看Netty
相關的內容,也在編寫一個輕量級的RPC
框架來練手,途中發現了Netty
的原始碼有很多亮點,某些實現甚至可以用苛刻來形容。另外,Netty
提供的工具類也是相當優秀,可以開箱即用。這裡分析一下個人比較喜歡的領域,併發方面的一個Netty
工具模組 - Promise
。
環境版本:
Netty:4.1.44.Final
JDK1.8
Promise簡介
Promise,中文翻譯為承諾或者許諾,含義是人與人之間,一個人對另一個人所說的具有一定憧憬的話,一般是可以實現的。
io.netty.util.concurrent.Promise
在註釋中只有一句話:特殊的可寫的io.netty.util.concurrent.Future
Promise
介面是io.netty.util.concurrent.Future
的子介面)。而io.netty.util.concurrent.Future
是java.util.concurrent.Future
的擴充套件,表示一個非同步操作的結果。我們知道,JDK
併發包中的Future
是不可寫,也沒有提供可監聽的入口(沒有應用觀察者模式),而Promise
很好地彌補了這兩個問題。另一方面從繼承關係來看,DefaultPromise
是這些介面的最終實現類,所以分析原始碼的時候需要把重心放在DefaultPromise
類。一般一個模組提供的功能都由介面定義,這裡分析一下兩個介面的功能列表:
io.netty.util.concurrent.Promise
io.netty.util.concurrent.Future
先看io.netty.util.concurrent.Future
介面:
public interface Future<V> extends java.util.concurrent.Future<V> { // I/O操作是否執行成功 boolean isSuccess(); // 標記是否可以通過下面的cancel(boolean mayInterruptIfRunning)取消I/O操作 boolean isCancellable(); // 返回I/O操作的異常例項 - 如果I/O操作本身是成功的,此方法返回null Throwable cause(); // 為當前Future例項新增監聽Future操作完成的監聽器 - isDone()方法啟用之後所有監聽器例項會得到回撥 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 為當前Future移除監聽Future操作完成的監聽器 Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 同步等待Future完成得到最終結果(成功)或者丟擲異常(失敗),響應中斷 Future<V> sync() throws InterruptedException; // 同步等待Future完成得到最終結果(成功)或者丟擲異常(失敗),不響應中斷 Future<V> syncUninterruptibly(); // 等待Future完成,響應中斷 Future<V> await() throws InterruptedException; // 等待Future完成,不響應中斷 Future<V> awaitUninterruptibly(); // 帶超時時限的等待Future完成,響應中斷 boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException; // 帶超時時限的等待Future完成,不響應中斷 boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); // 非阻塞馬上返回Future的結果,如果Future未完成,此方法一定返回null;有些場景下如果Future成功獲取到的結果是null則需要二次檢查isDone()方法是否為true V getNow(); // 取消當前Future例項的執行,如果取消成功會丟擲CancellationException異常 @Override boolean cancel(boolean mayInterruptIfRunning); }
sync()
和await()
方法類似,只是sync()
會檢查異常執行的情況,一旦發現執行異常馬上把異常例項包裝丟擲,而await()
方法對異常無感知。
接著看io.netty.util.concurrent.Promise
介面:
public interface Promise<V> extends Future<V> {
// 標記當前Future成功,設定結果,如果設定成功,則通知所有的監聽器,如果Future已經成功或者失敗,則丟擲IllegalStateException
Promise<V> setSuccess(V result);
// 標記當前Future成功,設定結果,如果設定成功,則通知所有的監聽器並且返回true,否則返回false
boolean trySuccess(V result);
// 標記當前Future失敗,設定結果為異常例項,如果設定成功,則通知所有的監聽器,如果Future已經成功或者失敗,則丟擲IllegalStateException
Promise<V> setFailure(Throwable cause);
// 標記當前Future失敗,設定結果為異常例項,如果設定成功,則通知所有的監聽器並且返回true,否則返回false
boolean tryFailure(Throwable cause);
// 標記當前的Promise例項為不可取消,設定成功返回true,否則返回false
boolean setUncancellable();
// 下面的方法和io.netty.util.concurrent.Future中的方法基本一致,只是修改了返回型別為Promise
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
到此,Promise
介面的所有功能都分析完畢,接下來從原始碼角度詳細分析Promise
的實現。
Promise原始碼實現
Promise
的實現類為io.netty.util.concurrent.DefaultPromise
(其實DefaultPromise
還有很多子類,某些實現是為了定製特定的場景做了擴充套件),而DefaultPromise
繼承自io.netty.util.concurrent.AbstractFuture
:
public abstract class AbstractFuture<V> implements Future<V> {
// 永久阻塞等待獲取結果的方法
@Override
public V get() throws InterruptedException, ExecutionException {
// 呼叫響應中斷的永久等待方法進行阻塞
await();
// 從永久阻塞中喚醒後,先判斷Future是否執行異常
Throwable cause = cause();
if (cause == null) {
// 異常為空說明執行成功,呼叫getNow()方法返回結果
return getNow();
}
// 異常為空不為空,這裡區分特定的取消異常則轉換為CancellationException丟擲
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 非取消異常的其他所有異常都被包裝為執行異常ExecutionException丟擲
throw new ExecutionException(cause);
}
// 帶超時阻塞等待獲取結果的方法
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// 呼叫響應中斷的帶超時時限等待方法進行阻塞
if (await(timeout, unit)) {
// 從帶超時時限阻塞中喚醒後,先判斷Future是否執行異常
Throwable cause = cause();
if (cause == null) {
// 異常為空說明執行成功,呼叫getNow()方法返回結果
return getNow();
}
// 異常為空不為空,這裡區分特定的取消異常則轉換為CancellationException丟擲
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 在非等待超時的前提下,非取消異常的其他所有異常都被包裝為執行異常ExecutionException丟擲
throw new ExecutionException(cause);
}
// 方法步入此處說明等待超時,則丟擲超時異常TimeoutException
throw new TimeoutException();
}
}
AbstractFuture
僅僅對get()
和get(long timeout, TimeUnit unit)
兩個方法進行了實現,其實這兩處的實現和java.util.concurrent.FutureTask
中的實現方式十分相似。
DefaultPromise
的原始碼比較多,這裡分開多個部分去閱讀,先看它的屬性和建構函式:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// 正常日誌的日誌控制代碼,InternalLogger是Netty內部封裝的日誌介面
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
// 任務拒絕執行時候的日誌控制代碼 - Promise需要作為一個任務提交到執行緒中執行,如果任務拒絕則使用此日誌控制代碼列印日誌
private static final InternalLogger rejectedExecutionLogger =
InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
// 監聽器的最大棧深度,預設值為8,這個值是防止巢狀回撥呼叫的時候棧深度過大導致記憶體溢位,後面會舉個例子說明它的用法
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// 結果更新器,用於CAS更新結果result的值
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 用於填充result的值,當設定結果result傳入null,Promise執行成功,用這個值去表示成功的結果
private static final Object SUCCESS = new Object();
// 用於填充result的值,表示Promise不能被取消
private static final Object UNCANCELLABLE = new Object();
// CancellationException例項的持有器,用於判斷Promise取消狀態和丟擲CancellationException
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));
// CANCELLATION_CAUSE_HOLDER的異常棧資訊元素陣列
private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
// 真正的結果物件,使用Object型別,最終有可能為null、真正的結果例項、SUCCESS、UNCANCELLABLE或者CANCELLATION_CAUSE_HOLDER等等
private volatile Object result;
// 事件執行器,這裡暫時不做展開,可以理解為單個排程執行緒
private final EventExecutor executor;
// 監聽器集合,可能是單個GenericFutureListener例項或者DefaultFutureListeners(監聽器集合)例項
private Object listeners;
// 等待獲取結果的執行緒數量
private short waiters;
// 標記是否正在回撥監聽器
private boolean notifyingListeners;
// 建構函式依賴於EventExecutor
public DefaultPromise(EventExecutor executor) {
this.executor = checkNotNull(executor, "executor");
}
protected DefaultPromise() {
// only for subclasses - 這個建構函式預留給子類
executor = null;
}
// ... 省略其他程式碼 ...
// 私有靜態內部類,用於存放Throwable例項,也就是持有異常的原因例項
private static final class CauseHolder {
final Throwable cause;
CauseHolder(Throwable cause) {
this.cause = cause;
}
}
// 私有靜態內部類,用於覆蓋CancellationException的棧資訊為前面定義的CANCELLATION_STACK,同時覆蓋了toString()返回CancellationException的全類名
private static final class LeanCancellationException extends CancellationException {
private static final long serialVersionUID = 2794674970981187807L;
@Override
public Throwable fillInStackTrace() {
setStackTrace(CANCELLATION_STACK);
return this;
}
@Override
public String toString() {
return CancellationException.class.getName();
}
}
// ... 省略其他程式碼 ...
}
Promise
目前支援兩種型別的監聽器:
GenericFutureListener
:支援泛型的Future
監聽器。GenericProgressiveFutureListener
:它是GenericFutureListener
的子類,支援進度表示和支援泛型的Future
監聽器(有些場景需要多個步驟實現,類似於進度條那樣)。
// GenericFutureListener
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
void operationComplete(F future) throws Exception;
}
// GenericProgressiveFutureListener
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
void operationProgressed(F future, long progress, long total) throws Exception;
}
為了讓Promise
支援多個監聽器,Netty
添加了一個預設修飾符修飾的DefaultFutureListeners
類用於儲存監聽器例項陣列:
// DefaultFutureListeners
final class DefaultFutureListeners {
private GenericFutureListener<? extends Future<?>>[] listeners;
private int size;
private int progressiveSize; // the number of progressive listeners
// 這個構造相對特別,是為了讓Promise中的listeners(Object型別)例項由單個GenericFutureListener例項轉換為DefaultFutureListeners型別
@SuppressWarnings("unchecked")
DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void add(GenericFutureListener<? extends Future<?>> l) {
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
final int size = this.size;
// 注意這裡,每次擴容陣列長度是原來的2倍
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
// 把當前的GenericFutureListener加入陣列中
listeners[size] = l;
// 監聽器總數量加1
this.size = size + 1;
// 如果為GenericProgressiveFutureListener,則帶進度指示的監聽器總數量加1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void remove(GenericFutureListener<? extends Future<?>> l) {
final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
int size = this.size;
for (int i = 0; i < size; i ++) {
if (listeners[i] == l) {
// 計算需要需要移動的監聽器的下標
int listenersToMove = size - i - 1;
if (listenersToMove > 0) {
// listenersToMove後面的元素全部移動到陣列的前端
System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
}
// 當前監聽器總量的最後一個位置設定為null,數量減1
listeners[-- size] = null;
this.size = size;
// 如果監聽器是GenericProgressiveFutureListener,則帶進度指示的監聽器總數量減1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize --;
}
return;
}
}
}
// 返回監聽器例項陣列
public GenericFutureListener<? extends Future<?>>[] listeners() {
return listeners;
}
// 返回監聽器總數量
public int size() {
return size;
}
// 返回帶進度指示的監聽器總數量
public int progressiveSize() {
return progressiveSize;
}
}
接下來看DefaultPromise
的剩餘方法實現,筆者覺得DefaultPromise
方法實現在程式碼順序上是有一定的藝術的。先看幾個判斷Promise
執行狀態的方法:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// ... 省略其他程式碼 ...
@Override
public boolean setUncancellable() {
// 通過結果更新器CAS更新result為UNCANCELLABLE,期望舊值為null,更新值為UNCANCELLABLE屬性,如果成功則返回true
if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
return true;
}
Object result = this.result;
// 步入這裡說明result當前值不為null,isDone0()和isCancelled0()都是終態,這裡如果命中終態就返回false
//(筆者注:其實可以這樣認為,這裡result不能為null,如果不為終態,它只能是UNCANCELLABLE屬性例項)
return !isDone0(result) || !isCancelled0(result);
}
@Override
public boolean isSuccess() {
Object result = this.result;
// 如果執行成功,則結果不為null,同時不為UNCANCELLABLE,同時不為CauseHolder型別
//(筆者注:其實可以這樣認為,Promise為成功,則result只能是一個開發者定義的例項或者SUCCESS屬性例項)
return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
}
@Override
public boolean isCancellable() {
// 是否可取消的,result為null說明Promise處於初始化狀態尚未執行,則認為可以取消
return result == null;
}
@Override
public Throwable cause() {
// 通過當前result獲取Throwable例項
return cause0(result);
}
private Throwable cause0(Object result) {
// result非CauseHolder型別,則直接返回null
if (!(result instanceof CauseHolder)) {
return null;
}
// 如果result為CANCELLATION_CAUSE_HOLDER(靜態CancellationException的持有)
if (result == CANCELLATION_CAUSE_HOLDER) {
// 則新建一個自定義LeanCancellationException例項
CancellationException ce = new LeanCancellationException();
// 如果CAS更新結果result為LeanCancellationException新例項則返回
if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
return ce;
}
// 走到這裡說明了result是非CANCELLATION_CAUSE_HOLDER的自定義CauseHolder例項
result = this.result;
}
// 兜底返回CauseHolder持有的cause
return ((CauseHolder) result).cause;
}
// 靜態方法,判斷Promise是否為取消,依據是result必須是CauseHolder型別,同時CauseHolder中的cause必須為CancellationException型別或者其子類
private static boolean isCancelled0(Object result) {
return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
}
// 靜態方法,判斷Promise是否完成,依據是result不為null同時不為UNCANCELLABLE屬性例項
private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}
// 判斷Promise例項是否取消
@Override
public boolean isCancelled() {
return isCancelled0(result);
}
// 判斷Promise例項是否完成
@Override
public boolean isDone() {
return isDone0(result);
}
// ... 省略其他程式碼 ...
}
接著看監聽器的新增和移除方法(這其中也包含了通知監聽器的邏輯):
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// ... 省略其他程式碼 ...
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
// 入參非空校驗
checkNotNull(listener, "listener");
// 加鎖,鎖定的物件是Promise例項自身
synchronized (this) {
// 新增監聽器
addListener0(listener);
}
// 如果Promise例項已經執行完畢,則通知監聽器進行回撥
if (isDone()) {
notifyListeners();
}
return this;
}
@Override
public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
// 入參非空校驗
checkNotNull(listeners, "listeners");
// 加鎖,鎖定的物件是Promise例項自身
synchronized (this) {
// 遍歷入引數組新增監聽器,有空元素直接跳出
for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
if (listener == null) {
break;
}
addListener0(listener);
}
}
// 如果Promise例項已經執行完畢,則通知監聽器進行回撥
if (isDone()) {
notifyListeners();
}
return this;
}
@Override
public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
// 入參非空校驗
checkNotNull(listener, "listener");
// 加鎖,鎖定的物件是Promise例項自身
synchronized (this) {
// 移除監聽器
removeListener0(listener);
}
return this;
}
@Override
public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
// 入參非空校驗
checkNotNull(listeners, "listeners");
// 加鎖,鎖定的物件是Promise例項自身
synchronized (this) {
// 遍歷入引數組移除監聽器,有空元素直接跳出
for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
if (listener == null) {
break;
}
removeListener0(listener);
}
}
return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
// 如果Promise例項持有listeners為null,則直接設定為入參listener
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
// 如果當前Promise例項持有listeners的是DefaultFutureListeners型別,則呼叫它的add()方法進行新增
((DefaultFutureListeners) listeners).add(listener);
} else {
// 步入這裡說明當前Promise例項持有listeners為單個GenericFutureListener例項,需要轉換為DefaultFutureListeners例項
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
// 如果當前Promise例項持有listeners的是DefaultFutureListeners型別,則呼叫它的remove()方法進行移除
if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).remove(listener);
} else if (listeners == listener) {
// 如果當前Promise例項持有listeners不為DefaultFutureListeners型別,也就是單個GenericFutureListener並且和傳入的listener相同,
// 則Promise例項持有listeners置為null
listeners = null;
}
}
private void notifyListeners() {
EventExecutor executor = executor();
// 當前執行執行緒是事件迴圈執行緒,那麼直接同步呼叫,簡單來說就是呼叫notifyListeners()方法的執行緒和EventExecutor是同一個執行緒
if (executor.inEventLoop()) {
// 下面的ThreadLocal和listenerStackDepth是呼叫棧深度保護相關,博文會另起一個章節專門講解這個問題,這裡可以暫時忽略
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 當前執行執行緒不是事件迴圈執行緒,則把notifyListenersNow()包裝為Runnable例項放到EventExecutor中執行
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
// 使用EventExecutor進行任務執行,execute()方法丟擲的異常會使用rejectedExecutionLogger控制代碼列印
private static void safeExecute(EventExecutor executor, Runnable task) {
try {
executor.execute(task);
} catch (Throwable t) {
rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
}
}
// 馬上通知所有監聽器進行回撥
private void notifyListenersNow() {
Object listeners;
// 這裡加鎖,在鎖的保護下設定notifyingListeners的值,如果多個執行緒呼叫同一個Promise例項的notifyListenersNow()方法
// 命中notifyingListeners的執行緒可以直接返回
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
// 臨時變數listeners存放瞬時的監聽器例項,方便下一步設定Promise例項的listeners為null
listeners = this.listeners;
// 重置當前Promise例項的listeners為null
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
// 多個監聽器情況下的通知
notifyListeners0((DefaultFutureListeners) listeners);
} else {
// 單個監聽器情況下的通知
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// 這裡因為沒有異常丟擲的可能,不用在finally塊中編寫,重置notifyingListeners為false並且返回跳出迴圈
notifyingListeners = false;
return;
}
// 臨時變數listeners存放瞬時的監聽器例項,回撥操作判斷是基於臨時例項去做 - 這裡可能由另一個執行緒更新了listeners的值
listeners = this.listeners;
// 重置當前Promise例項的listeners為null,確保監聽器只會被回撥一次,下一次跳出for死迴圈
this.listeners = null;
}
}
}
// 遍歷DefaultFutureListeners中的listeners陣列,呼叫靜態方法notifyListener0()
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}
// 這個靜態方法是最終監聽器回撥的方法,也就是簡單呼叫GenericFutureListener#operationComplete()傳入的是當前的Promise例項,捕獲一切異常列印warn日誌
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
}
然後看wait()
和sync()
方法體系:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// ... 省略其他程式碼 ...
@Override
public Promise<V> await() throws InterruptedException {
// 如果Promise執行完畢,直接返回
if (isDone()) {
return this;
}
// 如果當前執行緒中斷則直接丟擲InterruptedException
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死鎖檢測
checkDeadLock();
// 加鎖,加鎖物件是當前Promise例項
synchronized (this) {
// 這裡設定一個死迴圈,終止條件是isDone()為true
while (!isDone()) {
// 等待執行緒數加1
incWaiters();
try {
// 這裡呼叫的是Object#wait()方法進行阻塞,如果執行緒被中斷會丟擲InterruptedException
wait();
} finally {
// 解除阻塞後等待執行緒數減1
decWaiters();
}
}
}
return this;
}
@Override
public Promise<V> awaitUninterruptibly() {
// 如果Promise執行完畢,直接返回
if (isDone()) {
return this;
}
// 死鎖檢測
checkDeadLock();
boolean interrupted = false;
// 加鎖,加鎖物件是當前Promise例項
synchronized (this) {
// 這裡設定一個死迴圈,終止條件是isDone()為true
while (!isDone()) {
// 等待執行緒數加1
incWaiters();
try {
// 這裡呼叫的是Object#wait()方法進行阻塞,捕獲了InterruptedException異常,如果丟擲InterruptedException記錄執行緒的中斷狀態到interrupted
wait();
} catch (InterruptedException e) {
// Interrupted while waiting.
interrupted = true;
} finally {
// 解除阻塞後等待執行緒數減1
decWaiters();
}
}
}
// 如果執行緒被中斷跳出等待阻塞,則清除執行緒的中斷標誌位
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}
// 後面的幾個帶超時時限的wait()方法都是呼叫await0()
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return await0(unit.toNanos(timeout), true);
}
@Override
public boolean await(long timeoutMillis) throws InterruptedException {
return await0(MILLISECONDS.toNanos(timeoutMillis), true);
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
try {
return await0(unit.toNanos(timeout), false);
} catch (InterruptedException e) {
// Should not be raised at all.
throw new InternalError();
}
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
try {
return await0(MILLISECONDS.toNanos(timeoutMillis), false);
} catch (InterruptedException e) {
// Should not be raised at all.
throw new InternalError();
}
}
// 檢查死鎖,這裡判斷了等待執行緒是事件迴圈執行緒則直接丟擲BlockingOperationException異常
// 簡單來說就是:Promise的執行執行緒和等待結果的執行緒,不能是同一個執行緒,否則依賴會成環
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}
@Override
public Promise<V> sync() throws InterruptedException {
// 同步永久阻塞等待
await();
// 阻塞等待解除,如果執行存在異常,則直接丟擲
rethrowIfFailed();
return this;
}
@Override
public Promise<V> syncUninterruptibly() {
// 同步永久阻塞等待 - 響應中斷
awaitUninterruptibly();
// 塞等待解除,如果執行存在異常,則直接丟擲
rethrowIfFailed();
return this;
}
// waiters加1,如果超過Short.MAX_VALUE則丟擲IllegalStateException
private void incWaiters() {
if (waiters == Short.MAX_VALUE) {
throw new IllegalStateException("too many waiters: " + this);
}
++waiters;
}
// waiters減1
private void decWaiters() {
--waiters;
}
// cause不為null則丟擲
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return;
}
PlatformDependent.throwException(cause);
}
private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
// 如果Promise執行完畢,直接返回
if (isDone()) {
return true;
}
// 如果超時時限小於0那麼返回isDone()的結果
if (timeoutNanos <= 0) {
return isDone();
}
// 如果允許中斷,當前執行緒的中斷標誌位為true,則丟擲InterruptedException
if (interruptable && Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死鎖檢測
checkDeadLock();
// 記錄當前的納秒時間戳
long startTime = System.nanoTime();
// 等待時間的長度 - 單位為納秒
long waitTime = timeoutNanos;
// 記錄執行緒是否被中斷
boolean interrupted = false;
try {
// 死迴圈
for (;;) {
synchronized (this) {
// 如果Promise執行完畢,直接返回true - 這一步是先驗判斷,命中了就不需要阻塞等待
if (isDone()) {
return true;
}
// 等待執行緒數加1
incWaiters();
try {
// 這裡呼叫的是帶超時時限的Object#wait()方法進行阻塞
wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
// 執行緒被中斷並且外部允許中斷,那麼直接丟擲InterruptedException
if (interruptable) {
throw e;
} else {
// 否則只記錄中斷過的狀態
interrupted = true;
}
} finally {
// 解除阻塞後等待執行緒數減1
decWaiters();
}
}
// 解除阻塞後,如果Promise執行完畢,直接返回true
if (isDone()) {
return true;
} else {
// 步入這裡說明Promise尚未執行完畢,則重新計算等待時間間隔的長度數量(修正),如果大於0則進入下一輪迴圈
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return isDone();
}
}
}
} finally {
// 如果執行緒被中斷跳出等待阻塞,則清除執行緒的中斷標誌位
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
// ... 省略其他程式碼 ...
}
最後是幾個設定結果和獲取結果的方法:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// ... 省略其他程式碼 ...
@Override
public Promise<V> setSuccess(V result) {
// 設定成功結果,如果設定成功則返回當前Promise例項
if (setSuccess0(result)) {
return this;
}
// 設定失敗說明了多次設定,Promise已經執行完畢,則丟擲異常
throw new IllegalStateException("complete already: " + this);
}
@Override
public boolean trySuccess(V result) {
// 設定成功結果,返回的布林值表示成功或失敗
return setSuccess0(result);
}
@Override
public Promise<V> setFailure(Throwable cause) {
// 設定失敗結果,如果設定成功則返回當前Promise例項
if (setFailure0(cause)) {
return this;
}
// 設定失敗說明了多次設定,Promise已經執行完畢,則丟擲異常
throw new IllegalStateException("complete already: " + this, cause);
}
@Override
public boolean tryFailure(Throwable cause) {
// 設定失敗結果,返回的布林值表示成功或失敗
return setFailure0(cause);
}
@SuppressWarnings("unchecked")
@Override
public V getNow() {
// 非阻塞獲取結果,如果result是CauseHolder型別、SUCCESS屬性例項或者UNCANCELLABLE實行例項則返回null,否則返回轉換型別後的result值
// 對異常無感知,如果CauseHolder包裹了異常,此方法依然返回null
Object result = this.result;
if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
return (V) result;
}
@SuppressWarnings("unchecked")
@Override
public V get() throws InterruptedException, ExecutionException {
// 永久阻塞獲取結果
Object result = this.result;
// 如果Promise未執行完畢則進行永久阻塞等待
if (!isDone0(result)) {
await();
// 更新結果臨時變數
result = this.result;
}
// result為SUCCESS屬性例項或者UNCANCELLABLE屬性例項的時候直接返回null
if (result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
// 如果result為CauseHolder型別,則獲取其中持有的cause屬性,也有可能為null
Throwable cause = cause0(result);
if (cause == null) {
// 執行成功的前提下轉換型別後的result值返回
return (V) result;
}
// 取消的情況,丟擲CancellationException
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 剩餘的情況一律封裝為ExecutionException異常
throw new ExecutionException(cause);
}
@SuppressWarnings("unchecked")
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// 帶超時時限的阻塞獲取結果
Object result = this.result;
// 如果Promise未執行完畢則進行帶超時時限的阻塞等待
if (!isDone0(result)) {
if (!await(timeout, unit)) {
// 等待超時直接丟擲TimeoutException
throw new TimeoutException();
}
// 更新結果臨時變數
result = this.result;
}
// result為SUCCESS屬性例項或者UNCANCELLABLE屬性例項的時候直接返回null
if (result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
// 如果result為CauseHolder型別,則獲取其中持有的cause屬性,也有可能為null
Throwable cause = cause0(result);
if (cause == null) {
// 執行成功的前提下轉換型別後的result值返回
return (V) result;
}
// 取消的情況,丟擲CancellationException
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 剩餘的情況一律封裝為ExecutionException異常
throw new ExecutionException(cause);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// CAS更新result為CANCELLATION_CAUSE_HOLDER,result的期望值必須為null
if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
// 判斷是否需要進行等待執行緒的通知
if (checkNotifyWaiters()) {
// 通知監聽器進行回撥
notifyListeners();
}
return true;
}
return false;
}
private boolean setSuccess0(V result) {
// 設定執行成功的結果,如果入參result為null,則選用SUCCESS屬性,否則使用result
return setValue0(result == null ? SUCCESS : result);
}
private boolean setFailure0(Throwable cause) {
// 設定執行失敗的結果,入參是Throwable型別,封裝為CauseHolder,存放在CauseHolder例項的cause屬性
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
private boolean setValue0(Object objResult) {
// CAS更新result為入參objResult,result的期望值必須為null或者UNCANCELLABLE才能更新成功
if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// 判斷是否需要進行等待執行緒的通知
if (checkNotifyWaiters()) {
// 通知監聽器進行回撥
notifyListeners();
}
return true;
}
return false;
}
// 判斷是否需要進行等待執行緒的通知 - 其實是判斷是否需要通知監聽器回撥
private synchronized boolean checkNotifyWaiters() {
// 如果等待執行緒數量大於0則呼叫Object#notifyAll()喚醒所有等待執行緒
if (waiters > 0) {
notifyAll();
}
// 如果listeners不為空(也就是存在監聽器)的時候才返回true
return listeners != null;
}
// ... 省略其他程式碼 ...
}
Promise的基本使用
要使用Netty
的Promise
模組,並不需要引入Netty
的所有依賴,這裡只需要引入netty-common
:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.44.Final</version>
</dependency>
EventExecutor
選取方面,Netty
已經準備了一個GlobalEventExecutor
用於全域性事件處理,這裡可以直接選用(當然也可以自行實現EventExecutor
或者用EventExecutor
的其他實現類):
EventExecutor executor = GlobalEventExecutor.INSTANCE;
Promise<String> promise = new DefaultPromise<>(executor);
這裡設計一個場景:非同步下載一個連結的資源到磁碟上,下載完成之後需要非同步通知下載完的磁碟檔案路徑,得到通知之後列印下載結果到控制檯中。
public class PromiseMain {
public static void main(String[] args) throws Exception {
String url = "http://xxx.yyy.zzz";
EventExecutor executor = GlobalEventExecutor.INSTANCE;
Promise<DownloadResult> promise = new DefaultPromise<>(executor);
promise.addListener(new DownloadResultListener());
Thread thread = new Thread(() -> {
try {
System.out.println("開始下載資源,url:" + url);
long start = System.currentTimeMillis();
// 模擬下載耗時
Thread.sleep(2000);
String location = "C:\\xxx\\yyy\\z.md";
long cost = System.currentTimeMillis() - start;
System.out.println(String.format("下載資源成功,url:%s,儲存到:%s,耗時:%d ms", url, location, cost));
DownloadResult result = new DownloadResult();
result.setUrl(url);
result.setFileDiskLocation(location);
result.setCost(cost);
// 通知結果
promise.setSuccess(result);
} catch (Exception ignore) {
}
}, "Download-Thread");
thread.start();
Thread.sleep(Long.MAX_VALUE);
}
@Data
private static class DownloadResult {
private String url;
private String fileDiskLocation;
private long cost;
}
private static class DownloadResultListener implements GenericFutureListener<Future<DownloadResult>> {
@Override
public void operationComplete(Future<DownloadResult> future) throws Exception {
if (future.isSuccess()) {
DownloadResult downloadResult = future.getNow();
System.out.println(String.format("下載完成通知,url:%s,檔案磁碟路徑:%s,耗時:%d ms", downloadResult.getUrl(),
downloadResult.getFileDiskLocation(), downloadResult.getCost()));
}
}
}
}
執行後控制檯輸出:
開始下載資源,url:http://xxx.yyy.zzz
下載資源成功,url:http://xxx.yyy.zzz,儲存到:C:\xxx\yyy\z.md,耗時:2000 ms
下載完成通知,url:http://xxx.yyy.zzz,檔案磁碟路徑:C:\xxx\yyy\z.md,耗時:2000 ms
Promise
適用的場景很多,除了非同步通知的場景也能用於同步呼叫,它在設計上比JUC
的Future
靈活很多,基於Future
擴展出很多新的特性,有需要的可以單獨引入此依賴直接使用。
Promise監聽器棧深度的問題
有些時候,由於封裝或者人為編碼異常等原因,監聽器的回撥可能出現基於多個Promise
形成的鏈(參考Issue-5302,a promise listener chain
),這樣子有可能出現遞迴呼叫深度過大而導致棧溢位,因此需要設定一個閾值,限制遞迴呼叫的最大棧深度,這個深度閾值暫且稱為棧深度保護閾值,預設值是8,可以通過系統引數io.netty.defaultPromise.maxListenerStackDepth
覆蓋設定。這裡貼出前面提到過的程式碼塊:
private void notifyListeners() {
EventExecutor executor = executor();
// 事件執行器必須是事件迴圈型別,也就是executor.inEventLoop()為true的時候才啟用遞迴棧深度保護
if (executor.inEventLoop()) {
// 獲取當前執行緒繫結的InternalThreadLocalMap例項,這裡類似於ThreadLocal
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
// 獲取當前執行緒的監聽器呼叫棧深度
final int stackDepth = threadLocals.futureListenerStackDepth();
// 監聽器呼叫棧深度如果不超過閾值MAX_LISTENER_STACK_DEPTH
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
// 呼叫notifyListenersNow()前先設定監聽器呼叫棧深度 + 1
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
// 呼叫notifyListenersNow()完畢後設置監聽器呼叫棧深度為呼叫前的數值,也就是恢復執行緒的監聽器呼叫棧深度
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 如果監聽器呼叫棧深度超過閾值MAX_LISTENER_STACK_DEPTH,則直接每次通知監聽器當成一個新的非同步任務處理
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
如果我們想模擬一個例子觸發監聽器呼叫棧深度保護,那麼只需要想辦法在同一個EventLoop
型別的執行緒中遞迴呼叫notifyListeners()
方法即可。
最典型的例子就是在上一個Promise
監聽器回撥的方法裡面觸發下一個Promise
的監聽器的setSuccess()
(簡單理解就是套娃),畫個圖理解一下:
測試程式碼:
public class PromiseListenerMain {
private static final AtomicInteger COUNTER = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
EventExecutor executor = ImmediateEventExecutor.INSTANCE;
// root
Promise<String> root = new DefaultPromise<>(executor);
Promise<String> p1 = new DefaultPromise<>(executor);
Promise<String> p2 = new DefaultPromise<>(executor);
Promise<String> p3 = new DefaultPromise<>(executor);
Promise<String> p4 = new DefaultPromise<>(executor);
Promise<String> p5 = new DefaultPromise<>(executor);
Promise<String> p6 = new DefaultPromise<>(executor);
Promise<String> p7 = new DefaultPromise<>(executor);
Promise<String> p8 = new DefaultPromise<>(executor);
Promise<String> p9 = new DefaultPromise<>(executor);
Promise<String> p10 = new DefaultPromise<>(executor);
p1.addListener(new Listener(p2));
p2.addListener(new Listener(p3));
p3.addListener(new Listener(p4));
p4.addListener(new Listener(p5));
p5.addListener(new Listener(p6));
p6.addListener(new Listener(p7));
p7.addListener(new Listener(p8));
p8.addListener(new Listener(p9));
p9.addListener(new Listener(p10));
root.addListener(new Listener(p1));
root.setSuccess("success");
Thread.sleep(Long.MAX_VALUE);
}
private static class Listener implements GenericFutureListener<Future<String>> {
private final String name;
private final Promise<String> promise;
public Listener(Promise<String> promise) {
this.name = "listener-" + COUNTER.getAndIncrement();
this.promise = promise;
}
@Override
public void operationComplete(Future<String> future) throws Exception {
System.out.println(String.format("監聽器[%s]回撥成功...", name));
if (null != promise) {
promise.setSuccess("success");
}
}
}
}
因為有safeExecute()
兜底執行,上面的所有Promise
都會回撥,這裡可以採用IDEA
的高階斷點功能,在步入斷點的地方新增額外的日誌,輸出如下:
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-9]回撥成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-0]回撥成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-1]回撥成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-2]回撥成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-3]回撥成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-4]回撥成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-5]回撥成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)執行---
監聽器[listener-6]回撥成功...
safeExecute(notifyListenersNow)執行----------
監聽器[listener-7]回撥成功...
safeExecute(notifyListenersNow)執行----------
監聽器[listener-8]回撥成功...
這裡筆者有點疑惑,如果呼叫棧深度大於8,超出的部分會包裝為Runnable
例項提交到事件執行器執行,豈不是把遞迴棧溢位的隱患變成了記憶體溢位的隱患(因為非同步任務也有可能積壓,除非拒絕任務提交,那麼具體要看EventExecutor
的實現了)?
小結
Netty
提供的Promise
工具的原始碼和使用方式都分析完了,設計理念和程式碼都是十分值得借鑑,同時能夠開箱即用,可以在日常編碼中直接引入,減少重複造輪子的勞動和風險。
個人部落格
- Throwable's Blog
(本文完 e-a-20200123 c-3-d