1. 程式人生 > 實用技巧 >netty 學習 NIO

netty 學習 NIO

從Java1.4開始, Java引入了non-blocking IO,簡稱NIO。NIO與傳統socket最大的不同就是引入了Channel和多路複用selector的概念。傳統的socket是基於stream的,它是單向的,有InputStream表示read和OutputStream表示寫。而Channel是雙工的,既支援讀也支援寫,channel的讀/寫都是面向Buffer。 NIO中引入的多路複用Selector機制(如果是linux系統,則應用的epoll事件通知機制)可使一個執行緒同時監聽多個Channel上發生的事件。 雖然Java NIO相比於以往確實是一個大的突破,但是如果要真正上手進行開發,且想要開發出好的一個服務端網路程式,那麼你得要花費一點功夫了,畢竟Java NIO只是提供了一大堆的API而已,對於一般的軟體開發人員來說只能呵呵了。因此,社群中就湧現了很多基於Java NIO的網路應用框架,其中以Apache的Mina,以及Netty最為出名,從本篇開始我們將深入的分析一下Netty的內部實現細節。

1.ServerBootstrap 和Bootstrap的關係

2.NioEventLoopGroup

Netty服務端示例:

EventLoopGroup bossGroup =newNioEventLoopGroup();//(1)
EventLoopGroup workerGroup =newNioEventLoopGroup();
try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup)  // (3) .channel(NioServerSocketChannel.class) // (4)
.handler(new LoggingHandler()) // (5) .childHandler(new ChannelInitializer<SocketChannel>() { // (6) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (7) .childOption(ChannelOption.SO_KEEPALIVE, true); // (8) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (9) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); }

上面這段程式碼展示了服務端的一個基本步驟:

(1)、 初始化用於Acceptor的主"執行緒池"以及用於I/O工作的從"執行緒池";
(2)、 初始化ServerBootstrap例項, 此例項是netty服務端應用開發的入口,也是本篇介紹的重點, 下面我們會深入分析;
(3)、 通過ServerBootstrap的group方法,設定(1)中初始化的主從"執行緒池";
(4)、 指定通道channel的型別,由於是服務端,故而是NioServerSocketChannel;
(5)、 設定ServerSocketChannel的處理器(此處不詳述,後面的系列會進行深入分析)
(6)、 設定子通道也就是SocketChannel的處理器, 其內部是實際業務開發的"主戰場"(此處不詳述,後面的系列會進行深入分析)
(7)、 配置ServerSocketChannel的選項
(8)、 配置子通道也就是SocketChannel的選項
(9)、 繫結並偵聽某個埠

接著,我們再看看客戶端是如何開發的:

Netty客戶端示例:

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // (1)
        
        try {
            Bootstrap b = new Bootstrap(); // (2)
            b.group(workerGroup); // (3)
            b.channel(NioSocketChannel.class); // (4)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (5)
            b.handler(new ChannelInitializer<SocketChannel>() { // (6)
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (7)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

客戶端的開發步驟和服務端都差不多:

(1)、 初始化用於連線及I/O工作的"執行緒池";
(2)、 初始化Bootstrap例項, 此例項是netty客戶端應用開發的入口,也是本篇介紹的重點, 下面我們會深入分析;
(3)、 通過Bootstrap的group方法,設定(1)中初始化的"執行緒池";
(4)、 指定通道channel的型別,由於是客戶端,故而是NioSocketChannel;
(5)、 設定SocketChannel的選項(此處不詳述,後面的系列會進行深入分析);
(6)、 設定SocketChannel的處理器, 其內部是實際業務開發的"主戰場"(此處不詳述,後面的系列會進行深入分析);
(7)、 連線指定的服務地址;

通過對上面服務端及客戶端程式碼分析,Bootstrap是Netty應用開發的入口,如果想要理解Netty內部的實現細節,那麼有必要先了解一下Bootstrap內部的實現機制。

首先我們先看一下ServerBootstrap及Bootstrap的類繼承結構圖:

通過類圖我們知道AbstractBootstrap類是ServerBootstrap及Bootstrap的基類,我們先看一下AbstractBootstrap類的主要程式碼:

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    volatile EventLoopGroup group;
    private volatile ChannelFactory<? extends C> channelFactory;
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private volatile ChannelHandler handler;

    
    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

    private B self() {
        return (B) this;
    }

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

    @Deprecated
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();
    }

    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }

    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return self();
    }

    public <T> B attr(AttributeKey<T> key, T value) {
        if (key == null) {
            throw new NullPointerException("key");
        }
        if (value == null) {
            synchronized (attrs) {
                attrs.remove(key);
            }
        } else {
            synchronized (attrs) {
                attrs.put(key, value);
            }
        }
        return self();
    }

    public B validate() {
        if (group == null) {
            throw new IllegalStateException("group not set");
        }
        if (channelFactory == null) {
            throw new IllegalStateException("channel or channelFactory not set");
        }
        return self();
    }

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }

    abstract void init(Channel channel) throws Exception;

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

    public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return self();
    }public abstract AbstractBootstrapConfig<B, C> config();

}

現在我們以示例程式碼為出發點,來詳細分析一下引導類內部實現細節:

1、 首先看看服務端的b.group(bossGroup, workerGroup):

呼叫ServerBootstrap的group方法,設定react模式的主執行緒池 以及 IO 操作執行緒池,ServerBootstrap中的group程式碼如下:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

在group方法中,會繼續呼叫父類的group方法,而通過類繼承圖我們知道,super.group(parentGroup)其實呼叫的就是AbstractBootstrap的group方法。AbstractBootstrap中group程式碼如下:

public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

通過以上分析,我們知道了AbstractBootstrap中定義了主執行緒池group的引用,而子執行緒池childGroup的引用是定義在ServerBootstrap中。

當我們檢視客戶端Bootstrap的group方法時,我們發現,其是直接呼叫的父類AbstractBoostrap的group方法。

2、示例程式碼中的 channel()方法

無論是服務端還是客戶端,channel呼叫的都是基類的channel方法,其實現細節如下:

public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();
    }

我們發現,其實channel方法內部,只是初始化了一個用於生產指定channel型別的工廠例項。

3、option / handler / attr 方法

option:設定通道的選項引數, 對於服務端而言就是ServerSocketChannel, 客戶端而言就是SocketChannel;

 handler:設定主通道的處理器, 對於服務端而言就是ServerSocketChannel,也就是用來處理Acceptor的操作;

      對於客戶端的SocketChannel,主要是用來處理 業務操作;

attr:設定通道的屬性;

 option / handler / attr方法都定義在AbstractBootstrap中, 所以服務端和客戶端的引導類方法呼叫都是呼叫的父類的對應方法。

4、childHandler / childOption / childAttr 方法(只有服務端ServerBootstrap才有child型別的方法)

  對於服務端而言,有兩種通道需要處理, 一種是ServerSocketChannel:用於處理使用者連線的accept操作, 另一種是SocketChannel,表示對應客戶端連線。而對於客戶端,一般都只有一種channel,也就是SocketChannel。

  因此以child開頭的方法,都定義在ServerBootstrap中,表示處理或配置服務端接收到的對應客戶端連線的SocketChannel通道。

  childHandler / childOption / childAttr 在ServerBootstrap中的對應程式碼如下:

public ServerBootstrap childHandler(ChannelHandler childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        this.childHandler = childHandler;
        return this;
    }
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
        if (childOption == null) {
            throw new NullPointerException("childOption");
        }
        if (value == null) {
            synchronized (childOptions) {
                childOptions.remove(childOption);
            }
        } else {
            synchronized (childOptions) {
                childOptions.put(childOption, value);
            }
        }
        return this;
    }
public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value) {
        if (childKey == null) {
            throw new NullPointerException("childKey");
        }
        if (value == null) {
            childAttrs.remove(childKey);
        } else {
            childAttrs.put(childKey, value);
        }
        return this;
    }

至此,引導類的屬性配置都設定完畢了。

本篇總結:

1、服務端由兩種執行緒池,用於Acceptor的React主執行緒和用於I/O操作的React從執行緒池; 客戶端只有用於連線及IO操作的React的主執行緒池;

2、ServerBootstrap中定義了服務端React的"從執行緒池"對應的相關配置,都是以child開頭的屬性。 而用於"主執行緒池"channel的屬性都定義在AbstractBootstrap中;