Netty Source Code Analysis, Part 4: How the Server Handles TCP Connections

Tags: Netty

1. Registering the SelectionKey.OP_ACCEPT event on the channel

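From the server startup process we know two things. First, when ServerBootstrap.initAndRegister registers the channel, the NioServerSocketChannel object is attached to the SelectionKey. Second, when the port and address are bound, the SelectionKey.OP_ACCEPT event is registered on the channel. AbstractNioChannel.doRegister() shows the first step: selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); The interest set passed here is 0 and this (the channel itself) is the attachment, so OP_ACCEPT is only added to the interest set later, once the bind has completed.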
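
The two-step registration described above can be reproduced with plain JDK NIO, without Netty. The following is a minimal sketch under that assumption: the class name TwoStepRegistrationSketch and the method registerThenEnableAccept are hypothetical, and a String stands in for the NioServerSocketChannel attachment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Plain JDK NIO sketch (not Netty code): register with interest ops 0 and an
// attachment first, then switch on OP_ACCEPT in a second step.
final class TwoStepRegistrationSketch {

    /** Returns {interestOps after register, interestOps after enabling accept, 1 if the attachment is kept else 0}. */
    static int[] registerThenEnableAccept(Object attachment) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // register() requires non-blocking mode

            // Step 1: interest set 0; the attachment plays the role of the channel object.
            SelectionKey key = server.register(selector, 0, attachment);
            int before = key.interestOps();

            // Step 2: only now ask the selector to report incoming connections.
            key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
            int after = key.interestOps();

            return new int[] {before, after, key.attachment() == attachment ? 1 : 0};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = registerThenEnableAccept("stand-in for NioServerSocketChannel");
        // prints: before=0 after=16 attachmentKept=true
        System.out.println("before=" + r[0] + " after=" + r[1] + " attachmentKept=" + (r[2] == 1));
    }
}
```

Registering with 0 first lets the channel be known to the selector before it is ready to take traffic; the interest bit is flipped on only when the channel can actually serve connections.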

2. Handling the SelectionKey.OP_ACCEPT event

Look at the I/O event handling logic in the run() method of NioEventLoop, which is a thread of the boss (main) thread pool.

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
        // This is the NioServerSocketChannel object attached at registration time.
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // For the server channel this is a NioMessageUnsafe.
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        unsafe.close(unsafe.voidPromise());
        return;
    }
    try {
        int readyOps = k.readyOps();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        // Handle the SelectionKey.OP_ACCEPT event.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
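
The bitmask tests in processSelectedKey are easier to follow when isolated. The sketch below is not Netty code: the class ReadyOpsDispatchSketch and the action names it returns are hypothetical labels for the three branches, and only the SelectionKey constants come from the JDK.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: maps a readyOps bitmask to the branch names used in the listing above.
final class ReadyOpsDispatchSketch {

    static List<String> dispatch(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // OP_READ and OP_ACCEPT share one branch; readyOps == 0 is also routed to read.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(SelectionKey.OP_ACCEPT));                       // prints [read]
        System.out.println(dispatch(SelectionKey.OP_WRITE | SelectionKey.OP_READ)); // prints [forceFlush, read]
    }
}
```

Because OP_ACCEPT and OP_READ land in the same branch, the server channel and the per-connection channels share one dispatch path; what differs is the unsafe implementation behind read().
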
3. The NioMessageUnsafe.read() method

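General flow:

  1. Accept the TCP connection, obtain the new connection's channel, instantiate a NioSocketChannel object, and put the new channel into non-blocking mode.
  2. Register the new connection's channel: add childHandler to the NioSocketChannel's pipeline, register the new Channel on the Selector of the selected NioEventLoop thread, and set the interest op to SelectionKey.OP_READ.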
@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        if (exception != null) {
            closed = closeOnReadError(exception);
            pipeline.fireExceptionCaught(exception);
        }
        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
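
The do/while loop in read() collects messages into readBuf until nothing is pending or the allocator handle says to stop. A minimal sketch of that shape follows, with loud assumptions: BoundedReadLoopSketch and readBatch are hypothetical names, a Queue stands in for the pending connections, and a plain int cap stands in for allocHandle.continueReading(), whose full rules are not reproduced here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Illustrative only: a bounded "read until empty or cap reached" loop.
final class BoundedReadLoopSketch {

    static <T> List<T> readBatch(Queue<T> pending, int maxMessagesPerRead) {
        List<T> readBuf = new ArrayList<>();
        int messagesRead = 0;
        do {
            T msg = pending.poll();  // stands in for doReadMessages(readBuf)
            if (msg == null) {
                break;               // nothing more to accept right now
            }
            readBuf.add(msg);
            messagesRead++;          // stands in for allocHandle.incMessagesRead(localRead)
        } while (messagesRead < maxMessagesPerRead); // simplified continueReading()
        return readBuf;
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>(Arrays.asList("c1", "c2", "c3", "c4", "c5"));
        System.out.println(readBatch(pending, 2)); // prints [c1, c2]
        System.out.println(pending.size());        // prints 3
    }
}
```

Capping the batch keeps one busy channel from monopolizing the event loop thread; whatever is left is picked up on a later pass.
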
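3.1 Accepting the TCP connection and instantiating the NioSocketChannel object

doReadMessages accepts the TCP connection and wraps the resulting SocketChannel in a new NioSocketChannel object.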

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
    return 0;
}

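Ultimately the constructors of the parent classes AbstractNioByteChannel and AbstractNioChannel are called. They put the channel into non-blocking mode and initialize the field readInterestOp = SelectionKey.OP_READ.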

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                        "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
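
The accept step and the switch to non-blocking mode can be observed with JDK NIO alone. The sketch below assumes a usable loopback interface; NonBlockingAcceptSketch and acceptOnce are hypothetical names, and the 5000 ms timeout is an arbitrary choice. It shows why doReadMessages can return 0 (a non-blocking accept() returns null when no connection is pending) and why the constructor must call configureBlocking(false) (an accepted SocketChannel starts out in blocking mode).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Plain JDK NIO sketch (not Netty code) of accept followed by configureBlocking(false).
final class NonBlockingAcceptSketch {

    /** Returns {accept() was null while idle, accepted channel is non-blocking after configuration}. */
    static boolean[] acceptOnce() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // port 0: any free port
            server.register(selector, SelectionKey.OP_ACCEPT);

            // No client yet: accept() returns null instead of blocking.
            boolean nullWhenIdle = server.accept() == null;

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(5000); // wait until the connection is reported as acceptable
                try (SocketChannel accepted = server.accept()) {
                    if (accepted == null) {
                        throw new IllegalStateException("no connection arrived within the timeout");
                    }
                    accepted.configureBlocking(false); // same call as in the AbstractNioChannel constructor
                    return new boolean[] {nullWhenIdle, !accepted.isBlocking()};
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = acceptOnce();
        System.out.println("nullWhenIdle=" + r[0] + " nonBlockingAfterConfigure=" + r[1]);
    }
}
```
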
3.2 Registering the new connection's channel

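At this point the server channel's pipeline on the boss thread is head -> ServerBootstrapAcceptor -> tail, so pipeline.fireChannelRead ends up invoking ServerBootstrapAcceptor.channelRead(). Registering the new connection's channel roughly goes as follows:

  1. Add the business-logic handler, i.e. add childHandler to the pipeline of the NioSocketChannel object.
  2. Ask the worker thread pool to perform the registration. In the single-threaded Reactor model, the boss NioEventLoop is reused.
  3. Bind the NioSocketChannel to the thread picked by round-robin, i.e. a NioEventLoop. This is where Netty's lock-free, serialized execution comes from.
  4. If that thread has not been started yet, start it, then register the new connection's Channel on the Selector of the selected NioEventLoop thread.
  5. Call pipeline.invokeHandlerAddedIfNeeded(), which runs the initChannel method of childHandler and adds the user-defined handlers.
  6. Update the interest op on the Selector to SelectionKey.OP_READ.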
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
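
childGroup.register(child) has to pick one event loop out of the worker group, and the flow above describes that choice as round-robin. The following is a minimal sketch of such a chooser; RoundRobinChooserSketch is a hypothetical class, the strings stand in for event loops, and Netty's own chooser implementation is not reproduced here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: hands out elements of a fixed list in round-robin order.
final class RoundRobinChooserSketch<T> {
    private final List<T> loops;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinChooserSketch(List<T> loops) {
        if (loops.isEmpty()) {
            throw new IllegalArgumentException("need at least one loop");
        }
        this.loops = new ArrayList<>(loops); // defensive copy
    }

    T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return loops.get(Math.floorMod(counter.getAndIncrement(), loops.size()));
    }

    public static void main(String[] args) {
        RoundRobinChooserSketch<String> chooser =
                new RoundRobinChooserSketch<>(Arrays.asList("loop-0", "loop-1", "loop-2"));
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next()); // prints loop-0, loop-1, loop-2, loop-0
        }
    }
}
```

Once a channel has been handed to a loop this way, it stays with that loop for its lifetime, so connections are spread across workers while each one is still handled by a single thread.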

Eventually AbstractUnsafe.register() is called:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        if (isActive()) {
            if (firstRegistration) {
                // HeadContext issues a read() here, which ends in unsafe.beginRead() and sets the read interest op.
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
// The Selector here belongs to the channel's own event loop thread, i.e. eventLoop().unwrappedSelector().
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now, because the cancelled key may still be cached.
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}
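
The inEventLoop() branch in register() is what makes the lock-free claim work: state owned by a channel is only ever touched on its one loop thread, and callers on other threads hand the work over as a task. The sketch below illustrates that pattern with a JDK single-thread executor. It is not Netty code; SingleThreadLoopSketch, register and registerFromManyThreads are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: all mutations of `registrations` happen on one thread.
final class SingleThreadLoopSketch {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-loop");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });
    private int registrations; // plain int: no lock, no atomic

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void register() {
        if (inEventLoop()) {
            registrations++;                          // already on the loop thread: run directly
        } else {
            executor.execute(() -> registrations++);  // otherwise queue it for the loop thread
        }
    }

    static int registerFromManyThreads(int callers, int perCaller) {
        SingleThreadLoopSketch loop = new SingleThreadLoopSketch();
        try {
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < callers; i++) {
                Thread t = new Thread(() -> {
                    for (int j = 0; j < perCaller; j++) {
                        loop.register();
                    }
                });
                threads.add(t);
                t.start();
            }
            for (Thread t : threads) {
                t.join();
            }
            // Tasks run in submission order on one thread, so this read sees every increment.
            return loop.executor.submit(() -> loop.registrations).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            loop.executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(registerFromManyThreads(4, 1000)); // prints 4000
    }
}
```

The counter never loses an update even though four threads call register() concurrently, because the increments themselves are serialized on the loop thread.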