1. 程式人生 > 程式設計 >Netty: DefaultPromise原始碼解讀

Netty: DefaultPromise原始碼解讀

一、為什麼需要 io.netty.util.concurrent.Promise ?

如果你有一個阻塞的方法,比如 Thread.sleep(1000),而又不想阻塞當前執行緒 A,只需要把該方法包裝成一個任務由另一個執行緒 B 執行即可。

ExecutorService pool = Executors.newFixedThreadPool(3);
Future<Integer> future = pool.submit(() -> {
    Thread.sleep(1000);
    return 1;
});
複製程式碼

如果你需要在任務結束之後執行其他邏輯,一種方式是 A 執行緒先通過呼叫 future.get()

獲取值,然後執行其他程式碼;但是 get 方法本身也是一個阻塞方法,在這期間 A 執行緒阻塞。

另外一種方法是 B 執行緒執行完任務後,繼續執行後續邏輯。Netty 中的 Future,io.netty.util.concurrent.Future,通過回撥方法 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); 實現了該功能。

Promise 介面繼承了 Future 介面,在增加了 listener 的情況下,提供了 Promise<V> setSuccess(V result)

方法,可以在任務中手動設定返回值,並立即通知 listeners。

二、例項程式

private static NioEventLoopGroup loopGroup = new NioEventLoopGroup(8);

public void methodA() {
    Promise promise = methodA("ceee...eeeb");
    promise.addListener(future -> {		// 1
        Object ret = future.get();      // 4. 此時可以直接拿到結果
      	// 後續邏輯由 B 執行緒執行
System.out.println(ret); }); // A 執行緒不阻塞,繼續執行其他程式碼... } public Promise<ResponsePacket> methodB(String name) { Promise<ResponsePacket> promise = new DefaultPromise<>(loopGroup.next()); loopGroup.schedule(() -> { // 2 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("scheduler thread: " + Thread.currentThread().getName()); promise.setSuccess("hello " + name); // 3 },0,TimeUnit.SECONDS); return promise; } 複製程式碼

簡單的使用 Promise 包括:

  1. 給 promise 增加 listener,promise.addListener()
  2. 分配執行任務的執行緒,loopGroup.schedule()
  3. 在任務執行過程中,設定結果,promise.setSuccess()

三、原始碼分析

1. addListener

// class: DefaultPromise
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener,"listener");

    synchronized (this) {  // 1. 增加 listener
        addListener0(listener);	
    }

    if (isDone()) {  // 2. 如果任務執行完了,通知所有 listener
        notifyListeners();
    }

    return this;
}
複製程式碼

繼續看 addListener0:

private Object listeners;
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
 	// 1. 新增第 1 個 listener 時,直接賦值即可
    if (listeners == null) {
        listeners = listener;
    } 
  	// 3. 新增第 3 個以及更多 listener 時,直接加入陣列即可
  	else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } 
  	// 2. 新增第 2 個 listener 時,listeners 型別更改為 DefaultFutureListeners,內部實現為一個陣列
  	else {
        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners,listener);
    }
}
複製程式碼

由於可以新增多個 listener,很容易想到通過一個陣列儲存所有 listener。而實現類裡面 listeners 型別為 Object,可能是考慮到大部分都只有 1 個 listener,節省記憶體空間。

2. schedule

將任務加入佇列,由執行緒池執行。

3. setSuccess

// class: DefaultPromise
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {	// 如果設定成功,返回;否則拋異常
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

private boolean setSuccess0(V result) {
  	// 設定 result
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    // cas 操作
    if (RESULT_UPDATER.compareAndSet(this,null,objResult) ||
        RESULT_UPDATER.compareAndSet(this,UNCANCELLABLE,objResult)) {
        if (checkNotifyWaiters()) {
            notifyListeners();
        }
        return true;
    }
    return false;
}

private synchronized boolean checkNotifyWaiters() {
      /**
       * 有些執行緒不是通過增加 listener 的方式獲取結果,而是通過 promise.get() 方法獲取,
       * 那麼這些執行緒為阻塞狀態;當設定了 result 後,需要喚醒這些執行緒
       */
    if (waiters > 0) {
        notifyAll();
    }
    return listeners != null;  // 只要存在 listener,就返回 true
}
複製程式碼

繼續檢視 notifyListeners:

// class: DefaultPromise
private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
      	// TODO 巢狀監聽
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
              	// 1. 如果是 promise 繫結的執行緒,直接執行
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }

    // 2. 否則,加入任務排程,因此 listener 方法最終還是由 promise 繫結的執行緒執行的
    safeExecute(executor,new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

private void notifyListenersNow() {
    Object listeners;
    synchronized (this) {
        if (notifyingListeners || this.listeners == null) {
            return;
        }
        notifyingListeners = true;
        listeners = this.listeners;
        this.listeners = null;
    }
    for (;;) {
      	// 依次通知所有 listener
        if (listeners instanceof DefaultFutureListeners) {
            notifyListeners0((DefaultFutureListeners) listeners);
        } else {
            notifyListener0(this,(GenericFutureListener<?>) listeners);
        }
        synchronized (this) {
            if (this.listeners == null) {
                notifyingListeners = false;
                return;
            }
            // 通知原先的 listeners 時,有可能有新的 listener 在此期間註冊,也需要通知到
            listeners = this.listeners;
            this.listeners = null;
        }
    }
}

private static void notifyListener0(Future future,GenericFutureListener l) {
    try {
        l.operationComplete(future);	// 執行 listener 中的方法
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()",t);
        }
    }
}
複製程式碼