1. 程式人生 > >Netty原始碼閱讀

Netty原始碼閱讀

Netty是由JBOSS提供的一個java開源框架。Netty提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。本文講會對Netty服務啟動的過程進行分析,主要關注啟動的呼叫過程,從這裡面進一步理解Netty的執行緒模型,以及Reactor模式。

netty.jpg

這是我畫的一個Netty啟動過程中使用到的主要的類的概要類圖,當然是用到的類比這個多得多,而且我也忽略了各個類的繼承關係,關於各個類的細節,可能以後會寫單獨的部落格進行分析。在這裡主要注意那麼幾個地方:

1. ChannelPromise關聯了Channel和Executor,當然channel中也會有EventLoop的例項。
2. 每個channel有自己的pipeline例項。
3. 每個NioEventLoop中有自己的Executor例項和Selector例項。 

網路請求在NioEventLoop中進行處理,當然accept事件也是如此,它會把接收到的channel註冊到一個EventLoop的selector中,以後這個channel的所有請求都由所註冊的EventLoop進行處理,這也是Netty用來處理競態關係的機制,即一個channel的所有請求都在一個執行緒中進行處理,也就不會存在跨執行緒的衝突,因為這些呼叫都執行緒隔離了。

下面我們先看一段Netty原始碼裡面帶的example程式碼,直觀感受一下Netty的使用:

        // Configure the server.
        EventLoopGroup bossGroup
= new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG
, 100) // 設定tcp協議的請求等待佇列 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(new EchoServerHandler()); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }

首先我們先來了解Netty的主要類:

EventLoop 這個相當於一個處理執行緒,是Netty接收請求和處理IO請求的執行緒。

EventLoopGroup 可以理解為將多個EventLoop進行分組管理的一個類,是EventLoop的一個組。

ServerBootstrap 從命名上看就可以知道,這是一個對服務端做配置和啟動的類。

ChannelPipeline 這是Netty處理請求的責任鏈,這是一個ChannelHandler的連結串列,而ChannelHandler就是用來處理網路請求的內容的。

ChannelHandler 用來處理網路請求內容,有ChannelInboundHandler和ChannelOutboundHandler兩種,ChannlPipeline會從頭到尾順序呼叫ChannelInboundHandler處理網路請求內容,從尾到頭呼叫ChannelOutboundHandler處理網路請求內容。這也是Netty用來靈活處理網路請求的機制之一,因為使用的時候可以用多個decoder和encoder進行組合,從而適應不同的網路協議。而且這種類似分層的方式可以讓每一個Handler專注於處理自己的任務而不用管上下游,這也是pipeline機制的特點。這跟TCP/IP協議中的五層和七層的分層機制有異曲同工之妙。

現在看上面的程式碼,首先建立了兩個EventLoopGroup物件,作為group設定到ServerBootstrap中,然後設定Handler和ChildHandler,最後呼叫bind()方法啟動服務。下面按照Bootstrap啟動順序來看程式碼。

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

首先是設定EverLoopGroup,parentGroup一般用來接收accpt請求,childGroup用來處理各個連線的請求。不過根據開發的不同需求也可以用同一個group同時作為parentGroup和childGroup同時處理accpt請求和其他io請求。

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

接下來的channel()方法設定了ServerBootstrap的ChannelFactory,這裡傳入的引數是NioServerSocketChannel.class,也就是說這個ReflectiveChannelFactory建立的就是NioServerSocketChannel的例項。

後面的option(),handler()和childHandler()分別是設定Socket連線的引數,設定parentGroup的Handler,設定childGroup的Handler。childHandler()傳入的ChannelInitializer實現了一個initChannel方法,用於初始化Channel的pipeline,以處理請求內容。

之前都是在對ServerBootstrap做設定,接下來的ServerBootstrap.bind()才是啟動的重頭戲。我們繼續按照呼叫順序往下看。

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

	  /**
     * Create a new {@link Channel} and bind it.
     */
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

    // AbstractBootstrap
	  private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

我們可以看到bind()的呼叫最終呼叫到了doBind(final SocketAddress),在這裡我們看到先呼叫了initAndRegister()方法進行初始化和register操作。瞭解JavaNIO框架的同學應該能看出來是在這個方法中將channel註冊到selector中的。最後程式再呼叫了doBind0()方法進行繫結,先按照順序看initAndRegister方法做了什麼操作。

    // AbstractBootstrap
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
          // ...
        }

        ChannelFuture regFuture = config().group().register(channel);
        // ...
        return regFuture;
    }

為了簡單其間,我忽略了處理異常分支的程式碼,同學們有興趣可以自行下載Netty原始碼對照。在這裡終於看到channel的建立了,呼叫的是ServerBootstrap的channelFactory,之前的程式碼我們也看到了這裡的工廠是一個ReflectChannelFactory,在建構函式中傳入的是NioServerSocketChannel.class,所以這裡建立的是一個NioServerSocketChannel的物件。接下來init(channel)對channel進行初始化。

    // ServerBootstrap
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }
        
        // 設定channel.attr
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        // childGroup的handler
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        // 給channelpipeline新增handler
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                // group的handler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
                // In this case the initChannel(...) method will only be called after this method returns. Because
                // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
                // placed in front of the ServerBootstrapAcceptor.
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

先是設定了channel的option和attr,然後將handler加入到channelpipleline的handler鏈中,這裡大家請特別注意ServerBootstrapAcceptor這個Handler,因為接下來對於客戶端請求的處理以及工作channl的註冊可全是這個Handler處理的。不過由於現在channel還沒有註冊,所以還不會呼叫initChannel()方法,而是將這個handler對應的context加入到一個任務佇列中,等到channel註冊成功了再執行。關於ChannelPipeline的內容我們以後再說。然後在initAndRegister()方法中呼叫config().group().register(channel)對channel進行註冊。config().group()獲取到的其實就是bossGroup,在這個例子中就是一個NioEventLoopGroup,由於它繼承了MultithreadEventLoopGroup所以這裡呼叫的其實是這個類的方法。

    // MultithreadEventLoopGroup
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    public EventLoop next() {
        return (EventLoop) super.next();
    }

    // SingleThreadEventLoop
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

這裡會獲取EventLoopGroup中的一個EventLoop,其實我們用的是NioEventLoopGroup所以這裡獲取到的其實是NioEventLoop,而NioEventLoop繼承了SingleThreadEventLoop,這裡register方法呼叫的就是SingleThreadEventLoop中的方法。我們重遇來到了channel最終註冊的地方,這裡其實是呼叫了channel的unsafe物件中的register方法,也就是NioServerSocketChannel的方法,這個方法是在AbstractChannel祖先類中實現的,程式碼如下:

         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(
            
           

相關推薦

Netty 原始碼閱讀之初始環境搭建

推薦 netty 系列原始碼解析合集 http://www.iocoder.cn/Netty/Netty-collection/?aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3R6c18xMDQxMjE4MTI5L2FydGljbGUvZGV0YWlscy83OD

netty原始碼閱讀之效能優化工具類之Recycle異執行緒獲取物件

在這篇《netty原始碼閱讀之效能優化工具類之Recycler獲取物件》文章裡面,我們還有一個scavenge()方法沒有解析,也就是在別的執行緒裡面回收物件。下面我們開始介紹,從這個方法開始進入: boolean scavenge() { // con

netty原始碼閱讀之解碼值基於固定長度解碼器分析

固定長度解碼器FixedLengthFrameDecoder比較簡單,我們看下它類的註釋: /** * A decoder that splits the received {@link ByteBuf}s by the fixed number * of bytes.

netty原始碼閱讀之解碼之基於長度域解碼器引數分析

這篇文章我們放鬆一點,只分析基於長度域解碼器的幾個引數, lengthFieldOffset :長度域的偏移量,也就是長度域要從什麼地方開始 lengthFieldLength:長度域的長度,也就是長度域佔多少個位元組 lengthAdjustment:長度域的值的調整

netty原始碼閱讀之解碼之基於長度域解碼器分析

基於長度域解碼器LengthFieldBasedFrameDecoder我們主要分析以下三點: 1、計算需要抽取的資料包的長度 2、跳過位元組邏輯處理 3、丟棄模式下的處理 首先原始碼還是LengthFieldBasedFrameDecoder的decode方法:

netty原始碼閱讀之編碼之MessageToByteEncoder

MessageToByteEncoder的write過程,我們分析以下幾步: 1、匹配物件 2、分配記憶體 3、編碼實現 4、釋放物件 5、傳播資料 6、釋放記憶體 原始碼在這裡: @Override public void write(Cha

netty原始碼閱讀之效能優化工具類之FastThreadLocal的使用

先說明FastThreadLocal使用的效果。 1、比jdk原生的ThreadLocal的快 2、不同執行緒之間能保證執行緒安全 這是我們的使用者程式碼: public class FastThreadLocalTest { private static F

netty原始碼閱讀之效能優化工具類之FastThreadLocal的建立

建立的話我們直接從FastThreadLocal的構造方法進入: public FastThreadLocal() { index = InternalThreadLocalMap.nextVariableIndex(); } 可見他是現

netty原始碼閱讀之效能優化工具類之Recycler獲取物件

 Recycler獲取物件主要分為以下幾部分: 1、獲取當前執行緒的Stack 2、從Stack裡面彈出物件 3、如果彈出物件為空,那就建立物件並且繫結到Stack裡面 我們從Recycler的get方法進入,就是這個原始碼: @SuppressWarnin

netty原始碼閱讀與分析----HashedWheelTimer

netty是一個基礎通訊框架,管理的連線數較多,可能多至百萬級,每一個連線都有或多或少有超時任務,比如傳送資料超時,心跳檢測等。如果為每一個連線都啟動一個Timer,不僅效率低下,而且佔用資源。基於論文Hashed and hierarchical timing wheels

Netty 原始碼閱讀 —— 服務端建立

之前專案中用過netty,這次趁著面試空閒時間,重新梳理一遍netty原始碼,從服務端建立開始,話不多說,直接上程式碼先看看netty服務端建立的整體程式碼,大概如下所示:public void bind(int port) throws Exception { Ev

netty原始碼閱讀之NioEventLoop之NioEventLoop執行-----runAllTask

processSelectedKey()和runAllTask() final int ioRatio = this.ioRatio; if (ioRatio == 100) {

Netty原始碼閱讀

Netty是由JBOSS提供的一個java開源框架。Netty提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。本文講會對Netty服務啟動的過程進行分析,主要關注啟動的呼叫過程,從這裡面進一步理解Netty的執行緒模型

Netty原始碼閱讀之如何將TCP的讀寫操作和指定執行緒繫結

**原文連結**:[http://xueliang.org/article/detail/20200712234015993](http://xueliang.org/article/detail/20200712234015993) # 前言 在Netty的執行緒模型中,對於一個TCP連線的讀寫操作,都是

01 storm 原始碼閱讀 storm的程序間訊息通訊實現netty server實現

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * dist

02 storm 原始碼閱讀 storm的程序間訊息通訊實現netty client實現

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * dist

Mac下一款不錯的原始碼閱讀軟體

1、支援多語言:Ada, C, C++, C#, Java, FORTRAN, Delphi, Jovial, and PL/M ,混合語言的project也支援 2、多平臺: Windows/Linux/Solaris/HP-UX/IRIX/MAC OS X 3、程式碼語法高亮、程式碼折迭

Memcache-Java-Client-Release原始碼閱讀(之七)

一、主要內容 本章節的主要內容是介紹Memcache Client的Native,Old_Compat,New_Compat三個Hash演算法的應用及實現。 二、準備工作 1、伺服器啟動192.168.0.106:11211,192.168.0.106:11212兩個服務端例項。

Memcache-Java-Client-Release原始碼閱讀(之六)

一、主要內容 本章節的主要內容是介紹Memcache Client的一致性Hash演算法的應用及實現。 二、準備工作 1、伺服器啟動192.168.0.106:11211,192.168.0.106:11212兩個服務端例項。 2、示例程式碼: String[] serve

Netty 原始碼分析之拆包器的奧祕

為什麼要粘包拆包 為什麼要粘包 首先你得了解一下TCP/IP協議,在使用者資料量非常小的情況下,極端情況下,一個位元組,該TCP資料包的有效載荷非常低,傳遞100位元組的資料,需要100次TCP傳送,100次ACK,在應用及時性要求不高的情況下,將這100個有效資料拼接成一個數據包,那會縮短到一個TCP資