1. 程式人生 > 實用技巧 >Netty 工作原理解析

Netty 工作原理解析

執行緒模型介紹

不同的執行緒模式,對程式的效能有很大影響,為了搞清 Netty 執行緒模式,下面來系統的講解下各個執行緒模式, 最後看看 Netty 執行緒模型有什麼優越性。

目前存在的執行緒模型有:

  • 傳統阻塞 I/O 服務模型
  • Reactor 模式

根據 Reactor 的數量和處理資源池執行緒的數量不同,有 3 種典型的實現:

  • 單 Reactor 單執行緒
  • 單 Reactor 多執行緒
  • 主從 Reactor 多執行緒

Netty 執行緒模式:Netty 主要基於主從 Reactor 多執行緒模型做了一定的改進,其中主從 Reactor 多執行緒模型有多個 Reactor。

傳統阻塞 I/O 服務模型

工作原理示意圖

模型特點

  • 採用阻塞 IO 模式獲取輸入的資料
  • 每個連線都需要獨立的執行緒完成資料的輸入,業務處理, 資料返回

問題分析

  • 當併發數很大,就會建立大量的執行緒,佔用很大的系統資源
  • 連線建立後,如果當前執行緒暫時沒有資料可讀,該執行緒會阻塞在 read 操作,造成執行緒資源浪費

Reactor 模式

Reactor: 反應器模式,也被稱為分發者模式(Dispatcher)或通知者模式(notifier)。

針對傳統阻塞 I/O 服務模型的 2 個缺點,解決方案如下:

  • 基於 I/O 複用模型:多個連線共用一個阻塞物件,應用程式只需要在一個阻塞物件等待,無需阻塞等待所有連線。當某個連線有新的資料可以處理時,作業系統通知應用程式,執行緒從阻塞狀態返回,開始進行業務處理。
  • 基於執行緒池複用執行緒資源:不必再為每個連線建立執行緒,將連線完成後的業務處理任務分配給執行緒進行處理,一個執行緒可以處理多個連線的業務。

Reactor 模式設計思想

Reactor 模式基本設計思想是I/O 複用結合線程池,如下圖所示:

  1. Reactor 模式,通過一個或多個輸入同時傳遞給服務處理器(基於事件驅動) 。
  2. 伺服器端程式處理傳入的多個請求,並將它們同步分派到相應的處理執行緒,因此 Reactor 模式也叫 Dispatcher 模式。
  3. Reactor 模式使用 IO 複用監聽事件,收到事件後,分發給某個執行緒(程序), 這點就是網路伺服器高併發處理關鍵。

Reactor 模式核心組成

  1. Reactor:Reactor 在一個單獨的執行緒中執行,負責監聽和分發事件,分發給適當的處理程式來對 IO 事件做出反應。
  2. Handlers:處理程式執行 I/O 事件要完成的實際事件。Reactor 通過排程適當的處理程式來響應 I/O 事件,處理程式執行非阻塞操作。

Reactor 模式分類

根據 Reactor 的數量和處理資源池執行緒的數量不同,有 3 種典型的實現:

  1. 單 Reactor 單執行緒
  2. 單 Reactor 多執行緒
  3. 主從 Reactor 多執行緒

單Reactor 單執行緒模式

工作原理示意圖

工作流程說明

  1. Select 是前面 I/O 複用模型介紹的標準網路程式設計API,可以實現應用程式通過一個阻塞物件監聽多路連線請求。
  2. Reactor 物件通過 Select 監控客戶端請求事件,收到事件後通過 Dispatch 進行分發。
  3. 如果是建立連線請求事件,則由 Acceptor 通過 Accept 處理連線請求,然後建立一個 Handler 物件處理完成連線後的各種事件
  4. 如果不是建立連線事件,則 Reactor 會分發呼叫連線對應的 Handler 來響應。
  5. Handler 會完成 Read -> 業務處理 -> Send 的完整業務流程。

優缺點分析

  • 優點:模型簡單,沒有多執行緒、程序通訊、競爭的問題,全部都在一個執行緒中完成。
  • 缺點:
    • 效能問題,只有一個執行緒,無法完全發揮多核 CPU 的效能。Handler 在處理某個連線上的業務時,整個程序無法處理其他連線事件,很容易導致效能瓶頸。
    • 可靠性問題,執行緒意外終止,或者進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部訊息,造成節點故障。

使用場景:客戶端的數量有限,業務處理非常快速,比如 Redis 在業務處理的時間複雜度 O(1) 的情況。

單Reactor 多執行緒模式

工作原理示意圖

工作流程說明

  1. Reactor 物件通過 Select 監控客戶端請求事件,收到事件後,通過 dispatch 進行分發。
  2. 如果建立連線請求, 則由 Acceptor 通過 accept 處理連線請求,然後建立一個 Handler 物件處理完成連線後的各種事件。
  3. 如果不是連線請求,則由 Reactor 分發呼叫連線對應的 Handler 來處理。
  4. Handler 只負責響應事件,不做具體的業務處理,通過 read 讀取資料後,會分發給後面的 Worker 執行緒池的某個執行緒處理業務。
  5. Worker 執行緒池會分配獨立執行緒完成真正的業務,並將結果返回給 Handler。
  6. Handler 收到響應後,通過 send 將結果返回給 client。

優缺點分析

  • 優點:可以充分的利用多核 CPU 的處理能力。
  • 缺點:多執行緒資料共享和訪問比較複雜,Reactor 處理所有的事件的監聽和響應,在單執行緒執行,在高併發場景容易出現效能瓶頸。

主從Reactor 模式

針對單 Reactor 多執行緒模型中,Reactor 在單執行緒中執行,高併發場景下容易成為效能瓶頸,可以讓 Reactor 在 多執行緒中執行。

工作原理示意圖

工作流程說明

  1. Reactor 主執行緒 MainReactor 物件通過 select 監聽連線事件,收到事件後,通過Acceptor 處理連線事件。
  2. 當 Acceptor 處理連線事件後,MainReactor 將連線分配給 SubReactor。
  3. SubReactor 將連線加入到連線佇列進行監聽,並建立 handler 進行各種事件處理。
  4. 當有新事件發生時,SubReactor 就會呼叫對應的 handler 處理。
  5. handler 通過 read 讀取資料,分發給後面的 worker 執行緒處理。
  6. worker 執行緒池分配獨立的 worker 執行緒進行業務處理,並返回結果。
  7. handler 收到響應的結果後,再通過 send 將結果返回給 client。
  8. Reactor 主執行緒可以對應多個 Reactor 子執行緒, 即 MainRecator 可以關聯多個SubReactor。

優缺點分析

  • 優點:
    • 父執行緒與子執行緒的資料互動簡單職責明確,父執行緒只需要接收新連線,子執行緒完成後續的業務處理。
    • 父執行緒與子執行緒的資料互動簡單,Reactor 主執行緒只需要把新連線傳給子執行緒,子執行緒無需返回資料。
  • 缺點:程式設計複雜度較高

結合例項:這種模型在許多專案中廣泛使用,包括 Nginx 主從 Reactor 多程序模型,Memcached 主從多執行緒, Netty 主從多執行緒模型的支援。

Reactor 模式優點和缺點

優點

  1. 響應快,不必為單個同步時間所阻塞,雖然 Reactor 本身依然是同步的。
  2. 可以最大程度的避免複雜的多執行緒及同步問題,並且避免了多執行緒/程序的切換開銷。
  3. 擴充套件性好,可以方便的通過增加 Reactor 例項個數來充分利用 CPU 資源。
  4. 複用性好,Reactor 模型本身與具體事件處理邏輯無關,具有很高的複用性。

缺點

  1. 相比傳統的簡單模型,Reactor增加了一定的複雜性,因而有一定的門檻,並且不易於除錯。
  2. Reactor模式需要底層的Synchronous Event Demultiplexer支援,比如Java中的Selector 支援,作業系統的select系統呼叫支援,如果要自己實現Synchronous Event Demultiplexer 可能不會有那麼高效。
  3. Reactor模式在IO 讀寫資料時還是在同一個執行緒中實現的,即使使用多個Reactor 機制的情況下,那些共享一個Reactor 的Channel 如果出現一個長時間的資料讀寫,會影響這個Reactor 中其他Channel 的響應時間,比如在大檔案傳輸時,IO 操作就會影響其他Client 的響應時間,因而對這種操作,使用傳統的Thread-Per-Connection 或許是一個更好的選擇,或者此時使用改進版的Reactor 模式如Proactor 模式。

Netty 模型

Netty 主要基於主從 Reactors 多執行緒模型做了一定的改進,其中主從 Reactor 多執行緒模型有多個 Reactor。

工作原理示意圖

工作流程說明

  1. Netty 抽象出兩組執行緒池 BossGroup 專門負責接收客戶端的連線,WorkerGroup 專門負責網路的讀寫。
  2. BossGroup 和 WorkerGroup 型別都是 NioEventLoopGroup。
  3. NioEventLoopGroup 相當於一個事件迴圈組,這個組中含有多個事件迴圈,每一個事件迴圈是 NioEventLoop。
  4. NioEventLoop 表示一個不斷迴圈的執行處理任務的執行緒,每個 NioEventLoop 都有一個 selector ,用於監聽繫結在其上的 socket 的網路通訊。其內部採用序列化設計,從訊息的讀取 -> 解碼 -> 處理 -> 編碼 -> 傳送,始終由 IO 執行緒 NioEventLoop 負責。
  5. NioEventLoopGroup 可以有多個執行緒, 即可以含有多個 NioEventLoop。
  6. 每個 Boss NioEventLoop 迴圈執行的步驟:
    • 輪詢 accept 事件。
    • 處理 accept 事件,與 client 建立連線,生成 NioScocketChannel ,並將其註冊到某個 Worker NIOEventLoop 上 的 selector。
    • 處理任務佇列的任務,即 runAllTasks。
  7. 每個 Worker NIOEventLoop 迴圈執行的步驟:
    • 輪詢 read,write 事件。
    • 處理 I/O 事件, 即read ,write 事件,在對應 NioScocketChannel 處理業務。
    • 處理任務佇列的任務,即 runAllTasks。
  8. 每個 Worker NIOEventLoop 處理業務時,會使用pipeline(管道),pipeline 中包含了 channel , 即通過pipeline 可以獲取到對應通道, 管道中維護了很多的handler 處理器用來處理 channel 中的資料。

案例: TCP 服務

  • 服務端
public class NettyServer {

    public static void main(String[] args) {

        // 建立兩個執行緒組 bossGroup 和 workerGroup
        // bossGroup 只是處理連線請求, 真正的和客戶端業務處理, 會交給workerGroup 完成
        // bossGroup 和 workerGroup 含有的子執行緒(NioEventLoop)的個數預設是實際cpu 核心數 * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();

        bootstrap.group(bossGroup, workerGroup) // 設定兩個執行緒組
                .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作為伺服器的通道實現
                .option(ChannelOption.SO_BACKLOG, 128) // 設定執行緒佇列得到連線個數
                .childOption(ChannelOption.SO_KEEPALIVE, true) // 設定保持活動的連線狀態
                .childHandler(new ChannelInitializer<SocketChannel>() { // 建立一個通道測試物件(匿名物件)
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 給pipeline 設定處理器
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                }); // 給workGroup 的EventLoop 對應的管道設定處理器

        System.out.println("伺服器已準備就緒...");

        try {
            // 啟動伺服器, 繫結埠並設定同步
            ChannelFuture channelFuture = bootstrap.bind(8080).sync();

            // 對關閉通道監聽
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

/**
 * 自定義handler處理器
 */
class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 讀取資料實際(這裡可以讀取客戶端傳送的訊息)
     * @param ctx 上下文物件, 含有管道pipeline, 通道channel, 地址
     * @param msg 客戶端傳送的資料
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("伺服器讀取執行緒: " + Thread.currentThread().getName());
        System.out.println("server ctx = " + ctx);


        Channel channel = ctx.channel();
        // ChannelPipeline pipeline = ctx.pipeline(); // 本質是一個雙向連結串列

        // 將msg 轉成一個ByteBuf
        // ByteBuf是Netty提供的, 不是NIO的ByteBuffer
        ByteBuf buf = (ByteBuf) msg;

        System.out.println("收到客戶端訊息: " + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客戶端地址: " + channel.remoteAddress());
    }

    /**
     * 資料讀取完畢
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // writeAndFlush 是write + flush
        // 將資料寫入到快取, 並重新整理
        ctx.writeAndFlush(Unpooled.copiedBuffer("bye ~", CharsetUtil.UTF_8));
    }
}
  • 客戶端
public class NettyClient {
    public static void main(String[] args) {
        // 客戶端需要一個事件迴圈組
        EventLoopGroup group = new NioEventLoopGroup();

        // 建立客戶端啟動物件
        // 注意客戶端使用的不是ServerBootstrap 而是Bootstrap
        Bootstrap bootstrap = new Bootstrap();
        // 設定相關引數
        bootstrap.group(group) //設定執行緒組
                .channel(NioSocketChannel.class) // 設定客戶端通道的實現類(反射)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new NettyClientHandler()); //加入自己的處理器
                    }
                });

        System.out.println("客戶端準備就緒...");

        try {
            // 啟動客戶端去連線伺服器端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
            // 監聽關閉通道
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 當通道就緒就會觸發該方法
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client ctx = " + ctx);

        ctx.writeAndFlush(Unpooled.copiedBuffer("hello ~", CharsetUtil.UTF_8));
    }

    /**
     * 當通道有讀取事件時觸發
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("伺服器訊息: " + buf.toString(CharsetUtil.UTF_8));
        System.out.println("伺服器地址: "+ ctx.channel().remoteAddress());
    }

    /**
     * 異常事件
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

任務佇列 Task

使用場景

  1. 使用者程式自定義的普通任務
  2. 使用者自定義定時任務
  3. 非當前 Reactor 執行緒呼叫 Channel 的各種方法

程式碼演示

class NettyServerTaskHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        // 自定義普通任務, 該任務是提交到taskQueue中
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello ~ task", CharsetUtil.UTF_8));
                    System.out.println("channel hash =" + ctx.channel().hashCode());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 注意此處第二個普通任務在任務一基礎上睡眠5s, 10s後輸出
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello ~ task2", CharsetUtil.UTF_8));
                    System.out.println("channel hash =" + ctx.channel().hashCode());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 自定義定時任務, 該任務是提交到scheduledTaskQueue中
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello ~ timed task", CharsetUtil.UTF_8));
                    System.out.println("channel hash =" + ctx.channel().hashCode());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 5, TimeUnit.SECONDS);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("bye ~", CharsetUtil.UTF_8));
    }
}

非同步模型

基本介紹

非同步的概念和同步相對,當一個非同步過程呼叫發出後,呼叫者不能立刻得到結果。實際處理這個呼叫的元件在完成後,通過狀態、通知和回撥來通知呼叫者。

Netty 中的 I/O 操作是非同步的,包括 Bind、Write、Connect 等操作會簡單的返回一個 ChannelFuture,呼叫者並不能立刻獲得結果,而是通過 Future - Listener 機制,使用者可以方便的主動獲取或者通過通知機制獲得 IO 操作結果。Netty 的非同步模型是建立在 future 和 callback 的基礎上。

Future 表示非同步的執行結果, 可以通過它提供的方法來檢測執行是否完成,比如檢索計算等。

工作原理示意圖

說明:

  1. 在使用 Netty 程式設計時,攔截操作和轉換出入站資料只需要提供 callback 或利用 future 即可。這使得鏈式操作簡單、高效,並有利於編寫可重用的、通用的程式碼。
  2. Netty 框架的目標就是讓你的業務邏輯從網路基礎應用編碼中分離出來、解脫出來。

Future-Listener 機制

  1. 當 Future 物件剛剛建立時,處於非完成狀態,呼叫者可以通過返回的 ChannelFuture 來獲取操作執行的狀態,註冊監聽函式來執行完成後的操作。
  2. 常見有如下操作:
    • 通過 isDone 方法來判斷當前操作是否完成
    • 通過 isSuccess 方法來判斷已完成的當前操作是否成功
    • 通過 getCause 方法來獲取已完成的當前操作失敗的原因
    • 通過 isCancelled 方法來判斷已完成的當前操作是否被取消
    • 通過 addListener 方法來註冊監聽器,當操作已完成(isDone 方法返回完成),將會通知指定的監聽器;如果 Future 物件已完成,則通知指定的監聽器

程式碼示例

繫結埠是非同步操作,當繫結操作處理完,將會呼叫相應的監聽器處理邏輯

// 啟動伺服器, 繫結埠並設定同步
ChannelFuture channelFuture = bootstrap.bind(8080).sync();
// 給ChannelFuture註冊監聽器, 監控關心的事件
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            System.out.println("監聽埠 8080 成功");
        } else {
            System.out.println("監聽埠 8080 失敗");
        }
    }
});

案例:HTTP 服務

public class HttpServer {

    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new CustomHttpServerInitializer());

        try {
            ChannelFuture channelFuture = bootstrap.bind(8080).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

class CustomHttpServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // 得到管道
        ChannelPipeline pipeline = ch.pipeline();

        // 加入一個netty提供的http編解碼器
        pipeline.addLast("httpServerCodec", new HttpServerCodec());
        // 增加一個自定義handler
        pipeline.addLast(new CustomHttpServer());
    }
}

class CustomHttpServer extends SimpleChannelInboundHandler<HttpObject> {

    /**
     * 讀取客戶端資料
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        if (msg instanceof HttpRequest) {
            System.out.println("pipeline hash = " + ctx.pipeline().hashCode() + "handler hash = " + this.hashCode());

            System.out.println("客戶端地址: " + ctx.channel().remoteAddress());

            // 請求資訊
            HttpRequest httpRequest = (HttpRequest) msg;
            // 獲取URI, 過濾指定資源
            URI uri = new URI(httpRequest.uri());
            if ("/favicon.ico".equals(uri.getPath())) {
                return;
            }

            // 回覆資訊給瀏覽器
            ByteBuf content = Unpooled.copiedBuffer("hello ~", CharsetUtil.UTF_8);

            // 構造http響應, 即httpResponse
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);

            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

            ctx.writeAndFlush(response);
        }
    }
}