1. 程式人生 > >我們在接收一個新的連線請求之後是如何生成一個新的socketchannel並註冊讀寫事件的呢

我們在接收一個新的連線請求之後是如何生成一個新的socketchannel並註冊讀寫事件的呢

我們知道,在NioEventLoop當中,我們會迴圈處理得到的selectedKeys,呼叫的方法

 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

其中有程式碼會去處理Accept事件

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }

呼叫了Channel的unsafe屬性的read方法,裡面會有程式碼產生並獲取SocketChannel

 int localRead = doReadMessages(readBuf);
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));//pipeline就會將這個訊息交給他的channelHandler處理,
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

我們知道,在初始化Channel的時候,會在這個Channel的pipeline中新增一些handler,並在註冊完成之後完成新增操作

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        })

對於ChannelInitializer,他也是一個channelHandler,但是和其他的Handler不同的是,他在註冊完成之後,完成了響應的channelAdd回撥的過程中,會執行它的initChannel方法,並將自身從pipeline中移除掉。

所以在initChannel方法中,我們又看到了  ch.eventLoop().execute,這個會向eventLoop的提交一個任務,這個任務會執行一個新增handler的任務,所以unsafe.read()方法中的

pipeline.fireChannelRead(readBuf.get(i));

會進入到ServerBootstrapAcceptor的channelRead方法中,在這裡完成對socketChannel的初始化和註冊工作。

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
 @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());//看見沒有,這個就是生產一個socketChannel的方法,基本的方法就在這個裡面

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }