1. 程式人生 > >Netty之旅二:口口相傳的高效能Netty到底是什麼?

Netty之旅二:口口相傳的高效能Netty到底是什麼?

![d0iosx.png](https://upload-images.jianshu.io/upload_images/5660078-5cc3f85b621388a8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 高清思維導圖原件(`xmind/pdf/jpg`)可以關注公眾號:`一枝花算不算浪漫` 回覆`netty01`即可。 ## 前言 上一篇文章講了`NIO`相關的知識點,相比於傳統`IO`,`NIO`已經做得很優雅了,為什麼我們還要使用`Netty`? 上篇文章最後留了很多坑,講了`NIO`使用的弊端,也是為了引出`Netty`而設立的,這篇文章我們就來好好揭開`Netty`的神祕面紗。 本篇文章的目的很簡單,希望看過後你能看懂`Netty`的示例程式碼,針對於簡單的網路通訊,自己也能用`Netty`手寫一個開發應用出來! ## 一個簡單的Netty示例 以下是一個簡單聊天室Server端的程式,程式碼參考自:`http://www.imooc.com/read/82/article/2166` 程式碼有點長,主要核心程式碼是在`main()`方法中,這裡程式碼也希望大家看懂,後面也會一步步剖析。 PS:我是用`mac`系統,直接在終端輸入`telnet 127.0.0.1 8007` 即可啟動一個聊天框,如果提示找不到`telnet`命令,可以通過`brew`進行安裝,具體步驟請自行百度。 ```java /** * @Description netty簡易聊天室 * * @Author 一枝花算不算浪漫 * @Date 2020/8/10 6:52 上午 */ public final class NettyChatServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // 1. EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 2. 服務端引導器 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 3. 設定線bootStrap資訊 serverBootstrap.group(bossGroup, workerGroup) // 4. 設定ServerSocketChannel的型別 .channel(NioServerSocketChannel.class) // 5. 設定引數 .option(ChannelOption.SO_BACKLOG, 100) // 6. 設定ServerSocketChannel對應的Handler,只能設定一個 .handler(new LoggingHandler(LogLevel.INFO)) // 7. 設定SocketChannel對應的Handler .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // 可以新增多個子Handler p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } }); // 8. 繫結埠 ChannelFuture f = serverBootstrap.bind(PORT).sync(); // 9. 等待服務端監聽埠關閉,這裡會阻塞主執行緒 f.channel().closeFuture().sync(); } finally { // 10. 優雅地關閉兩個執行緒池 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private static class ChatNettyHandler extends SimpleChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("one conn active: " + ctx.channel()); // channel是在ServerBootstrapAcceptor中放到EventLoopGroup中的 ChatHolder.join((SocketChannel) ctx.channel()); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String content = new String(bytes, StandardCharsets.UTF_8); System.out.println(content); if (content.equals("quit\r\n")) { ctx.channel().close(); } else { ChatHolder.propagate((SocketChannel) ctx.channel(), content); } } @Override public void channelInactive(ChannelHandlerContext ctx) { System.out.println("one conn inactive: " + ctx.channel()); ChatHolder.quit((SocketChannel) ctx.channel()); } } private static class ChatHolder { static final Map USER_MAP = new ConcurrentHashMap<>(); /** * 加入群聊 */ static void join(SocketChannel socketChannel) { // 有人加入就給他分配一個id String userId = "使用者"+ ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE); send(socketChannel, "您的id為:" + userId + "\n\r"); for (SocketChannel channel : USER_MAP.keySet()) { send(channel, userId + " 加入了群聊" + "\n\r"); } // 將當前使用者加入到map中 USER_MAP.put(socketChannel, userId); } /** * 退出群聊 */ static void quit(SocketChannel socketChannel) { String userId = USER_MAP.get(socketChannel); send(socketChannel, "您退出了群聊" + "\n\r"); USER_MAP.remove(socketChannel); for (SocketChannel channel : USER_MAP.keySet()) { if (channel != socketChannel) { send(channel, userId + " 退出了群聊" + "\n\r"); } } } /** * 擴散說話的內容 */ public static void propagate(SocketChannel socketChannel, String content) { String userId = USER_MAP.get(socketChannel); for (SocketChannel channel : USER_MAP.keySet()) { if (channel != socketChannel) { send(channel, userId + ": " + content); } } } /** * 傳送訊息 */ static void send(SocketChannel socketChannel, String msg) { try { ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; ByteBuf writeBuffer = allocator.buffer(msg.getBytes().length); writeBuffer.writeCharSequence(msg, Charset.defaultCharset()); socketChannel.writeAndFlush(writeBuffer); } catch (Exception e) { e.printStackTrace(); } } } } ``` ![dkeb0s.png](https://upload-images.jianshu.io/upload_images/5660078-dcb49953adad8127.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 程式碼有點長,執行完的效果如上圖所示,下面所有內容都是圍繞著`如何看懂`以及`如何寫出`這樣的程式碼來展開的,希望你看完 也能輕鬆手寫`Netty`服務端程式碼~。通過簡單demo開發讓大家體驗了`Netty`實現相比`NIO`確實要簡單的多,但優點不限於此,只需要知道選擇Netty就對了。 ## Netty核心元件 對應著文章開頭的思維導圖,我們知道`Netty`的核心元件主要有: - Bootstrap && ServerBootstrap - EventLoopGroup - EventLoop - ByteBuf - Channel - ChannelHandler - ChannelFuture - ChannelPipeline - ChannelHandlerContext 類圖如下: ![dk8ZC9.png](https://upload-images.jianshu.io/upload_images/5660078-dcf6d0f8af600df7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) ### Bootstrap & ServerBootstrap 一看到`BootStrap`大家就應該想到**啟動類、引導類**這樣的詞彙,之前分析過[EurekaServer專案啟動類時][1]介紹過`EurekaBootstrap`, 他的作用就是上下文初始化、配置初始化。 在`Netty`中我們也有類似的類,`Bootstrap`和`ServerBootstrap`它們都是`Netty`程式的引導類,主要用於配置各種引數,並啟動整個`Netty`服務,我們看下文章開頭的示例程式碼: ```java ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } }); ``` `Bootstrap`和`ServerBootstrap`是針對於`Client`和`Server`端定義的兩套啟動類,區別如下: - `Bootstrap`是客戶端引導類,而`ServerBootstrap`是服務端引導類。 - `Bootstrap`通常使用`connect()`方法連線到遠端的主機和埠,作為一個`TCP客戶端`。 - `ServerBootstrap`通常使用`bind()`方法繫結本地的埠,等待客戶端來連線。 - `ServerBootstrap`可以處理`Accept`事件,這裡面`childHandler`是用來處理`Channel`請求的,我們可以檢視`chaildHandler()`方法的註解: ![dk884H.png](https://s1.ax1x.com/2020/08/15/dk884H.png) - `Bootstrap`客戶端引導只需要一個`EventLoopGroup`,但是一個`ServerBootstrap`通常需要兩個(上面的`boosGroup`和`workerGroup`)。 ### EventLoopGroup && EventLoop `EventLoopGroup`及`EventLoop`這兩個類名稱定義的很奇怪,對於初學者來說往往無法通過名稱來了解其中的含義,包括我也是這樣。 `EventLoopGroup` 可以理解為一個執行緒池,對於服務端程式,我們一般會繫結兩個執行緒池,一個用於處理 `Accept` 事件,一個用於處理讀寫事件,看下`EventLoop`系列的類目錄: ![dU4Roj.png](https://upload-images.jianshu.io/upload_images/5660078-20c7c39cb233e867.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 通過上面的類圖,我們才恍然大悟,我的親孃咧,這不就是一個執行緒池嘛?(名字氣的犄角拐彎的真是難認) `EventLoopGroup`是`EventLoop`的集合,一個`EventLoopGroup` 包含一個或者多個`EventLoop`。我們可以將`EventLoop`看做`EventLoopGroup`執行緒池中的一個個工作執行緒。 至於這裡為什麼要用到兩個執行緒池,具體的其實可以參考`Reactor`設計模式,這裡暫時不做過多的講解。 - 一個 EventLoopGroup 包含一個或多個 EventLoop ,即 EventLoopGroup : EventLoop = 1 : n - 一個 EventLoop 在它的生命週期內,只能與一個 Thread 繫結,即 EventLoop : Thread = 1 : 1 - 所有有 EventLoop 處理的 I/O 事件都將在它專有的 Thread 上被處理,從而保證執行緒安全,即 Thread : EventLoop = 1 : 1 - 一個 Channel 在它的生命週期內只能註冊到一個 EventLoop 上,即 Channel : EventLoop = n : 1 - 一個 EventLoop 可被分配至一個或多個 Channel ,即 EventLoop : Channel = 1 : n 當一個連線到達時,`Netty` 就會建立一個 `Channel`,然後從 `EventLoopGroup` 中分配一個 `EventLoop` 來給這個 `Channel` 繫結上,在該 `Channel` 的整個生命週期中都是有這個繫結的 `EventLoop` 來服務的。 ### ByteBuf 在`Java NIO`中我們有 `ByteBuffer`緩衝池,對於它的操作我們應該印象深刻,往`Buffer`中寫資料時我們需要關注寫入的位置,切換成讀模式時我們還要切換讀寫狀態,不然將會出現大問題。 針對於`NIO`中超級難用的`Buffer`類, `Netty` 提供了`ByteBuf`來替代。`ByteBuf`聲明瞭兩個指標:一個讀指標,一個寫指標,使得讀寫操作進行分離,簡化`buffer`的操作流程。 ![dkQocV.png](https://upload-images.jianshu.io/upload_images/5660078-b10ced8238b3b66e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 另外`Netty`提供了發幾種`ByteBuf`的實現以供我們選擇,`ByteBuf`可以分為: - `Pooled`和`Unpooled` 池化和非池化 - Heap 和 Direct,堆記憶體和堆外記憶體,NIO中建立Buffer也可以指定 - Safe 和 Unsafe,安全和非安全 ![dkJ9TU.png](https://upload-images.jianshu.io/upload_images/5660078-fed9e32fd4a0cdf2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 對於這麼多種建立`Buffer`的方式該怎麼選擇呢?`Netty`也為我們處理好了,我們可以直接使用(真是暖男`Ntetty`): ```java ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; ByteBuf buffer = allocator.buffer(length); ``` 使用這種方式,Netty將最大努力的使用池化、Unsafe、對外記憶體的方式為我們建立buffer。 ### Channel 提起`Channel`並不陌生,上一篇講`NIO`的三大元件提到過,最常見的就是`java.nio.SocketChannel`和`java.nio.ServerSocketChannel`,他們用於非阻塞的I/0操作。類似於`NIO`的`Channel`,Netty提供了自己的`Channel`和其子類實現,用於非同步I/0操作和其他相關的操作。 在 `Netty` 中, `Channel` 是一個 `Socket` 連線的抽象, 它為使用者提供了關於底層 `Socket` 狀態(是否是連線還是斷開) 以及對 `Socket` 的讀寫等操作。每當 `Netty` 建立了一個連線後, 都會有一個對應的 `Channel` 例項。並且,有父子`channel`的概念。 伺服器連線監聽的`channel` ,也叫 `parent channel`。 對應於每一個 `Socket` 連線的`channel`,也叫 `child channel`。 既然`channel` 是 Netty 抽象出來的網路 I/O 讀寫相關的介面,為什麼不使用` JDK NIO` 原生的 `Channel` 而要另起爐灶呢,主要原因如下: - `JDK` 的` SocketChannel` 和 `ServersocketChannel `沒有統一的 `Channel` 介面供業務開發者使用,對一於使用者而言,沒有統一的操作檢視,使用起來並不方便。 - `JDK` 的 `SocketChannel `和 `ScrversockctChannel `的主要職責就是網路 I/O 操作,由於他們是` SPI` 類介面,由具體的虛擬機器廠家來提供,所以通過繼承 SPI 功能直接實現 `ServersocketChannel` 和 `SocketChannel` 來擴充套件其工作量和重新` Channel` 功類是差不多的。 - Netty 的 `ChannelPipeline Channel` 需要夠跟 Netty 的整體架構融合在一起,例如 I/O 模型、基的定製模型,以及基於元資料描述配置化的 TCP 引數等,這些` JDK SocketChannel` 和` ServersocketChannel `都沒有提供,需要重新封裝。 - 自定義的 `Channel` ,功實現更加靈活。 基於上述 4 原因,它的設計原理比較簡單, Netty 重新設計了 `Channel` 介面,並且給予了很多不同的實現。但是功能卻比較繁雜,主要的設計理念如下: - 在 `Channel` 介面層,相關聯的其他操作封裝起來,採用 `Facade` 模式進行統一封裝,將網路 I/O 操作、網路 I/O 統一對外提供。 - `Channel` 介面的定義儘量大而全,統一的檢視,由不同子類實現不同的功能,公共功能在抽象父類中實現,最大程度上實現介面的重用。 - 具體實現採用聚合而非包含的方式,將相關的功類聚合在 `Channel `中,由 `Channel` 統一負責分配和排程,功能實現更加靈活。 `Channel `的實現類非常多,繼承關係複雜,從學習的角度我們抽取最重要的兩個 `NioServerSocketChannel `和 `NioSocketChannel`。 服務端 `NioServerSocketChannel `的繼承關係類圖如下: ![dUn8G4.png](https://upload-images.jianshu.io/upload_images/5660078-a82b11a120a58c85.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 客戶端 `NioSocketChannel `的繼承關係類圖如下: ![dUnJz9.png](https://upload-images.jianshu.io/upload_images/5660078-321c8b4f8e0dda6f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 後面文章原始碼系列會具體分析,這裡就不進一步闡述分析了。 ### ChannelHandler `ChannelHandler` 是`Netty`中最常用的元件。`ChannelHandler` 主要用來處理各種事件,這裡的事件很廣泛,比如可以是連線、資料接收、異常、資料轉換等。 `ChannelHandler` 有兩個核心子類 `ChannelInboundHandler` 和 `ChannelOutboundHandler`,其中 `ChannelInboundHandler` 用於接收、處理入站( `Inbound` )的資料和事件,而 `ChannelOutboundHandler` 則相反,用於接收、處理出站( `Outbound` )的資料和事件。 ![dkJAp9.png](https://upload-images.jianshu.io/upload_images/5660078-b76bf9284af32276.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) #### ChannelInboundHandler `ChannelInboundHandler`處理入站資料以及各種狀態變化,當`Channel`狀態發生改變會呼叫`ChannelInboundHandler`中的一些生命週期方法.這些方法與`Channel`的生命密切相關。 入站資料,就是進入`socket`的資料。下面展示一些該介面的生命週期`API`: ![dUntMR.png](https://upload-images.jianshu.io/upload_images/5660078-016fadc9b5211ee0.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 當某個 `ChannelInboundHandler`的實現重寫 `channelRead()`方法時,它將負責顯式地釋放與池化的 `ByteBuf` 例項相關的記憶體。 Netty 為此提供了一個實用方法`ReferenceCountUtil.release()`。 ```java @Sharable public class DiscardHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); } } ``` 這種方式還挺繁瑣的,Netty提供了一個`SimpleChannelInboundHandler`,重寫`channelRead0()`方法,就可以在呼叫過程中會自動釋放資源. ```java public class SimpleDiscardHandler extends SimpleChannelInboundHandler { @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) { // 不用呼叫ReferenceCountUtil.release(msg)也會釋放資源 } } ``` #### ChannelOutboundHandler 出站操作和資料將由 `ChannelOutboundHandler` 處理。它的方法將被 `Channel`、 `ChannelPipeline `以及 `ChannelHandlerContext` 呼叫。 `ChannelOutboundHandler` 的一個強大的功能是可以按需推遲操作或者事件,這使得可以通過一些複雜的方法來處理請求。例如, 如果到遠端節點的寫入被暫停了, 那麼你可以推遲沖刷操作並在稍後繼續。 ![d0PxbT.png](https://upload-images.jianshu.io/upload_images/5660078-fe7e906d4f138340.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) `ChannelPromise`與`ChannelFuture`: `ChannelOutboundHandler`中的大部分方法都需要一個`ChannelPromise`引數, 以便在操作完成時得到通知。 `ChannelPromise`是`ChannelFuture`的一個子類,其定義了一些可寫的方法,如`setSuccess()`和`setFailure()`,從而使`ChannelFuture`不可變。 #### ChannelHandlerAdapter `ChannelHandlerAdapter`顧名思義,就是`handler`的介面卡。你需要知道什麼是介面卡模式,假設有一個A介面,我們需要A的`subclass`實現功能,但是B類中正好有我們需要的功能,不想複製貼上B中的方法和屬性了,那麼可以寫一個介面卡類`Adpter`繼承B實現A,這樣一來`Adapter`是A的子類並且能直接使用B中的方法,這種模式就是介面卡模式。 就比如Netty中的`SslHandler`類,想使用`ByteToMessageDecoder`中的方法進行解碼,但是必須是`ChannelHandler`子類物件才能加入到`ChannelPipeline`中,通過如下簽名和其實現細節(`SslHandler`實現細節就不貼了)就能夠作為一個`handler`去處理訊息了。 ```java public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler ``` `ChannelHandlerAdapter`提供了一些實用方法`isSharable()`如果其對應的實現被標註為` Sharable`, 那麼這個方法將返回 `true`, 表示它可以被新增到多個 `ChannelPipeline`中 。如果想在自己的`ChannelHandler`中使用這些介面卡類,只需要擴充套件他們,重寫那些想要自定義的方法即可。 ### ChannelPipeline 每一個新建立的 `Channel` 都將會被分配一個新的 `ChannelPipeline`。這項關聯是永久性的; `Channel` 既不能附加另外一個 `ChannelPipeline`,也不能分離其當前的。在 Netty 元件的生命週期中,這是一項固定的操作,不需要開發人員的任何干預。 Netty 的 `ChannelHandler` 為處理器提供了基本的抽象, 目前你可以認為每個 `ChannelHandler` 的例項都類似於一種為了響應特定事件而被執行的回撥。從應用程式開發人員的角度來看, 它充當了所有處理入站和出站資料的應用程式邏輯的攔截載體。`ChannelPipeline`提供了 `ChannelHandler` 鏈的容器,並定義了用於在該鏈上傳播入站和出站事件流的 `API`。當 `Channel` 被建立時,它會被自動地分配到它專屬的 `ChannelPipeline`。 `ChannelHandler` 安裝到 `ChannelPipeline` 中的過程如下所示: - 一個`ChannelInitializer`的實現被註冊到了`ServerBootstrap`中 - 當 `ChannelInitializer.initChannel()`方法被呼叫時,`ChannelInitializer`將在 `ChannelPipeline `中安裝一組自定義的 `ChannelHandler` - `ChannelInitializer` 將它自己從 `ChannelPipeline `中移除 ![dkJuTO.png](https://upload-images.jianshu.io/upload_images/5660078-6ba27f803a58bd1b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 如上圖所示:這是一個同時具有入站和出站 `ChannelHandler` 的 `ChannelPipeline`的佈局,並且印證了我們之前的關於 `ChannelPipeline `主要由一系列的 `ChannelHandler` 所組成的說法。 `ChannelPipeline`還提供了通過 `ChannelPipeline` 本身傳播事件的方法。如果一個入站事件被觸發,它將被從 `ChannelPipeline`的頭部開始一直被傳播到 Channel Pipeline 的尾端。 你可能會說, 從事件途經 `ChannelPipeline`的角度來看, `ChannelPipeline `的頭部和尾端取決於該事件是入站的還是出站的。然而 Netty 總是將 `ChannelPipeline`的入站口(圖 的左側)作為頭部,而將出站口(該圖的右側)作為尾端。 當你完成了通過呼叫 `ChannelPipeline.add*()`方法將入站處理器( `ChannelInboundHandler`)和 出 站 處 理 器 ( `ChannelOutboundHandler` ) 混 合 添 加 到 `ChannelPipeline`之 後 , 每 一 個`ChannelHandler` 從頭部到尾端的順序位置正如同我們方才所定義它們的一樣。因此,如果你將圖 6-3 中的處理器( `ChannelHandler`)從左到右進行編號,那麼第一個被入站事件看到的 `ChannelHandler` 將是1,而第一個被出站事件看到的 `ChannelHandler `將是 5。 在 `ChannelPipeline` 傳播事件時,它會測試 `ChannelPipeline` 中的下一個 ChannelHandler 的型別是否和事件的運動方向相匹配。如果不匹配, `ChannelPipeline` 將跳過該`ChannelHandler` 並前進到下一個,直到它找到和該事件所期望的方向相匹配的為止。 (當然, `ChannelHandler `也可以同時實現`ChannelInboundHandler `介面和 `ChannelOutboundHandler` 介面。) #### 修改`ChannelPipeline` 修改指的是新增或刪除`ChannelHandler`,見程式碼示例: ```java ChannelPipeline pipeline = ..; FirstHandler firstHandler = new FirstHandler(); // 先新增一個Handler到ChannelPipeline中 pipeline.addLast("handler1", firstHandler); // 這個Handler放在了first,意味著放在了handler1之前 pipeline.addFirst("handler2", new SecondHandler()); // 這個Handler被放到了last,意味著在handler1之後 pipeline.addLast("handler3", new ThirdHandler()); ... // 通過名稱刪除 pipeline.remove("handler3"); // 通過物件刪除 pipeline.remove(firstHandler); // 名稱"handler2"替換成名稱"handler4",並切handler2的例項替換成了handler4的例項 pipeline.replace("handler2", "handler4", new ForthHandler()); ``` #### `ChannelPipeline`的出入站`API` 入站`API`所示: [圖片上傳失敗...(image-6037f5-1598167949595)] 出站`API`所示: ![dUndZ6.png](https://upload-images.jianshu.io/upload_images/5660078-93cd407346b35afe.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) `ChannelPipeline` 這個元件上面所講的大致只需要記住這三點即可: - `ChannelPipeline` 儲存了與 `Channel` 相關聯的 `ChannelHandler` - `ChannelPipeline `可以根據需要,通過新增或者刪除 `ChannelHandler` 來動態地修改 - `ChannelPipeline `有著豐富的` API `用以被呼叫,以響應入站和出站事件 ### ChannelHandlerContext 當 `ChannelHandler` 被新增到 `ChannelPipeline` 時,它將會被分配一個 `ChannelHandlerContext` ,它代表了 `ChannelHandler` 和 `ChannelPipeline` 之間的繫結。`ChannelHandlerContext` 的主要功能是管理它所關聯的` ChannelHandler `和在同一個 `ChannelPipeline` 中的其他` ChannelHandler `之間的互動。 如果呼叫`Channel`或`ChannelPipeline`上的方法,會沿著整個`ChannelPipeline`傳播,如果呼叫`ChannelHandlerContext`上的相同方法,則會從對應的當前`ChannelHandler`進行傳播。 `ChannelHandlerContext API`如下表所示: ![dUn0IO.png](https://upload-images.jianshu.io/upload_images/5660078-c08b22d37f156254.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) - `ChannelHandlerContext` 和 `ChannelHandler`之間的關聯(繫結)是永遠不會改變的,所以快取對它的引用是安全的; - 如同在本節開頭所解釋的一樣,相對於其他類的同名方法,`ChannelHandlerContext`的方法將產生更短的事件流, 應該儘可能地利用這個特性來獲得最大的效能。 #### 與`ChannelHandler`、`ChannelPipeline`的關聯使用 ![dUnDiD.png](https://upload-images.jianshu.io/upload_images/5660078-7fb2ed78ee2f7ce6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 從`ChannelHandlerContext`訪問`channel` ```java ChannelHandlerContext ctx = ..; // 獲取channel引用 Channel channel = ctx.channel(); // 通過channel寫入緩衝區 channel.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8)); ``` 從`ChannelHandlerContext`訪問`ChannelPipeline` ```java ChannelHandlerContext ctx = ..; // 獲取ChannelHandlerContext ChannelPipeline pipeline = ctx.pipeline(); // 通過ChannelPipeline寫入緩衝區 pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8)); ``` ![dUnrJe.png](https://upload-images.jianshu.io/upload_images/5660078-2342628bdea772bf.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 有時候我們不想從頭傳遞資料,想跳過幾個`handler`,從某個`handler`開始傳遞資料.我們必須獲取目標`handler`之前的`handler`關聯的`ChannelHandlerContext`。 ```java ChannelHandlerContext ctx = ..; // 直接通過ChannelHandlerContext寫資料,傳送到下一個handler ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8)); ``` ![dUnyzd.png](https://upload-images.jianshu.io/upload_images/5660078-19b0ce78062e2ddf.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 好了,`ChannelHandlerContext`的基本使用應該掌握了,但是你真的理解`ChannelHandlerContext`,`ChannelPipeline`和`Channelhandler`之間的關係了嗎?不理解也沒關係,因為原始碼以後會幫你理解的更為深刻。 ### 核心元件之間的關係 - 一個 `Channel `對應一個 `ChannelPipeline` - 一個 `ChannelPipeline` 包含一條雙向的 `ChannelHandlerContext `鏈 - 一個 `ChannelHandlerContext `中包含一個` ChannelHandler` - 一個 `Channel `會繫結到一個` EventLoop `上 - 一個 `NioEventLoop` 維護了一個 `Selector(`使用的是 Java 原生的 Selector) - 一個 `NioEventLoop` 相當於一個執行緒 ## 粘包拆包問題 粘包拆包問題是處於網路比較底層的問題,在資料鏈路層、網路層以及傳輸層都有可能發生。我們日常的網路應用開發大都在傳輸層進行,由於`UDP`有訊息保護邊界,不會發生粘包拆包問題,而因此粘包拆包問題只發生在`TCP`協議中。具體講`TCP`是個”流"協議,只有流的概念,沒有包的概念,對於業務上層資料的具體含義和邊界並不瞭解,它只會根據`TCP`緩衝區的實際情況進行包的劃分。所以在業務上認為,一個完整的包可能會被`TCP`拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送,這就是所謂的`TCP`粘包和拆包問題。 ### 問題舉例說明 下面針對客戶端分別傳送了兩個資料表`Packet1`和`Packet2`給服務端的時候,`TCP`粘包和拆包會出現的情況進行列舉說明: (1)第一種情況,服務端分兩次正常收到兩個獨立資料包,即沒有發生拆包和粘包的現象; ![dUncQA.png](https://upload-images.jianshu.io/upload_images/5660078-4d4679515945c3e1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) (2)第二種情況,接收端只收到一個數據包,由於`TCP`是不會出現丟包的,所以這一個資料包中包含了客戶端傳送的兩個資料包的資訊,這種現象即為粘包。這種情況由於接收端不知道這兩個資料包的界限,所以對於服務接收端來說很難處理。 ![dUn2Lt.png](https://upload-images.jianshu.io/upload_images/5660078-e28e48f4d54a9c17.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) (3)第三種情況,服務端分兩次讀取到了兩個資料包,第一次讀取到了完整的`Packet1`和`Packet2`包的部分內容,第二次讀取到了`Packet2`的剩餘內容,這被稱為TCP拆包; ![d0Pq8s.png](https://upload-images.jianshu.io/upload_images/5660078-462747fa5bac0cff.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) (4)第四種情況,服務端分兩次讀取到了兩個資料包,第一次讀取到了部分的`Packet1`內容,第二次讀取到了`Packet1`剩餘內容和`Packet2`的整包。 ![dUn5FS.png](https://upload-images.jianshu.io/upload_images/5660078-839f6b6f14a91b4a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 如果此時服務端TCP接收滑窗非常小,而資料包`Packet1`和`Packet2`比較大,很有可能服務端需要分多次才能將兩個包接收完全,期間發生多次拆包。以上列舉情況的背後原因分別如下: 1. 應用程式寫入的資料大於套接字緩衝區大小,這將會發生拆包。 2. 應用程式寫入資料小於套接字緩衝區大小,網絡卡將應用多次寫入的資料傳送到網路上,這將會發生粘包。 3. 進行`MSS`(最大報文長度)大小的`TCP`分段,當`TCP`報文長度-`TCP`頭部長度>`MSS`的時候將發生拆包。 4. 接收方法不及時讀取套接字緩衝區資料,這將發生粘包。 ### **如何基於Netty處理粘包、拆包問題** 由於底層的`TCP`無法理解上層的業務資料,所以在底層是無法保證資料包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,可以歸納如下: 1. 訊息定長,例如每個報文的大小為固定長度200位元組,如果不夠,空位補空格; 2. 在包尾增加回車換行符進行分割,例如`FTP`協議; 3. 將訊息分為訊息頭和訊息體,訊息頭中包含表示訊息總長度的欄位,通常設計思路為訊息頭的第一個欄位使用`int32`來表示訊息的總長度; 4. 更復雜的應用層協議。 之前Netty示例中其實並沒有考慮讀半包問題,這在功能測試往往沒有問題,但是一旦請求數過多或者傳送大報文之後,就會存在該問題。如果程式碼沒有考慮,往往就會出現解碼錯位或者錯誤,導致程式不能正常工作,下面看看Netty是如何根據主流的解決方案進行抽象實現來幫忙解決這一問題的。 如下表所示,Netty為了找出訊息的邊界,採用封幀方式: | 方式 | 解碼 | 編碼 | | ------------------ | ------------------------------ | ---------------------- | | 固定長度 | `FixedLengthFrameDecoder` | 簡單 | | 分隔符 | `DelimiterBasedFrameDecoder` | 簡單 | | 專門的 length 欄位 | `LengthFieldBasedFrameDecoder` | `LengthFieldPrepender` | 注意到,Netty提供了對應的解碼器來解決對應的問題,有了這些解碼器,使用者不需要自己對讀取的報文進行人工解碼,也不需要考慮TCP的粘包和半包問題。為什麼這麼說呢?下面列舉一個包尾增加分隔符的例子: ```java import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import java.util.concurrent.atomic.AtomicInteger; /** * @Author: wuxiaofei * @Date: 2020/8/15 0015 19:15 * @Version: 1.0 * @Description:入站處理器 */ @ChannelHandler.Sharable public class DelimiterServerHandler extends ChannelInboundHandlerAdapter { private AtomicInteger counter = new AtomicInteger(0); private AtomicInteger completeCounter = new AtomicInteger(0); /*** 服務端讀取到網路資料後的處理*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf)msg; String request = in.toString(CharsetUtil.UTF_8); System.out.println("Server Accept["+request +"] and the counter is:"+counter.incrementAndGet()); String resp = "Hello,"+request+". Welcome to Netty World!" + DelimiterEchoServer.DELIMITER_SYMBOL; ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); } /*** 服務端讀取完成網路資料後的處理*/ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); System.out.println("the ReadComplete count is " +completeCounter.incrementAndGet()); } /*** 發生異常後的處理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } } ``` ```java import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import java.net.InetSocketAddress; /** * @Author: wuxiaofei * @Date: 2020/8/15 0015 19:17 * @Version: 1.0 * @Description:服務端 */ public class DelimiterEchoServer { public static final String DELIMITER_SYMBOL = "@~"; public static final int PORT = 9997; public static void main(String[] args) throws InterruptedException { DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer(); System.out.println("伺服器即將啟動"); delimiterEchoServer.start(); } public void start() throws InterruptedException { final DelimiterServerHandler serverHandler = new DelimiterServerHandler(); EventLoopGroup group = new NioEventLoopGroup();/*執行緒組*/ try { ServerBootstrap b = new ServerBootstrap();/*服務端啟動必須*/ b.group(group)/*將執行緒組傳入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO進行網路傳輸*/ .localAddress(new InetSocketAddress(PORT))/*指定伺服器監聽埠*/ /*服務端每接收到一個連線請求,就會新啟一個socket通訊,也就是channel, 所以下面這段程式碼的作用就是為這個子channel增加handle*/ .childHandler(new ChannelInitializerImp()); ChannelFuture f = b.bind().sync();/*非同步繫結到伺服器,sync()會阻塞直到完成*/ System.out.println("伺服器啟動完成,等待客戶端的連線和資料....."); f.channel().closeFuture().sync();/*阻塞直到伺服器的channel關閉*/ } finally { group.shutdownGracefully().sync();/*優雅關閉執行緒組*/ } } private static class ChannelInitializerImp extends ChannelInitializer { @Override protected void initChannel(Channel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL .getBytes()); //服務端收到資料包後經過DelimiterBasedFrameDecoder即分隔符基礎框架解碼器解碼為一個個帶有分隔符的資料包。 ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new DelimiterServerHandler()); } } } ``` 新增到`ChannelPipeline`的`DelimiterBasedFrameDecoder`用於對使用分隔符結尾的訊息進行自動解碼,當然還有沒有用到的`FixedLengthFrameDecoder`用於對固定長度的訊息進行自動解碼等解碼器。正如上門的程式碼使用案例,有了Netty提供的幾碼器可以輕鬆地完成對很多訊息的自動解碼,而且不需要考慮TCP粘包/拆包導致的讀半包問題,極大地提升了開發效率。 ## Netty示例程式碼詳解 相信看完上面的鋪墊,你對Netty編碼有了一定的瞭解了,下面再來整體梳理一遍吧。 ![dVp7yn.png](https://upload-images.jianshu.io/upload_images/5660078-d23f9bba3522c20b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 1、設定`EventLoopGroup`執行緒組(`Reactor`執行緒組) ```java EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ``` 上面我們說過`Netty`中使用`Reactor`模式,`bossGroup`表示伺服器連線監聽執行緒組,專門接受 `Accept` 新的客戶端`client` 連線。另一個`workerGroup`表示處理每一連線的資料收發的執行緒組,來處理訊息的讀寫事件。 2、服務端引導器 ```java ServerBootstrap serverBootstrap = new ServerBootstrap(); ``` 整合所有配置,用來啟動`Netty`服務端。 3、設定`ServerBootstrap`資訊 ```java serverBootstrap.group(bossGroup, workerGroup); ``` 將兩個執行緒組設定到`ServerBootstrap`中。 4、設定`ServerSocketChannel`型別 ```java serverBootstrap.channel(NioServerSocketChannel.class); ``` 設定通道的`IO`型別,`Netty`不止支援`Java NIO`,也支援阻塞式`IO`,例如`OIO`**OioServerSocketChannel.class)** 5、設定引數 ```java serverBootstrap.option(ChannelOption.SO_BACKLOG, 100); ``` 通過`option()`方法可以設定很多引數,這裡`SO_BACKLOG`標識服務端接受連線的佇列長度,如果佇列已滿,客戶端連線將被拒絕。預設值,`Windows`為200,其他為128,這裡設定的是100。 6、設定`Handler` ```java serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); ``` 設定 `ServerSocketChannel`對應的`Handler`,這裡只能設定一個,它會在`SocketChannel`建立起來之前執行。 7、設定子Handler ```java serverBootstrap.childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new ChatNettyHandler()); } }); ``` `Netty`中提供了一種可以設定多個`Handler`的途徑,即使用`ChannelInitializer`方式。`ChannelPipeline`是`Netty`處理請求的責任鏈,這是一個`ChannelHandler`的連結串列,而`ChannelHandler`就是用來處理網路請求的內容的。 每一個`channel`,都有一個處理器流水線。裝配`child channel`流水線,呼叫`childHandler()`方法,傳遞一個`ChannelInitializer` 的例項。 在 `child channel` 建立成功,開始通道初始化的時候,在bootstrap啟動器中配置的`ChannelInitializer` 例項就會被呼叫。 這個時候,才真正的執行去執行 `initChannel` 初始化方法,開始通道流水線裝配。 流水線裝配,主要是在流水線`pipeline`的後面,增加負責資料讀寫、處理業務邏輯的`handler`。 處理器 `ChannelHandler` 用來處理網路請求內容,有`ChannelInboundHandler`和`ChannelOutboundHandler`兩種,`ChannlPipeline`會從頭到尾順序呼叫`ChannelInboundHandler`處理網路請求內容,從尾到頭呼叫`ChannelOutboundHandler`處理網路請求內容 8、繫結埠號 ```java ChannelFuture f = serverBootstrap.bind(PORT).sync(); ``` 繫結埠號 9、等待服務端埠號關閉 ``` f.channel().closeFuture().sync(); ``` 等待服務端監聽埠關閉,`sync()`會阻塞主執行緒,內部呼叫的是 `Object` 的 `wait()`方法 10、關閉EventLoopGroup執行緒組 ```java bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); ``` ## 總結 這篇文章主要是從一個`demo`作為引子,然後介紹了`Netty`的包結構、`Reactor`模型、程式設計規範等等,目的很簡單,希望你能夠讀懂這段`demo`並寫出來。 後面開始繼續`Netty`原始碼解析部分,敬請期待。 ## 參考資料 1. 《Netty in Action》書籍 1. 慕課Netty專欄 2. 掘金閃電俠Netty小冊 3. 芋道原始碼Netty專欄 5. Github[fork from krcys] 感謝Netty專欄作者們優秀的文章內容~ [1]:https://www.cnblogs.com/wang-meng/p/12089911.html ![原創乾貨分享.png](https://user-gold-cdn.xitu.io/2020/6/22/172d9428e6afd974?w=900&h=383&f=png&s=100094)