1. 程式人生 > 實用技巧 >三、Netty高效能架構設計

三、Netty高效能架構設計

@

目錄

三、Netty高效能架構設計

1、Netty概述

本文原始碼:

1.1、原生NIO存在的問題

  • NIO 的類庫和 API 繁雜,使用麻煩:需要熟練掌握 SelectorServerSocketChannelSocketChannelByteBuffer 等。
  • 需要具備其他的額外技能:要熟悉 Java 多執行緒程式設計,因為 NIO 程式設計涉及到 Reactor 模式,你必須對多執行緒和網路程式設計非常熟悉,才能編寫出高質量的 NIO 程式。
  • 開發工作量和難度都非常大:例如客戶端面臨斷連重連網路閃斷
    半包讀寫失敗快取網路擁塞和異常流的處理等等。
  • JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它會導致 Selector 空輪詢,最終導致 CPU 100%。直到 JDK 1.7 版本該問題仍舊存在,沒有被根本解決。

1.2、Netty優點

NettyJDK自帶的NIOAPI進行了封裝,解決了上述問題。

  • 設計優雅:適用於各種傳輸型別的統一 API 阻塞和非阻塞 Socket;基於靈活且可擴充套件的事件模型,可以清晰地分離關注點;高度可定製的執行緒模型 - 單執行緒,一個或多個執行緒池.
  • 安全:完整的 SSL/TLSStartTLS 支援
  • 高效能、吞吐量更高:延遲更低;減少資源消耗;最小化不必要的記憶體複製。

2、I/O執行緒模型

  • 目前存在的執行緒模型主要有:
    • 傳統阻塞I/O服務模型
    • Reactor模式
  • 根據Reactor的數量和處理資源池執行緒的數量不同,有如下3種典型的實現
    • Reactor單執行緒
    • Reactor多執行緒
    • 主從Reactor多執行緒
  • Netty執行緒模型主要基於主從Reactor多執行緒模型做了一定的改進,其中主從Reactor多執行緒模型有多個Reactor

2.1、傳統阻塞I/O服務模型

圖解說明:黃色的框表示物件,藍色的框表示執行緒、白色的框表示方法(API)。之後的圖相同。

2.1.1、模型分析

模型特點:

  • 採用阻塞IO模式獲取輸入的資料
  • 每個連結都需要獨立的執行緒完成資料的輸入,業務處理、資料返回。

問題分析:

  • 當併發數很大,就會建立大量的執行緒,佔用很大系統資源
  • 連線建立後,如果當前執行緒暫時沒有資料可讀,該執行緒會阻塞在read操作,造成執行緒資源浪費。

2.1.2、模型實現程式碼示例

由於模型的邏輯主要集中在服務端,所以所有模型程式碼示例基本上都是服務端的示例

public static void main(String[] args) throws IOException {
    //1、建立一個執行緒池
    //2、如果有客戶端連線,就建立一個執行緒,與之通訊(單獨寫一個方法)
    ExecutorService executorService = Executors.newCachedThreadPool();

    //建立ServerSocket
    ServerSocket serverSocket = new ServerSocket(6666);
    System.out.println("伺服器啟動了");

    while (true) {
        //監聽,等待客戶端連線
        final Socket socket = serverSocket.accept();
        System.out.println("連線到一個客戶端");
        //建立一個執行緒,與之通訊
        executorService.execute(() -> {
            //重寫Runnable方法,與客戶端進行通訊
            handler(socket);
        });
    }
}

//編寫一個Handler方法,和客戶端通訊。主要進行資料的讀取和業務處理。
public static void handler(Socket socket) {
    try {
        byte[] bytes = new byte[1024];
        //通過socket獲取輸入流
        InputStream inputStream = socket.getInputStream();
        //迴圈的讀取客戶端傳送的資料
        while (true){
            int read = inputStream.read(bytes);
            if (read != -1){
                System.out.println(new String(bytes, 0, read));//輸出客戶端傳送的資料
            } else {
                break;
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        System.out.println("關閉和client的連線");
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2.2、Reactor模型概述

針對傳統阻塞I/O服務模型的2個缺點,解決方案如下:

  • 基於 I/O 複用模型:多個連線共用一個阻塞物件,應用程式只需要在一個阻塞物件等待,無需阻塞等待所有連線。當某個連線有新的資料可以處理時,作業系統通知應用程式,執行緒從阻塞狀態返回,開始進行業務處理。Reactor 對應的叫法: 1. 反應器模式 2. 分發者模式(Dispatcher) 3. 通知者模式(notifier)
  • 基於執行緒池複用執行緒資源:不必再為每個連線建立執行緒,將連線完成後的業務處理任務分配給執行緒進行處理,一個執行緒可以處理多個連線的業務。

I/O複用結合線程池,就是Reactor模式基本設計思想,如圖所示:

  • Reactor模式,通過一個或多個輸入同時傳遞給服務處理器的模式(基於事件驅動)
  • 伺服器端程式處理傳入的多個請求,並將它們同步分派到響應的處理執行緒,因此Reactor模式也叫Dispatcher模式。
  • Reactor模式使用IO複用監聽事件,收到事件後,分發的某個執行緒(程序),這點就是網路服務高併發處理的關鍵。

Reactor模式中的核心組成部分:

在一個單獨的執行緒中執行,負責監聽和分發事件,分發給適當的處理程式來對
事件作出反應。

  • 我的理解是將Reactor理解成一個Selector,它可以對建立新的連線,也可以將產生的讀寫事件交換給Handler進行處理

  • Handlers:處理程式執行I/O事件要完成的實際事件,類似於客戶想要與之交談的公司中的實際官員。Reactor通過排程適當的處理程式來響應I/O事件,處理程式執行非阻塞操作。

2.3、單Reactor單執行緒模式

類似於nio中的Selector(選擇器) 根據事件驅動 處理不同的連線請求

方案說明:

  • Select 是前面 I/O 複用模型介紹的標準網路程式設計 API,可以實現應用程式通過一個阻塞物件監聽多路連線請求
  • Reactor 物件通過 Select 監控客戶端請求事件,收到事件後通過 Dispatch 進行分發
  • 如果是建立連線請求事件,則由 Acceptor 通過 Accept 處理連線請求,然後建立一個 Handler 物件處理連線完成後的後續業務處理
  • 如果不是建立連線事件,則 Reactor 會分發呼叫連線對應的 Handler 來響應
  • Handler 會完成 Read→業務處理→Send 的完整業務流程

結合例項:伺服器端用一個執行緒通過多路複用搞定所有的 IO 操作(包括連線,讀、寫等),編碼簡單,清晰明瞭,但是如果客戶端連線數量較多,將無法支撐,前面的 NIO 案例就屬於這種模型。

2.2.1、模型分析

  • 優點:模型簡單,沒有多執行緒、程序通訊、競爭的問題,全部都在一個執行緒中完成
  • 缺點:效能問題,只有一個執行緒,無法完全發揮多核 CPU 的效能。Handler 在處理某個連線上的業務時,整個程序無法處理其他連線事件,很容易導致效能瓶頸
  • 缺點:可靠性問題,執行緒意外終止,或者進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部訊息,造成節點故障
  • 使用場景:客戶端的數量有限,業務處理非常快速,比如 Redis在業務處理的時間複雜度 O(1) 的情況

2.2.2、模型實現程式碼示例

這裡面我為了簡便,我將Reactor和Acceptor和Handler三個物件搞成了方法。

public class SReactorSThread {
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private int PORT = 6666;

    public SReactorSThread() {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //對客戶端進行監聽
    public void listen() {
        try {
            while (true) {
                int count = selector.select();
                //表示有客戶端產生事件
                if (count > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();//取出產生事件的Channel
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();//準備對其進行遍歷
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        //將key交給dispatch去處理
                        dispatch(key);
                        iterator.remove();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //dispatch
    private void dispatch(SelectionKey key) {
        if (key.isAcceptable()){
            accept(key);
        }else {
            handler(key);
        }
    }

    //建立新的連線
    private void accept(SelectionKey key) {
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //對請求進行處理,接收訊息---業務處理---返回訊息
    private void handler(SelectionKey key) {
        SocketChannel channel = null;
        try {
            channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(3);
            StringBuilder msg = new StringBuilder();
            while (channel.read(buffer) > 0) {
                msg.append(new String(buffer.array()));
                buffer.clear();
            }
            System.out.println("接收到訊息:" + msg.toString());
            //傳送訊息
            String ok = "OK";
            buffer.put(ok.getBytes());
            //這個flip非常重要哦,是將position置0,limit置於position的位置,以便下面程式碼進行寫入操作能夠正確寫入buffer中的所有資料
            buffer.flip();  
            channel.write(buffer);
            buffer.clear();
        } catch (IOException e) {
            try {
                System.out.println(channel.getRemoteAddress() + "離線了");
                //取消該通道的註冊並關閉通道,這裡非常重要,沒有這一步的話當客戶端斷開連線就會不斷丟擲IOException
                //是因為,select會一直產生該事件。
                key.cancel();
                channel.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

/********呼叫**************/

public static void main(String[] args) {
    SReactorSThread sReactorSThread = new SReactorSThread();
    sReactorSThread.listen();
}

這裡有更牛逼更完整的Reactor單執行緒模型的程式碼案例https://www.cnblogs.com/hama1993/p/10611229.html
https://www.cnblogs.com/eason-ou/p/11910484.html

2.4、單Reactor多執行緒模型

方案說明:

  • Reactor 物件通過select 監控客戶端請求事件, 收到事件後,通過dispatch進行分發
  • 如果建立連線請求, 則右Acceptor 通過accept 處理連線請求, 然後建立一個Handler物件處理完成連線後的各種事件
  • 如果不是連線請求,則由reactor分發呼叫連線對應的handler 來處理
  • handler 只負責響應事件,不做具體的業務處理, 通過read 讀取資料後,會分發給後面的worker執行緒池的某個執行緒處理業務
  • worker 執行緒池會分配獨立執行緒完成真正的業務,並將結果返回給handler
  • handler收到響應後,通過send 將結果返回給client

2.4.1、模型分析

  • 優點:可以充分的利用多核cpu 的處理能力
  • 缺點:多執行緒資料共享和訪問比較複雜, reactor 處理所有的事件的監聽和響應,在單執行緒執行, 在高併發場景容易出現效能瓶頸.

2.4.2、模型實現程式碼示例

參考 https://www.cnblogs.com/eason-ou/p/11910523.html
也就是每次讀寫操作利用執行緒池開個執行緒來進行讀寫操作 主執行緒繼續監聽事件
不知道為什麼要

  • 在寫完成之後切換為讀事件
  • 在讀事件之後切換為寫事件
    //讀完後,將通道註冊為寫
    Selector selector = selectionKey.selector();
    SelectionKey sk = sc.register(selector, SelectionKey.OP_WRITE);

上述問題 的原因在於sun.nio.ch.SelectionKeyImpl#nioInterestOps(int ops)方法

  • 可以看到直接覆蓋了之前設定的interestOps,也就是說我們在客戶端完成連線,設定SelectionKey.OP_READ事件,其READ實已經把之前的事件全部覆蓋了。

問題又來了

  • 為什麼不在連線事件觸發 並且連線成功的時候註冊
    SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
    直接註冊兩個事件呢? -> SelectionKey.OP_READ | SelectionKey.OP_WRITE

最終解決問題:

2.5、主從Reactor多執行緒

方案說明:

  • Reactor主執行緒 MainReactor 物件就只註冊一個用於監聽連線請求的ServerSocketChannel,通過select 監聽連線事件, 收到事件後,通過Acceptor 處理連線事件
  • Acceptor 處理連線事件後,MainReactor 通過accept獲取新的連線,並將連線註冊到SubReactor
  • subreactor 將連線加入到連線佇列進行監聽,並建立handler進行各種事件處理
  • 當有新事件發生時, subreactor 就會呼叫對應的handler處理
  • handler 通過read 讀取資料,分發給後面的worker 執行緒處理
  • worker 執行緒池分配獨立的worker 執行緒進行業務處理,並返回結果
  • handler 收到響應的結果後,再通過send 將結果返回給client
  • Reactor 主執行緒可以對應多個Reactor 子執行緒, 即MainRecator 可以關聯多個SubReactor

2.5.1、模型分析

  • 優點:父執行緒與子執行緒的資料互動簡單職責明確,父執行緒只需要接收新連線,子執行緒完成後續的業務處理。
  • 優點:父執行緒與子執行緒的資料互動簡單,Reactor 主執行緒只需要把新連線傳給子執行緒,子執行緒無需返回資料
  • 缺點:程式設計複雜度較高
  • 結合例項:這種模型在許多專案中廣泛使用,包括 Nginx 主從 Reactor 多程序模型,Memcached 主從多執行緒,Netty 主從多執行緒模型的支援

2.5.2、模型實現程式碼示例

還是去看別人大佬寫的吧。https://www.cnblogs.com/eason-ou/p/11912010.html

3種模式用生活案例來理解

  • 1)單Reactor單執行緒,前臺接待員和服務員是同一個人,全程為顧客服
  • 2)單Reactor多執行緒,1個前臺接待員,多個服務員,接待員只負責接待
  • 3)主從Reactor多執行緒,多個前臺接待員,多個服務生

3、Netty模型

3.1、主從Reactor進階

Netty主要是基於主從Reactor多執行緒模式做了一定的改進,其中主從Reactor都有單一的一個變成了多個。下面是簡單的改進圖。

  • 如圖所示,增加了BossGroup來維護多個主Reactor,主Reactor還是隻關注連線的Accept;增加了WorkGroup來維護多個從Reactor,從Reactor將接收到的請求交給Handler進行處理。
  • 在主Reactor中接收到Accept事件,獲取到對應的SocketChannelNetty會將它進一步封裝成NIOSocketChannel物件,這個封裝後的物件還包含了該Channel對應的SelectionKey、通訊地址等詳細資訊
  • Netty會將裝個封裝後的Channel物件註冊到WorkerGroup中的從Reactor中。
  • WorkerGroup中的從Reactor監聽到事件後,就會將之交給與此Reactor對應的Handler進行處理。

3.2、再進階版

  • NettySelector以及Selector相關的事件及任務封裝了NioEventLoop,這樣BossGroup就可以通過管理NioEventLoop去管理各個Selector
  • 同時,Netty模型中主要存在兩個大的執行緒池組BossGroupWorkerGroup,用於管理主Reactor執行緒和從Reactor執行緒。

3.3、Netty模型

  • Netty抽象出兩組執行緒池,BossGroup專門負責接收客戶端的連線,WorkerGroup專門負責網路的讀寫

  • BossGroupWorkerGroup型別的本質都是NioEventLoopGroup型別。

  • NioEventLoopGroup相當於一個執行緒管理器(類似於ExecutorServevice),它下面維護很多個NioEventLoop執行緒。(我認為圖中的NioEventGroup的地方應該改成NioEventLoop,可能我的理解有點差錯吧)

    • 在初始化這兩個Group執行緒組時,預設會在每個Group中生成CPU*2NioEventLoop執行緒
    • n個連線來了,Group預設會按照連線請求的順序分別將這些連線分給各個NioEventLoop去處理。
    • 同時Group還負責管理EventLoop的生命週期。
  • NioEventLoop表示一個不斷迴圈的執行處理任務的執行緒

    • 它維護了一個執行緒和任務佇列。
    • 每個NioEventLoop都包含一個Selector,用於監聽繫結在它上面的socket通訊。
    • 每個NioEventLoop相當於Selector,負責處理多個Channel上的事件
    • 每增加一個請求連線,NioEventLoopGroup就將這個請求依次分發給它下面的NioEventLoop處理。
  • 每個Boss NioEventLoop迴圈執行的步驟有3步:

    • 輪詢accept事件
    • 處理accept事件,與client建立連線,生成NioSocketChannel,並將其註冊到某個Worker NioEventLoopselector上。
    • 處理任務佇列到任務,即runAllTasks
  • 每個Worker NioEventLoop迴圈執行的步驟:

    • 輪詢readwrite事件
    • 處理I/O事件,即readwrite事件,在對應的NioSocketChannel中進行處理
    • 處理任務佇列的任務,即runAllTasks
  • 每個 Worker NioEventLoop處理業務時,會使用pipeline(管道),pipeline中維護了一個ChannelHandlerContext連結串列,而ChannelHandlerContext則儲存了Channel相關的所有上下文資訊,同時關聯一個ChannelHandler物件。如圖所示,Channelpipeline一一對應,ChannelHandler和ChannelHandlerContext一一對應。

  • ChannelHandler是一個介面,負責處理或攔截I/O操作,並將其轉發到Pipeline中的下一個處理Handler進行處理。

                                                     I/O Request
                                                via Channel or
                                            ChannelHandlerContext
                                                          |
      +---------------------------------------------------+---------------+
      |                           ChannelPipeline         |               |
      |                                                  \|/              |
      |    +---------------------+            +-----------+----------+    |
      |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  |               |
      |               |                                  \|/              |
      |    +----------+----------+            +-----------+----------+    |
      |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  .               |
      |               .                                   .               |
      | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
      |        [ method call]                       [method call]         |
      |               .                                   .               |
      |               .                                  \|/              |
      |    +----------+----------+            +-----------+----------+    |
      |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  |               |
      |               |                                  \|/              |
      |    +----------+----------+            +-----------+----------+    |
      |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  |               |
      +---------------+-----------------------------------+---------------+
                      |                                  \|/
      +---------------+-----------------------------------+---------------+
      |               |                                   |               |
      |       [ Socket.read() ]                    [ Socket.write() ]     |
      |                                                                   |
      |  Netty Internal I/O Threads (Transport Implementation)            |
      +-------------------------------------------------------------------+
    
    

3.4、程式碼示例

服務端程式碼:

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //建立BossGroup 和 WorkerGroup
        //1、建立兩個執行緒組,bossGroup 和 workerGroup
        //2、bossGroup 只是處理連線請求,真正的和客戶端業務處理,會交給 workerGroup 完成
        //3、兩個都是無限迴圈
        //4、bossGroup 和 workerGroup 含有的子執行緒(NioEventLoop)個數為實際 cpu 核數 * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup worderGroup = new NioEventLoopGroup();

        try {
            //建立伺服器端的啟動物件,配置引數
            ServerBootstrap bootstrap = new ServerBootstrap();

            //使用鏈式程式設計來進行設定,配置
            bootstrap.group(bossGroup, worderGroup) //設定兩個執行緒組
                    .channel(NioServerSocketChannel.class) //使用 NioServerSocketChannel 作為伺服器的通道實現
                    .option(ChannelOption.SO_BACKLOG, 128) //設定執行緒佇列得到連線個數
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //設定保持活動連線狀態
                    .childHandler(new ChannelInitializer<SocketChannel>() { //為accept channel的pipeline預新增的handler
                        //給 pipeline 新增處理器,每當有連線accept時,就會執行到此處。
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    }); //給我們的 workerGroup 的 EventLoop 對應的管道設定處理器

            System.out.println("........伺服器 is ready...");
            //繫結一個埠並且同步,生成了一個ChannelFuture 物件
            //啟動伺服器(並繫結埠)
            ChannelFuture future = bootstrap.bind(6668).sync();

            //對關閉通道進行監聽
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            worderGroup.shutdownGracefully();
        }
    }
}

服務端Handler處理:

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     *讀取客戶端傳送過來的訊息
     * @param ctx 上下文物件,含有 管道pipeline,通道channel,地址
     * @param msg 就是客戶端傳送的資料,預設Object
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("伺服器讀取執行緒:" + Thread.currentThread().getName());
        System.out.println("server ctx = " + ctx);
        //看看Channel和Pipeline的關係
        Channel channel = ctx.channel();
        ChannelPipeline pipeline = ctx.pipeline(); //本質是個雙向連結串列,出棧入棧

        //將msg轉成一個ByteBuf,比NIO的ByteBuffer效能更高
        ByteBuf buf = (ByteBuf)msg;
        System.out.println("客戶端傳送的訊息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客戶端地址:" + ctx.channel().remoteAddress());
    }

    //資料讀取完畢
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //它是 write + flush,將資料寫入到快取buffer,並將buffer中的資料flush進通道
        //一般講,我們對這個傳送的資料進行編碼
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~", CharsetUtil.UTF_8));
    }

    //處理異常,一般是關閉通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客戶端:

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //客戶端需要一個事件迴圈組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //建立客戶端啟動物件
            //注意:客戶端使用的不是 ServerBootStrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            //設定相關引數
            bootstrap.group(group) //設定執行緒組
                    .channel(NioSocketChannel.class) //設定客戶端通道的實現類(使用反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler()); //加入自己的處理器
                        }
                    });
            System.out.println("客戶端 OK...");

            //啟動客戶端去連線伺服器端
            //關於 channelFuture 涉及到 netty 的非同步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //給關閉通道進行監聽
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客戶端Handler處理:

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    /**
     * 當通道就緒就會觸發
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client: " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));
    }

    /**
     * 當通道有讀取事件時,會觸發
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        System.out.println("伺服器回覆的訊息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("伺服器的地址:" + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

3.5、任務佇列

任務佇列由NioEventLoop維護並不斷執行。當我們就收到請求之後,在當前channel對應的pipeline中的各個Handler裡面進行業務處理和請求過濾。當某些業務需要耗費大量時間的時候,我們可以將任務提交到由NioEventLoop維護的taskQueuescheduleTaskQueue中,讓當前的NioEventLoop執行緒在空閒時間去執行這些任務。下面將介紹提交任務的3種方式

使用者程式自定義的普通任務:

該方式會將任務提交到taskQueue佇列中。提交到該佇列中的任務會按照提交順序依次執行
例如: 提交兩個任務到佇列中 第一個任務消耗10秒 第二個任務消耗20秒

  • 先執行第一個任務用時10秒 -->0-10秒第一個任務
  • 第一個任務完成後才執行第二個 -->10秒-30秒第二個任務
  • 總共耗時30秒
channelHandlerContext.channel().eventLoop().execute(new Runnable(){
    @Override
    public void run() {
        //...
    }
});

使用者自定義的定時任務:

該方式會將任務提交到scheduleTaskQueue定時任務佇列中。該佇列是底層是優先佇列PriorityQueue實現的,固該佇列中的任務會按照時間的先後順序定時執行。

channelHandlerContext.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {

    }
}, 60, TimeUnit.SECONDS);

為其他EventLoop執行緒對應的Channel新增任務
可以在ChannelInitializer中,將剛建立的各個Channel以及對應的標識加入到統一的集合中去,然後可以根據標識獲取Channel以及其對應的NioEventLoop,然後就課程呼叫execute()或者schedule()方法。

3.6、非同步模型

基本介紹

  • 非同步的概念和同步相對。當一個非同步過程呼叫發出後,呼叫者不能立刻得到結果。實際處理這個呼叫的元件在完成後,通過狀態、通知和回撥來通知呼叫者。
  • Netty 中的 I/O 操作是非同步的,包括 BindWriteConnect 等操作會簡單的返回一個 ChannelFuture
  • 呼叫者並不能立刻獲得結果,而是通過 Future-Listener 機制,使用者可以方便的主動獲取或者通過通知機制獲得 IO 操作結果
  • Netty 的非同步模型是建立在 futurecallback 的之上的。callback 就是回撥。重點說 Future,它的核心思想是:假設一個方法 fun,計算過程可能非常耗時,等待 fun返回顯然不合適。那麼可以在呼叫 fun 的時候,立馬返回一個 Future,後續可以通過 Future去監控方法 fun 的處理過程(即 : Future-Listener 機制)

Future說明

  • 表示非同步的執行結果, 可以通過它提供的方法來檢測執行是否完成,比如檢索計算等等.
  • ChannelFuture 是一個介面 : public interface ChannelFuture extends Future。我們可以新增監聽器,當監聽的事件發生時,就會通知到監聽器

工作原理示意圖

  • 在使用 Netty 進行程式設計時,攔截操作和轉換出入站資料只需要您提供 callback 或利用future 即可。這使得鏈式操作簡單、高效, 並有利於編寫可重用的、通用的程式碼。
  • Netty 框架的目標就是讓你的業務邏輯從網路基礎應用編碼中分離出來、解脫出來

Future-Listener機制

  • Future 物件剛剛建立時,處於非完成狀態,呼叫者可以通過返回的 ChannelFuture 來獲取操作執行的狀態,註冊監聽函式來執行完成後的操作。

  • 常用方法如下:

    方法名稱 方法作用
    isDone() 判斷當前操作是否完成
    isSuccess() 判斷已完成的當前操作是否成功
    getCause() 獲取已完成當前操作失敗的原因
    isCancelled() 判斷已完成的當前操作是否被取消
    addListener() 註冊監聽器,當前操作(Future)已完成,將會通知指定的監聽器

舉例說明
繫結埠操作時非同步操作,當繫結操作處理完,將會呼叫相應的監聽器處理邏輯。

serverBootstrap.bind(port).addListener(future -> {
       if(future.isSuccess()) {
           System.out.println(newDate() + ": 埠["+ port + "]繫結成功!");
       } else{
           System.err.println("埠["+ port + "]繫結失敗!");
       }
   });

3.7、快速入門例項-HTTP HelloWorld

瀏覽器訪問Netty伺服器後,返回HelloWorld

  1. 每個瀏覽器的pipeline和Handler是獨立的不是共享的
  2. 同一個瀏覽器關閉後 再開啟 也是獨立的不是共享的
  3. HTTP協議用完就斷掉 ,TCP協議要保持連線
  4. 課上演示重新整理時 也會是新的pipeline和Handler ,自己測試發現沒有更換 不知原因

啟動

public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new TestServerInitializer());
        ChannelFuture channelFuture = bootstrap.bind(8080).sync();
        channelFuture.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

定義ChannelInitializer

用於給Channel對應的pipeline新增handler。該ChannelInitializer中的程式碼在SocketChannel被建立時都會執行

public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //向管道加入處理器

        //得到管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        //加入一個 netty 提供的 httpServerCodec:codec => [coder - decoder]
        //1、HttpServerCodec 是 netty 提供的處理http的編解碼器
        pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
        //2、增加自定義的Handler
        pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
    }
}

自定義Handler

public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
     * 讀取客戶端資料。
     *
     * @param channelHandlerContext
     * @param httpObject            客戶端和伺服器端互相通訊的資料被封裝成 HttpObject
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {

        //判斷 msg 是不是 HTTPRequest 請求
        if (httpObject instanceof HttpRequest) {
            System.out.println("msg 型別 = " + httpObject.getClass());
            System.out.println("客戶端地址:" + channelHandlerContext.channel().remoteAddress());
            //獲取
            HttpRequest request = (HttpRequest) httpObject;
            //獲取uri,進行路徑過濾
            URI uri = new URI(request.uri());
            if ("/favicon.ico".equals(uri.getPath())) {
                System.out.println("請求了 favicon.ico,不做響應");
            }

            //回覆資訊給瀏覽器[http協議]
            ByteBuf content = Unpooled.copiedBuffer("HelloWorld", CharsetUtil.UTF_8);
            //構造一個http的響應,即HTTPResponse
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

            //將構建好的 response 返回
            channelHandlerContext.writeAndFlush(response);
        }
    }
}

原文連線:https://blog.csdn.net/qq_35751014/article/details/104443715
本文在原文的基礎上新增一下自己的理解和筆記