1. 程式人生 > >Netty NioEventLoop 啟動過程原始碼分析

Netty NioEventLoop 啟動過程原始碼分析

原文連結:https://wangwei.one/posts/netty-nioeventloop-analyse-for-startup.html

前面 ,我們分析了NioEventLoop的建立過程,接下來我們開始分析NioEventLoop的啟動和執行邏輯。

Netty版本:4.1.30

啟動

在之前分析 Channel繫結 的文章中,提到過下面這段程式碼,先前只講了 channel.bind() 繫結邏輯,跳過了execute() 介面,現在我們以這個為例,開始分析NioEventLoop的execute()介面,主要邏輯如下:

  • 新增任務佇列
  • 綁定當前執行緒到EventLoop上
  • 呼叫EventLoop的run()方法
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // 通過eventLoop來執行channel繫結的Task
    channel.eventLoop().execute(new Runnable() {
        @Override
public void run() { if (regFuture.isSuccess()) { // channel繫結 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } }
}); }

往下追蹤到 SingleThreadEventExecutor 中 execute 介面,如下:

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    // 判斷當前執行時執行緒是否與EventLoop中繫結的執行緒一致
    // 這裡還未繫結Thread,所以先返回false
    boolean inEventLoop = inEventLoop();
    // 將任務新增任務佇列,也就是我們前面講EventLoop建立時候提到的 MpscQueue.
    addTask(task);
    if (!inEventLoop) {
        // 啟動執行緒
        startThread();
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

啟動執行緒介面:

private void startThread() {
    // 狀態比較,最開始時state = 1 ,為true
    if (state == ST_NOT_STARTED) {
        // cs操作後,state狀態設定為 2
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            try {
                // 啟動介面
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_NOT_STARTED);
                PlatformDependent.throwException(cause);
            }
        }
    }
}

// 執行執行緒啟動方法
private void doStartThread() {
    // 斷言判斷 SingleThreadEventExecutor 還未繫結 Thread
    assert thread == null;
    // executor 執行任務
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 將 SingleThreadEventExecutor(在我們的案例中就是NioEventLoop) 與 當前執行緒進行繫結
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }
            // 設定狀態為 false
            boolean success = false;
            // 更新最近一次任務的執行時間
            updateLastExecutionTime();
            try {
                // 往下呼叫 NioEventLoop 的 run 方法,執行
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
               
               ...
               
            }
        }
    });
}

執行

往下呼叫到 NioEventLoop 中的 run 方法,通過無限for迴圈,主要做以下三件事情:

  • 輪循I/O事件:select(wakenUp.getAndSet(false))
  • 處理I/O事件:processSelectedKeys
  • 執行Task任務:runAllTasks
@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    // 輪訓檢測I/O事件
                    // wakenUp為了標記selector是否是喚醒狀態,每次select操作,都設定為false,也就是未喚醒狀態。
                    select(wakenUp.getAndSet(false));
                    // 'wakenUp.compareAndSet(false, true)' 總是在呼叫 'selector.wakeup()' 之前進行評估,以減少喚醒的開銷
                    // (Selector.wakeup() 是非常耗效能的操作.)
                    
                    // 但是,這種方法存在競爭條件。當「wakeup」太早設定為true時觸發競爭條件
                    
                    // 在下面兩種情況下,「wakenUp」會過早設定為true:
                    // 1)Selector 在 'wakenUp.set(false)' 與 'selector.select(...)' 之間被喚醒。(BAD)
                    // 2)Selector 在 'selector.select(...)' 與 'if (wakenUp.get()) { ... }' 之間被喚醒。(OK)
                    
                    // 在第一種情況下,'wakenUp'設定為true,後面的'selector.select(...)'將立即喚醒。 直到'wakenUp'在下一輪中再次設定為false,'wakenUp.compareAndSet(false,true)'將失敗,因此任何喚醒選擇器的嘗試也將失敗,從而導致以下'selector.select(。 ..)'呼籲阻止不必要的。
                    
                    // 要解決這個問題,如果在selector.select(...)操作之後wakenUp立即為true,我們會再次喚醒selector。 它是低效率的,因為它喚醒了第一種情況(BAD - 需要喚醒)和第二種情況(OK - 不需要喚醒)的選擇器。
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            // ioRatio 表示處理I/O事件與執行具體任務事件之間所耗時間的比值。
            // ioRatio 預設為50
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // 處理I/O事件
                    processSelectedKeys();
                } finally {
                    // 處理任務佇列
                    runAllTasks();
                }
            } else {
                // 處理IO事件的開始時間
                final long ioStartTime = System.nanoTime();
                try {
                    // 處理I/O事件
                    processSelectedKeys();
                } finally {
                    // 記錄io所耗時間
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // 處理任務佇列,設定最大的超時時間
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

輪循檢測I/O事件

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        // select操作計數
        int selectCnt = 0;
        // 記錄當前系統時間
        long currentTimeNanos = System.nanoTime();
        // delayNanos方法用於計算定時任務佇列,最近一個任務的截止時間
        // selectDeadLineNanos 表示當前select操作所不能超過的最大截止時間
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            // 計算超時時間,判斷是否超時
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            // 如果 timeoutMillis <= 0, 表示超時,進行一個非阻塞的 select 操作。設定 selectCnt 為 1. 並終止本次迴圈。
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // 當wakenUp為ture時,恰好有task被提交,這個task將無法獲得呼叫的機會
            // Selector#wakeup. 因此,在執行select操作之前,需要再次檢查任務佇列
            // 如果不這麼做,這個Task將一直掛起,直到select操作超時
            // 如果 pipeline 中存在 IdleStateHandler ,那麼Task將一直掛起直到 空閒超時。
            
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                // 呼叫非阻塞方法
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            // 如果當前任務佇列為空,並且超時時間未到,則進行一個阻塞式的selector操作。timeoutMillis 為最大的select時間
            int selectedKeys = selector.select(timeoutMillis);
            // 操作計數 +1
            selectCnt ++;
			
            // 存在以下情況,本次selector則終止
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - 輪訓到了事件(Selected something,)
                // - 被使用者喚醒(waken up by user,)
                // - 已有任務佇列(the task queue has a pending task.)
                // - 已有定時任務(a scheduled task is ready for processing)
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            // 記錄當前時間
            long time = System.nanoTime();
            // 如果time > currentTimeNanos + timeoutMillis(超時時間),則表明已經執行過一次select操作
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } 
            // 如果 time <= currentTimeNanos + timeoutMillis,表示觸發了空輪訓
            // 如果空輪訓的次數超過 SELECTOR_AUTO_REBUILD_THRESHOLD (512),則重建一個新的selctor,避免空輪訓
            else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                        selectCnt, selector);

                // 重建建立一個新的selector
                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                // 對重建後的selector進行一次非阻塞呼叫,用於獲取最新的selectedKeys
                selector.selectNow();
                // 設定select計數
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
        // Harmless exception - log anyway
    }
}

重新建立一個新的Selector

該方法的主要邏輯就是:

  • 建立一個新的selector
  • 將老的selector上的 selectKey註冊到新的 selector 上
public void rebuildSelector() {
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    rebuildSelector0();
}

// 重新建立selector
private void rebuildSelector0() {
    // 暫存老的selector
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;

    if (oldSelector == null) {
        return;
    }

    try {
        // 建立一個新的 SelectorTuple
        // openSelector()在之前分析過了
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new Selector.
    // 記錄select上註冊的channel數量
    int nChannels = 0;
    // 遍歷老的 selector 上的 SelectionKey 
    for (SelectionKey key: oldSelector.keys()) {
        // 獲取 attachment,這裡的attachment就是我們前面在講 Netty Channel註冊時,select會將channel賦值到 attachment 變數上。
        // 獲取老的selector上註冊的channel 
        Object a = key.attachment();
        try {
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
			// 獲取興趣集
            int interestOps = key.interestOps();
            // 取消 SelectionKey
            key.cancel();
            // 將老的興趣集重新註冊到前面新建立的selector上
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            
            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            // nChannels計數 + 1
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }
	
    // 設定新的 selector
    selector = newSelectorTuple.selector;
    // 設定新的 unwrappedSelector
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        // time to close the old selector as everything else is registered to the new one
        // 關閉老的seleclor
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    if (logger.isInfoEnabled()) {
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
}

處理I/O事件


private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        // 設定為null,有利於GC回收
        selectedKeys.keys[i] = null;
		// 獲取 SelectionKey 中的 attachment, 我們這裡就是 NioChannel
        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            // 處理 SelectedKey
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

// 處理 SelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // 獲取Netty Channel中的 NioUnsafe 物件,用於後面的IO操作
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // 判斷 SelectedKey 的有效性,如果無效,則直接返回並關閉channel
    if (!k.isValid(