Netty: DefaultPromise原始碼解讀
阿新 • • 發佈:2019-12-31
一、為什麼需要 io.netty.util.concurrent.Promise ?
如果你有一個阻塞的方法,比如 Thread.sleep(1000),而又不想阻塞當前執行緒 A,只需要把該方法包裝成一個任務由另一個執行緒 B 執行即可。
ExecutorService pool = Executors.newFixedThreadPool(3);
Future<Integer> future = pool.submit(() -> {
Thread.sleep(1000);
return 1;
});
複製程式碼
如果你需要在任務結束之後執行其他邏輯,一種方式是 A 執行緒先通過呼叫 future.get()
另外一種方法是 B 執行緒執行完任務後,繼續執行後續邏輯。Netty 中的 Future,io.netty.util.concurrent.Future,通過回撥方法 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
實現了該功能。
Promise 介面繼承了 Future 介面,在增加了 listener 的情況下,提供了 Promise<V> setSuccess(V result)
二、例項程式
private static NioEventLoopGroup loopGroup = new NioEventLoopGroup(8);
public void methodA() {
Promise promise = methodA("ceee...eeeb");
promise.addListener(future -> { // 1
Object ret = future.get(); // 4. 此時可以直接拿到結果
// 後續邏輯由 B 執行緒執行
System.out.println(ret);
});
// A 執行緒不阻塞,繼續執行其他程式碼...
}
public Promise<ResponsePacket> methodB(String name) {
Promise<ResponsePacket> promise = new DefaultPromise<>(loopGroup.next());
loopGroup.schedule(() -> { // 2
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("scheduler thread: " + Thread.currentThread().getName());
promise.setSuccess("hello " + name); // 3
},0,TimeUnit.SECONDS);
return promise;
}
複製程式碼
簡單的使用 Promise 包括:
- 給 promise 增加 listener,
promise.addListener()
; - 分配執行任務的執行緒,
loopGroup.schedule()
; - 在任務執行過程中,設定結果,
promise.setSuccess()
;
三、原始碼分析
1. addListener
// class: DefaultPromise
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener,"listener");
synchronized (this) { // 1. 增加 listener
addListener0(listener);
}
if (isDone()) { // 2. 如果任務執行完了,通知所有 listener
notifyListeners();
}
return this;
}
複製程式碼
繼續看 addListener0:
private Object listeners;
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
// 1. 新增第 1 個 listener 時,直接賦值即可
if (listeners == null) {
listeners = listener;
}
// 3. 新增第 3 個以及更多 listener 時,直接加入陣列即可
else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
}
// 2. 新增第 2 個 listener 時,listeners 型別更改為 DefaultFutureListeners,內部實現為一個陣列
else {
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners,listener);
}
}
複製程式碼
由於可以新增多個 listener,很容易想到通過一個陣列儲存所有 listener。而實現類裡面 listeners 型別為 Object,可能是考慮到大部分都只有 1 個 listener,節省記憶體空間。
2. schedule
將任務加入佇列,由執行緒池執行。
3. setSuccess
// class: DefaultPromise
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) { // 如果設定成功,返回;否則拋異常
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
// 設定 result
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
// cas 操作
if (RESULT_UPDATER.compareAndSet(this,null,objResult) ||
RESULT_UPDATER.compareAndSet(this,UNCANCELLABLE,objResult)) {
if (checkNotifyWaiters()) {
notifyListeners();
}
return true;
}
return false;
}
private synchronized boolean checkNotifyWaiters() {
/**
* 有些執行緒不是通過增加 listener 的方式獲取結果,而是通過 promise.get() 方法獲取,
* 那麼這些執行緒為阻塞狀態;當設定了 result 後,需要喚醒這些執行緒
*/
if (waiters > 0) {
notifyAll();
}
return listeners != null; // 只要存在 listener,就返回 true
}
複製程式碼
繼續檢視 notifyListeners:
// class: DefaultPromise
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
// TODO 巢狀監聽
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
// 1. 如果是 promise 繫結的執行緒,直接執行
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 2. 否則,加入任務排程,因此 listener 方法最終還是由 promise 繫結的執行緒執行的
safeExecute(executor,new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
// 依次通知所有 listener
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this,(GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
notifyingListeners = false;
return;
}
// 通知原先的 listeners 時,有可能有新的 listener 在此期間註冊,也需要通知到
listeners = this.listeners;
this.listeners = null;
}
}
}
private static void notifyListener0(Future future,GenericFutureListener l) {
try {
l.operationComplete(future); // 執行 listener 中的方法
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()",t);
}
}
}
複製程式碼