1. 程式人生 > >Netty NioEventLoop原始碼解讀

Netty NioEventLoop原始碼解讀

  NioEventLoop中維護了一個執行緒,執行緒啟動時會呼叫NioEventLoop的run方法,執行I/O任務和非I/O任務:I/O任務:即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法觸發。非IO任務:新增到taskQueue中的任務,如register0、bind0等任務,由runAllTasks方法觸發。兩種任務的執行時間比由變數ioRatio控制,預設為50,則表示允許非IO任務執行的時間與IO任務的執行時間相等。以下是NioEventLoop的構造方法。

//這是NioEventLoop的構造方法,NioEventLoopGroup 是管理NioEventLoop的集合,executor則為任務執行池,SelectorProvider 用於構造Selector物件,RejectedExecutionHandler用於在executor滿了執行的邏輯
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

在NioEventLoop類中,最重要的便是run方法,以下是run方法的原始碼:

    @Override
    protected void run() {
//這裡是個死迴圈,裡面的run方法邏輯會一直執行
        for (;;) {
            try {
//根據是佇列裡是否有任務執行不同的策略
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
//select內部具體的方法看下面的原始碼
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
//這裡是I/O與executor裡任務執行的比率,如果設為100則表示I/O的傳先級最高
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
//處理網路I/O事件processSelectedKeys可以看下面的原始碼
                        processSelectedKeys();
                    } finally {
                      //最終不是會執行任務佇列裡的任務的
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        //傳入一個執行了io的時間,根據io執行時間算出任務能夠執行多長時間
                        final long ioTime = System.nanoTime() - ioStartTime;
                      // runAllTasks看下面的原始碼
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

通過上面的run方法分析,內部有比較重要的三個方法, 我們下面來一一分析,其中比較重要的是select方法。裡面最重要的邏輯便是判斷執行selector的selectNow方法還是執行select方法。其中selectNow方法是非阻塞的,則select方法是阻塞的。同時這個方法也解決了JVM裡Selector空迴圈的bug,解決思想是發現執行了512次select還是沒有I/O事件,剛通過新建一個新的selector。具體程式碼如下(關鍵程式碼有相應的註釋):

//這裡處理的是select邏輯
    private void select(boolean oldWakenUp) throws IOException {
//這個Selector就 是 java.nio裡的selector啦
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
//selectDeadLineNanos得到的是最早任務的的一個執行時間
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
//如果最早需要執行的任務時間在0.5秒內,則執行selector的sellectNow方法,selectNow方法是非阻塞的
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
//再次確認有任務則執行selectNow方法
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
 //如果沒有佇列裡沒有任務需要執行,則通過select阻塞方法,得到selectedKeys
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
//selectedKeys不為空,則說明有相應的I/O事件,跳出迴圈,的run方法裡會處理具體的I/O事件
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    break;
                }
                if (Thread.interrupted()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
//如果最早任務的執行時間還沒有相應的I/O事件,則把selectCnt置為1,重新開始內部的for迴圈
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//下面的邏輯用於處理java selector空輪詢的bug
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }

另一個比較重要的方法是processSelectedKeys方法,裡面會處理具體的I/O事件,根據SelectionKey的readyops屬性處理不同的I/O事件,具體程式碼分析如下:

//根據selectedKeys 不同,採用不同的方法處理
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
//呼叫selector.selectedKeys()方法得到selectedKeys
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

//這裡處理的是沒有優化過的SelectedKeys
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        if (selectedKeys.isEmpty()) {
            return;
        }
//迴圈處理SelectionKey
        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
//具體的I/O事件在processSelectedKey方法裡
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }


    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                return;
            }
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
//根據 readyOps 判斷I/O事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//處理connect事件,
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
              //處理write事件
                ch.unsafe().forceFlush();
            }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//處理read與accept事件
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

還有一個比較重要的方法便是runAllTasks,這個方法傳入能夠處理task的時間,在超過這個時間後會退出執行,以使執行緒能夠執行I/O邏輯,程式碼邏輯如下:

    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
//從佇列裡取出執需要執行的task
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }
//這個方法求出執行的最後時間
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
//執行任務
            safeExecute(task);

            runTasks ++;

//只有執行了64個任務才比較當前時間與deadline的大小,如果超過了則直接退出,這樣做的原因是取得系統的nanoTime也是個相對耗時的操作
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
//繼續從佇列裡拿出未執行的task
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

Netty是接收使用者連線後是如何處理的

  下面來看看netty接收client端的請求後都做了那些事情,我們從上面的processSelectedKey方法裡我們知道在處理SelectionKey.OP_ACCEPT事件的時候會呼叫
unsafe.read()方法,程式碼截圖如下:


5796101-865779fd501066e0.png image.png

下面我們通過unsafe來一步步追蹤netty是如何處理接收到的連線的,這裡的unsafe物件是NioMessageUnsafe 物件,程式碼如下:

    private final class NioMessageUnsafe extends AbstractNioUnsafe {
//這裡接收讀到的buf物件
        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
// config()方法得到的是關於channel的配置資訊
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
//這裡會呼叫父類的unsafe方法
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
//doReadMessages方法呼叫的是channel裡的方法,在這裡會呼叫NioServerSocketChannel類裡的方法。通過readBuf會返回新建的SocketChannel
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
//這裡對增加到readBuf裡的物件進行處理通過pipeline物件,最終會呼叫到ServerBootstrapAcceptor類裡的channelRead方法
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

下面來看看NioServerSocketChannel類裡是如何處理doReadMessages的,主要的邏輯只是呼叫了java NIO的accept,然後返回SocketChannel,將返回的物件包裝成Netty內部使用的物件。具體程式碼如下:

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
//這裡會呼叫java NIO裡channel的accept方法,返回的也是java NIO裡的SocketChannel物件
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
//新建NioSocketChannel物件,傳的的parent就是當前NioServerSocketChannel, SocketChannel 就是剛剛呼叫accpt方法返回的物件。
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

ServerBootstrapAcceptor是一個ChannelInboundHandler類,裡面的核心方法如下圖所示:邏輯在圖裡都有相應的標註:


5796101-aa1538822727d8e6.png image.png