1. 程式人生 > >Netty(五)服務端啟動過程原始碼分析——好文摘抄

Netty(五)服務端啟動過程原始碼分析——好文摘抄

下面先來一段 Netty 服務端的程式碼:

public class NettyServer {

    public void bind(int port){
        // 建立EventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();        //建立BOSS執行緒組 用於服務端接受客戶端的連線
        EventLoopGroup workerGroup = new NioEventLoopGroup();      //建立WORK執行緒組 用於進行SocketChannel的網路讀寫

        try {
            // 建立ServerBootStrap例項
            // ServerBootstrap 用於啟動NIO服務端的輔助啟動類,目的是降低服務端的開發複雜度
            ServerBootstrap b = new ServerBootstrap();
            // 繫結Reactor執行緒池
            b.group(bossGroup, workerGroup)
                    // 設定並繫結服務端Channel
                    // 指定所使用的NIO傳輸的Channel
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingServerHandler())
                    .childHandler(new ChannelInitializer(){

                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            //do something
                        }
                    });

            // 繫結埠,同步等待成功
            ChannelFuture future = b.bind(port).sync();
            // 等待服務端監聽埠關閉
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 優雅地關閉
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class LoggingServerHandler extends ChannelInboundHandlerAdapter{
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("loggin-channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("loggin-channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("loggin-handlerAdded");
        }
    }

    public static void main(String[] args){
            new NettyServer().bind(8899);
    }
}

上面程式碼為 Netty 伺服器端的完整程式碼,在整個服務端程式碼中會涉及如下幾個核心類。

ServerBootstrap

ServerBootstrap 為 Netty 服務端的啟動輔助類,它提供了一系列的方法用於設定服務端啟動相關的引數。

Channel

Channel 為 Netty 網路操作抽象類,它定義了一組功能,其提供的 API 大大降低了直接使用 Socket 類的複雜性。當然它也不僅僅只是包括了網路 IO 操作的基本功能,還包括一些與 Netty 框架相關的功能,包括獲取該 Channel 的 EventLoop 等等。

EventLoopGroup

EventLoopGroup 為 Netty 的 Reactor 執行緒池,它實際上就是 EventLoop 的容器,而 EventLoop 為 Netty 的核心抽象類,它的主要職責是處理所有註冊到本執行緒多路複用器 Selector 上的 Channel。

ChannelHandler

ChannelHandler 作為 Netty 的主要元件,它主要負責 I/O 事件或者 I/O 操作進行攔截和處理,它可以選擇性地攔截和處理自己感覺興趣的事件,也可以透傳和終止事件的傳遞。

ChannelPipeline

ChannelPipeline 是 ChannelHandler 鏈的容器,它負責 ChannelHandler 的管理和事件攔截與排程。每當新建一個 Channel 都會分配一個新的 ChannelPepeline,同時這種關聯是永久性的。

以上是簡要介紹,詳細介紹請參考(【死磕Netty】-----Netty的核心元件及其設計

)

服務端建立流程

Netty 服務端建立的時序圖,如下(摘自《Netty權威指南(第二版)》)

Netty 服務端建立的時序圖

主要步驟為:

  1. 建立 ServerBootstrap 例項
  2. 設定並繫結 Reactor 執行緒池
  3. 設定並繫結服務端 Channel
  4. 建立並初始化 ChannelPipeline
  5. 新增並設定 ChannelHandler
  6. 繫結並啟動監聽埠

服務端原始碼分析

1、建立兩個EventLoopGroup

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

bossGroup 為 BOSS 執行緒組,用於服務端接受客戶端的連線, workerGroup 為 worker 執行緒組,用於進行 SocketChannel 的網路讀寫。當然也可以建立一個並共享。

2、建立ServerBootstrap例項

ServerBootstrap b = new ServerBootstrap();

ServerBootStrap為Netty服務端的啟動引導類,用於幫助使用者快速配置、啟動服務端服務。提供的方法如下:

方法名稱 方法描述
group 設定 ServerBootstrap 要用的 EventLoopGroup
channel 設定將要被例項化的 ServerChannel 類
option 例項化的 ServerChannel 的配置項
childHandler 設定並新增 ChannelHandler
bind 繫結 ServerChannel

ServerBootStrap底層採用裝飾者模式。

關於 ServerBootStrap 我們後續做詳細分析。

3、設定並繫結Reactor執行緒池

呼叫 group() 方法,為 ServerBootstrap 例項設定並繫結 Reactor 執行緒池。

b.group(bossGroup, workerGroup)

EventLoopGroup 為 Netty 執行緒池,它實際上就是 EventLoop 的陣列容器。EventLoop 的職責是處理所有註冊到本執行緒多路複用器 Selector 上的 Channel,Selector 的輪詢操作由繫結的 EventLoop 執行緒 run 方法驅動,在一個迴圈體內迴圈執行。通俗點講就是一個死迴圈,不斷的檢測 I/O 事件、處理 I/O 事件。

這裡設定了兩個group,這個其實有點兒像我們工作一樣。需要兩型別的工人,一個老闆(bossGroup),一個工人(workerGroup),老闆負責從外面接活,工人則負責死命幹活(尼瑪,和我上家公司一模一樣)。所以這裡 bossGroup 的作用就是不斷地接收新的連線,接收之後就丟給 workerGroup 來處理,workerGroup 負責幹活就行(負責客戶端連線的 IO 操作)。

原始碼如下:

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);        // 繫結boosGroup
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;    // 繫結workerGroup
        return this;
    }

其中父 EventLoopGroup 傳遞到父類的建構函式中:

    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return (B) this;
    }

4、設定並繫結服務端Channel
繫結執行緒池後,則需要設定 channel 型別,服務端用的是 NioServerSocketChannel 。

.channel(NioServerSocketChannel.class)

呼叫 ServerBootstrap.channel 方法用於設定服務端使用的 Channel,傳遞一個 NioServerSocketChannel Class物件,Netty通過工廠類,利用反射建立NioServerSocketChannel 物件,如下:

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

channelFactory() 用於設定 Channel 工廠的:

    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }

    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return (B) this;
    }

這裡傳遞的是 ReflectiveChannelFactory,其原始碼如下:

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }
    //需要建立 channel 的時候,該方法將被呼叫
    @Override
    public T newChannel() {
        try {
            // 反射建立對應 channel
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}

確定服務端的 Channel(NioServerSocketChannel)後,呼叫 option()方法設定 Channel 引數,作為服務端,主要是設定TCP的backlog引數,如下:

.option(ChannelOption.SO_BACKLOG, 1024)

option()原始碼如下:

    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return (B) this;
    }

    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

五、新增並設定ChannelHandler

設定完 Channel 引數後,使用者可以為啟動輔助類和其父類分別指定 Handler。

 .handler(new LoggingServerHandler())
.childHandler(new ChannelInitializer(){
    //省略程式碼
})

這兩個 Handler 不一樣,前者(handler())設定的 Handler 是服務端 NioServerSocketChannel的,後者(childHandler())設定的 Handler 是屬於每一個新建的 NioSocketChannel 的。跟蹤原始碼會發現兩種所處的類不一樣,handler 位於 AbstractBootstrap 中,childHandler 位於 ServerBootstrap 中,如下:

    // AbstractBootstrap
    public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return (B) this;
    }

    // ServerBootstrap
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        this.childHandler = childHandler;
        return this;
    }

ServerBootstrap 中的 Handler 是 NioServerSocketChannel 使用的,所有連線該監聽埠的客戶端都會執行它,父類 AbstractBootstrap 中的 Handler 是一個工廠類,它為每一個新接入的客戶端都建立一個新的 Handler。如下圖(《Netty權威指南(第二版)》):

這裡寫圖片描述六、繫結埠,啟動服務

服務端最後一步,繫結埠並啟動服務,如下:

ChannelFuture future = b.bind(port).sync();

呼叫 ServerBootstrap 的 bind() 方法進行埠繫結:

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }    

首先呼叫 validate() 方法進行引數校驗,然後呼叫 doBind() 方法:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 初始化並註冊一個Channel
        final ChannelFuture regFuture = initAndRegister();

        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        // 註冊成功
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // 呼叫doBind0繫結
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

該方法涉及內容較多,我們分解來看,如下:

  1. 首先通過 initAndRegister() 得到一個 ChannelFuture 物件 regFuture;
  2. 根據得到的 regFuture 物件判斷該物件是否丟擲異常 (regFuture.cause()),如果是,直接返回;
  3. 根據 regFuture.isDone()判斷 initAndRegister()是否執行完畢,如果執行完成,則呼叫 doBind0
  4. 若 initAndRegister() 沒有執行完畢,則向 regFuture 物件新增一個 ChannelFutureListener 監聽,當 initAndRegister() 執行完畢後會呼叫 operationComplete(),在 operationComplete() 中依然會判斷 ChannelFuture 是否丟擲異常,如果沒有則呼叫 doBind0進行繫結。

按照上面的步驟我們一步一步來剖析 doBind() 方法。

initAndRegister()

執行 initAndRegister() 會得到一個 ChannelFuture 物件 regFuture,程式碼如下:

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 新建一個Channel
            channel = channelFactory.newChannel();
            // 初始化Channel
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
            }
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        // /向EventLoopGroup中註冊一個channel
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

首先呼叫 newChannel() 新建一個Channel,這裡是NioServerSocketChannel,還記前面 4、設定並繫結服務端Channel(.channel(NioServerSocketChannel.class)中 設定的Channel工廠類麼?在這裡派上用處了。在上面提到了通過反射的機制我們可以得到一個 NioServerSocketChannel 類的例項。那麼 NioServerSocketChannel 到底是一個什麼東西呢?如下圖:

這裡寫圖片描述

上圖是 NioServerSocketChannel 的繼承體系結構圖, NioServerSocketChannel 在建構函式中會依靠父類來完成一項一項的初始化工作。先看 NioServerSocketChannel 建構函式。

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

newSocket() 方法較為簡單,它是利用 SelectorProvider.openServerSocketChannel(),產生一個 ServerSocketChannel 物件。

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

該建構函式首先是呼叫父類的構造方法,然後設定 config屬性。父類構造方法如下:

    // AbstractNioMessageChannel
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }

    // AbstractNioChannel
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

    // AbstractChannel
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

通過 super() ,一層一層往上,直到 AbstractChannel。我們從最上層解析。

  • AbstractChannel 設定了 unsafe (unsafe = newUnsafe())和 pipeline(pipeline = newChannelPipeline());
  • AbstractNioChannel 將當前 ServerSocketChannel 設定成了非阻塞(ch.configureBlocking(false);),同時設定SelectionKey.OP_ACCEPT事件(this.readInterestOp = readInterestOp; readInterestOp 值由 NioServerSocketChannel 中傳遞);
  • NioServerSocketChannel 設定 config屬性(config = new NioServerSocketChannelConfig(this, javaChannel().socket()))。

所以 channel = channelFactory.newChannel() 通過反射機制產生了 NioServerSocketChannel 類例項。同時該例項設定了NioMessageUnsafe、DefaultChannelPipeline、非阻塞、SelectionKey.OP_ACCEPT事件 和 NioServerSocketChannelConfig 屬性。

看完了 channelFactory.newChannel();,我們再看 init()

    void init(Channel channel) throws Exception {
         // 設定配置的option引數
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        // 獲取繫結的pipeline
        ChannelPipeline p = channel.pipeline();

        // 準備child用到的4個part
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        // 為NioServerSocketChannel的pipeline新增一個初始化Handler,
        // 當NioServerSocketChannel在EventLoop註冊成功時,該handler的init方法將被呼叫
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                //如果使用者配置過Handler
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // 為NioServerSocketChannel的pipeline新增ServerBootstrapAcceptor處理器
                        // 該Handler主要用來將新建立的NioSocketChannel註冊到EventLoopGroup中
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

其實整個過程可以分為三個步驟:

  1. 設定 Channel 的 option 和 attr;
  2. 獲取繫結的 pipeline,然後為 NioServerSocketChanne l繫結的 pipeline 新增 Handler;
  3. 將用於服務端註冊的 Handler ServerBootstrapAcceptor 新增到 ChannelPipeline 中。ServerBootstrapAcceptor 為一個接入器,專門接受新請求,把新的請求扔給某個事件迴圈器。

至此初始化部分已經結束,我們再看註冊部分,

        // /向EventLoopGroup中註冊一個channel
        ChannelFuture regFuture = config().group().register(channel);

註冊方法的呼叫位於 initAndRegister() 方法中。注意這裡的 group() 返回的是前面的 boss NioEvenLoopGroup,它繼承 MultithreadEventLoopGroup,呼叫的 register(),也是 MultithreadEventLoopGroup 中的。如下:

    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

呼叫 next() 方法從 EventLoopGroup 中獲取下一個 EventLoop,呼叫 register() 方法註冊:

    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

將Channel和EventLoop封裝成一個DefaultChannelPromise物件,然後呼叫register()方法。DefaultChannelPromis為ChannelPromise的預設實現,而ChannelPromisee繼承Future,具備非同步執行結構,繫結Channel,所以又具備了監聽的能力,故而ChannelPromis是Netty非同步執行的核心介面。

    public ChannelFuture register(ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

首先獲取 channel 的 unsafe 物件,該 unsafe 物件就是在之前設定過得。然後呼叫 register() 方法,如下:

        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            // 必須要保證註冊是由該EventLoop發起的
            if (eventLoop.inEventLoop()) {
                register0(promise);        // 註冊
            } else {
                // 如果不是單獨封裝成一個task非同步執行
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

過程如下:

  1. 首先通過isRegistered() 判斷該 Channel 是否已經註冊到 EventLoop 中;
  2. 通過 eventLoop.inEventLoop() 來判斷當前執行緒是否為該 EventLoop 自身發起的,如果是,則呼叫 register0()直接註冊;
  3. 如果不是,說明該 EventLoop 中的執行緒此時沒有執行權,則需要新建一個執行緒,單獨封裝一個 Task,而該 Task 的主要任務則是執行 register0()

無論當前 EventLoop 的執行緒是否擁有執行權,最終都會要執行 register0(),如下:

        private void register0(ChannelPromise promise) {
            try {
                // 確保 Channel 處於 open
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;

                // 真正的註冊動作
                doRegister();

                neverRegistered = false;
                registered = true;        

                pipeline.invokeHandlerAddedIfNeeded();    
                safeSetSuccess(promise);        //設定註冊結果為成功

                pipeline.fireChannelRegistered();

                if (isActive()) { 
                    //如果是首次註冊,發起 pipeline 的 fireChannelActive
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

如果 Channel 處於 open 狀態,則呼叫 doRegister() 方法完成註冊,然後將註冊結果設定為成功。最後判斷如果是首次註冊且處於啟用狀態,則發起 pipeline 的 fireChannelActive()

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 註冊到NIOEventLoop的Selector上
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }

這裡註冊時 ops 設定的是 0,也就是說 ServerSocketChannel 僅僅只是表示了註冊成功,還不能監聽任何網路操作,這樣做的目的是(摘自《Netty權威指南(第二版)》):

  1. 註冊方式是多型的,它既可以被 NIOServerSocketChannel 用來監聽客戶端的連線接入,也可以註冊 SocketChannel 用來監聽網路讀或者寫操作。
  2. 通過 SelectionKey.interestOps(int ops) 方法可以方便地修改監聽操作位。所以,此處註冊需要獲取 SelectionKey 並給 AbstractNIOChannel 的成員變數 selectionKey 賦值。

由於這裡 ops 設定為 0,所以還不能監聽讀寫事件。呼叫 doRegister()後,然後呼叫pipeline.invokeHandlerAddedIfNeeded();,這個時候控制檯會出現 loggin-handlerAdded,內部如何呼叫,我們在剖析 pipeline 時再做詳細分析。然後將註冊結果設定為成功(safeSetSuccess(promise))。呼叫 pipeline.fireChannelRegistered(); 這個時候控制檯會列印 loggin-channelRegistered。這裡簡單分析下該方法。

    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

pipeline 維護著 handle 連結串列,事件會在 NioServerSocketChannel 的 pipeline 中傳播。最終都會呼叫 next.invokeChannelRegistered(),如下:

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }

在 invokeChannelRegistered() 會呼叫我們在前面設定的 handler (還記得簽名的 handler(new LoggingServerHandler() )麼)的 channelRegistered(),這個時候控制檯應該會列印 loggin-channelRegistered

到這裡initAndRegister() (final ChannelFuture regFuture = initAndRegister();)就分析完畢了,該方法主要做如下三件事:

  1. 通過反射產生了一個 NioServerSocketChannle 物件;
  2. 呼叫 init(channel)完成初始化工作;
  3. 將NioServerSocketChannel進行了註冊。

initAndRegister()篇幅較長,分析完畢了,我們再返回到doBind(final SocketAddress localAddress)。在 doBind(final SocketAddress localAddress) 中如果 initAndRegister()執行完成,則 regFuture.isDone() 則為 true,執行 doBind0()。如果沒有執行完成,則會註冊一個監聽 ChannelFutureListener,當 initAndRegister() 完成後,會呼叫該監聽的 operationComplete()方法,最終目的還是執行 doBind0()。故而我們下面分析 doBind0()到底做了些什麼。原始碼如下:

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

doBind0() 較為簡單,首先new 一個執行緒 task,然後將該任務提交到 NioEventLoop 中進行處理,我們先看 execute()

  public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

呼叫 inEventLoop() 判斷當前執行緒是否為該 NioEventLoop 所關聯的執行緒,如果是,則呼叫 addTask() 將任務 task 新增到佇列中,如果不是,則先啟動執行緒,在呼叫 addTask() 將任務 task 新增到佇列中。addTask() 如下:

    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

offerTask()新增到佇列中:

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }

task 新增到任務佇列 taskQueue成功後,執行任務會呼叫如下方法:

 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

channel 首先呼叫 bind() 完成 channel 與埠的繫結,如下:

    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

tail 在 DefaultChannelPipeline 中定義:final AbstractChannelHandlerContext tail; 有 tail 就會有 head ,在 DefaultChannelPipeline 中維護這一個 AbstractChannelHandlerContext 節點的雙向連結串列,該連結串列是實現 Pipeline 機制的關鍵,更多詳情會在 ChannelPipeline 中做詳細說明。bind() 最終會呼叫 DefaultChannelPipeline 的 bind() 方法。如下:

    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        if (!validatePromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

首先對 localAddress 、 promise 進行校驗,符合規範則呼叫 findContextOutbound() ,該方法用於在 pipeline 中獲取 AbstractChannelHandlerContext 雙向連結串列中的一個節點,如下:

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

從該方法可以看出,所獲取的節點是從 tail 開始遍歷,獲取第一個節點屬性 outbound 為 true 的節點。其實該節點是 AbstractChannelHandlerContext 雙向連結串列的 head 節點。獲取該節點後,呼叫 invokeBind(),如下:

    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }

handler() 返回的是 HeadContext 物件,然後呼叫其bind(),如下:

        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }

unsafe 定義在 HeadContext 中,在建構函式中初始化(unsafe = pipeline.channel().unsafe();),呼叫 bind()如下:

        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();

            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {

                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                // 最核心方法
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }

內部呼叫 doBind() ,該方法為繫結中最核心的方法,位於 NioServerSocketChannel 中,如下:

    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

javaChannel()返回的是 NioServerSocketChannel 例項初始化時所產生的 Java NIO ServerSocketChannel 例項(ServerSocketChannelImple例項),然後呼叫其 bind(),如下:

    public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
        Object var3 = this.lock;
        synchronized(this.lock) {
            if(!this.isOpen()) {
                throw new ClosedChannelException();
            } else if(this.isBound()) {
                throw new AlreadyBoundException();
            } else {
                InetSocketAddress var4 = var1 == null?new InetSocketAddress(0):Net.checkAddress(var1);
                SecurityManager var5 = System.getSecurityManager();
                if(var5 != null) {
                    var5.checkListen(var4.getPort());
                }

                NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
                Net.bind(this.fd, var4.getAddress(), var4.getPort());
                Net.listen(this.fd, var2 < 1?50:var2);
                Object var6 = this.stateLock;
                synchronized(this.stateLock) {
                    this.localAddress = Net.localAddress(this.fd);
                }

                return this;
            }
        }
    }

該方法屬於 Java NIO 層次的,該方法涉及到服務端埠的繫結,埠的監聽,這些內容在後續的 Channel 時做詳細介紹。

到這裡就真正完成了服務端埠的繫結。