1. 程式人生 > 其它 >07 粘包和半包問題的處理策略和自定義協議的要素

07 粘包和半包問題的處理策略和自定義協議的要素

1 資料傳輸的經典問題

1-1 問題概述

粘包和半包問題的本質:實際開發中,程式設計師在應用層訊息的分界無法在傳輸層得到支援,脫離應用層討論粘包和半包毫無意義。

粘包問題

現象:傳送 abc def,接收 abcdef

發生場景 說明
接受方的應用層緩衝區較大 接收方的應用層的 ByteBuf 設定較大,接受方的傳輸層的滑動視窗比較小
接受方的傳輸層的接受視窗較大 傳輸層中傳送方 256 bytes 是一個完整報文,但由於接收方處理不及時且緩衝區足夠大,這 256 bytes 位元組就會緩衝在接收方的滑動視窗中,緩衝了多個報文就會粘包
傳送方的傳輸層緩衝區 Nagle演算法會造成粘包
if 有資料要傳送:
   if 可用視窗大小 >= MSS and 可傳送的資料 >= MSS:
        立刻傳送MSS大小的資料
   else :
       if 有未確認的資料:
            將資料放入快取等待接收ACK (多次小批量的資料傳送會被合併一起傳送)
       else:
            立刻傳送資料

Nagle 演算法

  • 即使傳送一個位元組,也需要加入 tcp 頭和 ip 頭,也就是總位元組數會使用 41 bytes,非常不經濟。因此為了提高網路利用率,tcp 希望儘可能傳送足夠大的資料,這就是 Nagle 演算法產生的緣由
  • 該演算法是指傳送端即使還有應該傳送的資料,但如果這部分資料很少的話,則進行延遲傳送
    • 如果 SO_SNDBUF 的資料達到 MSS,則需要傳送
    • 如果 SO_SNDBUF 中含有 FIN(表示需要連線關閉)這時將剩餘資料傳送,再關閉
    • 如果 TCP_NODELAY = true,則需要傳送
    • 已傳送的資料都收到 ack 時,則需要傳送
    • 上述條件不滿足,但發生超時(一般為 200ms)則需要傳送
    • 除上述情況,延遲傳送

Delay ack 和 Nagle演算法

訊息邊界問題需要考慮的情況以及常用解決策略

半包問題

現象:傳送 abcdef,接收 abc def

原因:

發生場景 說明
應用層緩衝區
ByteBuf的大小無法一次性存放傳輸過來的資料
傳輸層傳送視窗限制 接收方的視窗只剩了 128 bytes,傳送方的報文大小是 256 bytes,這時放不下了,只能先發送前 128 bytes,等待 ack 後才能傳送剩餘部分,這就造成了半包
實際鏈路容量限制 傳送的資料超過 MSS 限制後,會將資料切分發送,就會造成半包

MSS 是最大段長度(maximum segment size),它是 MTU 去除 tcp 頭和 ip 頭後剩餘能夠作為資料傳輸的位元組數最大報文段長度

  • 鏈路層對一次能夠傳送的最大資料有限制,這個限制稱之為 MTU(maximum transmission unit),不同的鏈路裝置的 MTU 值也有所不同,例如

    • 乙太網的 MTU 是 1500,FDDI(光纖分散式資料介面)的 MTU 是 4352,本地迴環地址的 MTU 是 65535 - 本地測試不走網絡卡
  • TCP 在傳遞大量資料時,會按照 MSS 大小將資料進行分割傳送

  • MSS 的值在三次握手時通知對方自己 MSS 的值,然後在兩者之間選擇一個小值作為 MSS

1-2 解決方案

粘包與半包問題的解決需要我們對訊息間的分隔進行,可以從以下幾個思路入手

解決思路 特點 netty提供的入站hanlder 實際案例
採用短連線,發一個包建立一次連線,這樣連線建立到連線斷開之間就是訊息的邊界 通過連線的建立與斷開分隔離訊息,每次連線與建立都會耗費額外時間,效率太低,無法解決半包問題
每一條訊息採用固定長度 提前約定每個訊息的固定長度,資料包的大小不好把握,不夠靈活,只適合特定場景 固定長度解碼器FixedLengthFrameDecoder
約定固定字元作為分隔符,如果超出指定長度仍未出現分隔符,則丟擲異常 某些訊息內容中需要包含分隔符,不具有通用性 基於分隔符的解碼器DelimiterBasedFrameDecoder和LineBasedFrameDecoder redis協議是採用回車+換行作為分隔離符
約定用定長位元組表示接下來資料的長度,每條訊息分為 head 和 body,head 中包含 body 的長度 現代應用層傳輸協議中訊息頭部通常會有訊息體的長度。 基於長度欄位的解碼器LengthFieldBasedFrameDecoder http協議
  • 幾種解決思路中最常用的就是通過長度欄位來進行訊息的分割,現有的應用層協議的訊息頭中就有訊息長度欄位

1-3 程式碼例項

服務端程式碼

  • 通過更改TCP和Netty應用緩衝區的大小,可以看到半包和粘包現象
 // 設定服務端的TCP的接受緩衝區(receive buffer)為10個位元組,不進行設定的則是由雙方自動協商
serverBootstrap.option(ChannelOption.SO_RCVBUF,10);
 // 設定Bytebuf的大小為16位元組,netty預設Bytebuf為1024位元組
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16));
package application.typical_problem;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Server {
    static final Logger log = LoggerFactory.getLogger(Client.class);
    void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 設定服務端的TCP的接受緩衝區(receive buffer)為10個位元組,不進行設定的則是由雙方自動協商
            // serverBootstrap.option(ChannelOption.SO_RCVBUF,10);

            serverBootstrap.channel(NioServerSocketChannel.class);
            // 設定Bytebuf的大小為16位元組,netty預設Bytebuf為1024位元組
            // serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16));
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("connected {}", ctx.channel());
                            super.channelActive(ctx);
                        }

                        @Override
                        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("disconnect {}", ctx.channel());
                            super.channelInactive(ctx);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080);
            log.debug("{} binding...", channelFuture.channel());
            channelFuture.sync();
            log.debug("{} bound...", channelFuture.channel());
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            log.debug("stoped");
        }
    }

    public static void main(String[] args) {
        new Server().start();
    }
}
短連線

短連線客戶端例項:

package application.typical_problem;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// 每呼叫一次send()相當於重新建立一次連線併發送資料
// 每次短連線傳送一部分資料,顯然效率低下
public class NianBaoClient {
    static final Logger log = LoggerFactory.getLogger(NianBaoClient.class);
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            send();
        }
    }
    private static void send() {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    log.debug("conneted...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("sending...");
                            ByteBuf buffer = ctx.alloc().buffer();
                            buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
                            ctx.writeAndFlush(buffer);
                            // 發完即關
                            ctx.close();
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();       // 關閉執行緒池
        }
    }
}
固定長度的編碼器
  • netty提供了固定長度解碼器作為handler對訊息進行切分

固定長度解碼器的作用:

  • 本質上是官方實現的一個handler
 * +---+----+------+----+
 * | A | BC | DEFG | HI |
 * +---+----+------+----+
  * +-----+-----+-----+
 * | ABC | DEF | GHI |
 * +-----+-----+-----+
 
該解碼器將收到的位元組流分割成固定長度的位元組片段,如果我們收到了上述的four fragmented packets,那麼經過這個特徵的handler就可以得到三個長度相同的片段

通過在服務端新增固定長度分割的入站handler得到固定長度的訊息

服務端程式碼

package application.typical_problem;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class FixedServer {
    static final Logger log = LoggerFactory.getLogger(Client.class);
    void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 新增handler,該handler實現對訊息的固定長度分割,這裡是8個位元組為固定長度
                    ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("connected {}", ctx.channel());
                            super.channelActive(ctx);
                        }

                        @Override
                        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("disconnect {}", ctx.channel());
                            super.channelInactive(ctx);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080);
            log.debug("{} binding...", channelFuture.channel());
            channelFuture.sync();
            log.debug("{} bound...", channelFuture.channel());
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            log.debug("stoped");
        }
    }

    public static void main(String[] args) {
        new FixedServer().start();
    }
}

客戶端程式碼

package application.typical_problem;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;

public class FixedClient {
    static final Logger log = LoggerFactory.getLogger(FixedClient.class);

    public static void main(String[] args) {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    log.debug("connetted...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("sending...");
                            // 傳送內容隨機的資料包
                            Random r = new Random();
                            char c = 'a';
                            ByteBuf buffer = ctx.alloc().buffer();
                            for (int i = 0; i < 10; i++) {
                                byte[] bytes = new byte[8];
                                for (int j = 0; j < r.nextInt(8); j++) {
                                    bytes[j] = (byte) c;
                                }
                                c++;
                                buffer.writeBytes(bytes);
                            }
                            ctx.writeAndFlush(buffer);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

客戶端輸出

16:09:20 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x054526db] REGISTERED
16:09:20 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x054526db] CONNECT: localhost/127.0.0.1:8080
16:09:20 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x054526db, L:/127.0.0.1:14833 - R:localhost/127.0.0.1:8080] ACTIVE
16:09:20 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x054526db, L:/127.0.0.1:14833 - R:localhost/127.0.0.1:8080] WRITE: 80B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 00 00 00 00 00 00 62 62 62 00 00 00 00 00 |aa......bbb.....|
|00000010| 00 00 00 00 00 00 00 00 64 64 64 64 00 00 00 00 |........dddd....|
|00000020| 00 00 00 00 00 00 00 00 66 66 66 00 00 00 00 00 |........fff.....|
|00000030| 67 00 00 00 00 00 00 00 68 00 00 00 00 00 00 00 |g.......h.......|
|00000040| 69 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |i...............|
+--------+-------------------------------------------------+----------------+
16:09:20 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x054526db, L:/127.0.0.1:14833 - R:localhost/127.0.0.1:8080] FLUSH

服務端輸出

16:09:20 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9af7db67, L:/127.0.0.1:8080 - R:/127.0.0.1:14833] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 00 00 00 00 00 00                         |aa......        |
+--------+-------------------------------------------------+----------------+
16:09:20 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9af7db67, L:/127.0.0.1:8080 - R:/127.0.0.1:14833] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 00 00 00 00 00                         |bbb.....        |
+--------+-------------------------------------------------+----------------+
16:09:20 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9af7db67, L:/127.0.0.1:8080 - R:/127.0.0.1:14833] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 00 00 00 00 00                         |........        |
+--------+-------------------------------------------------+----------------+
16:09:20 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9af7db67, L:/127.0.0.1:8080 - R:/127.0.0.1:14833] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 64 64 64 00 00 00 00                         |dddd....        |
+--------+-------------------------------------------------+----------------+
16:09:20 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9af7db67, L:/127.0.0.1:8080 - R:/127.0.0.1:14833] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 00 00 00 00 00                         |........        |
+--------+-------------------------------------------------+----------------+
16:09:20 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9af7db67, L:/127.0.0.1:8080 - R:/127.0.0.1:14833] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 66 66 00 00 00 00 00                         |fff.....        |
+--------+-------------------------------------------------+----------------+
16:09:20 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9af7db67, L:/127.0.0.1:8080 - R:/127.0.0.1:14833] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 67 00 00 00 00 00 00 00                         |g.......        |
+--------+-------------------------------------------------+----------------+
16:09:20 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9af7db67, L:/127.0.0.1:8080 - R:/127.0.0.1:14833] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 00 00 00 00 00 00 00                         |h.......        |
+--------+-------------------------------------------------+----------------+
16:09:20 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9af7db67, L:/127.0.0.1:8080 - R:/127.0.0.1:14833] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 69 00 00 00 00 00 00 00                         |i.......        |
+--------+-------------------------------------------------+----------------+
16:09:20 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9af7db67, L:/127.0.0.1:8080 - R:/127.0.0.1:14833] READ: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 00 00 00 00 00                         |........        |
+--------+-------------------------------------------------+----------------+
16:09:20 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x9af7db67, L:/127.0.0.1:8080 - R:/127.0.0.1:14833] READ COMPLETE

從輸出可以看出,服務端確實是以8個字元為單位對訊息體進行解碼

採用分割符的編碼器
  • netty提供了2個行解碼器

LineBasedFrameDecoder:支援換行符進行分割,在windows(\r\n)和Linux平臺(\n)都能夠得到支援

public LineBasedFrameDecoder(final int maxLength) {
        this(maxLength, true, false);
}

DelimiterBasedFrameDecoder:支援自定義的分割符

public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
	this(maxFrameLength, true, delimiter);
}
  • 採用分隔符作用訊息的分界時,需要對訊息的最長長度進行定義
基於長度欄位的編碼器

LengthFieldBasedFrameDecoder:根據長度欄位對bytebuf進行劃分

The value of the length field in this example is 12 (0x0C) which represents the length of "HELLO, WORLD". By default, the decoder assumes that the length field represents the number of the bytes that follows the length field. Therefore, it can be decoded with the simplistic parameter combination.

 lengthFieldOffset   = 0       // 0表示從頭開始就是表示長度的欄位
 lengthFieldLength   = 2       // 2表示長度欄位佔用兩個位元組
 lengthAdjustment    = 0
 initialBytesToStrip = 0 (= do not strip header)
  BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 +--------+----------------+      +--------+----------------+
 | Length | Actual Content |----->| Length | Actual Content |
 | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
 +--------+----------------+      +--------+----------------+
 ---------------------------------------------------------------------
 lengthFieldOffset   = 0
 lengthFieldLength   = 2
 lengthAdjustment    = 0
 initialBytesToStrip = 2 (= the length of the Length field)
 // initialBytesToStrip可以用於提取訊息體資訊,去除代表長度欄位的兩個位元組

 BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
 +--------+----------------+      +----------------+
 | Length | Actual Content |----->| Actual Content |
 | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
 +--------+----------------+      +----------------+
 ---------------------------------------------------------------------------
  lengthFieldOffset   =  0
 lengthFieldLength   =  2
 lengthAdjustment    = -2 (= the length of the Length field)
 initialBytesToStrip =  0

 BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
 +--------+----------------+      +--------+----------------+
 | Length | Actual Content |----->| Length | Actual Content |
 | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
 +--------+----------------+      +--------+----------------+
 上述例子中長度欄位是整個訊息的位元組數目,
 實際內容的位元組數目 = 整個訊息的位元組數目 - 長度欄位的位元組數目 = Length -  lengthAdjustment
----------------------------------------------------------------------------
 lengthFieldOffset   = 2 (= the length of Header 1)
 lengthFieldLength   = 3
 lengthAdjustment    = 0
 initialBytesToStrip = 0

 BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
 +----------+----------+----------------+      +----------+----------+----------------+
 | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
 |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
 +----------+----------+----------------+      +----------+----------+----------------+
基於長度編碼器的關鍵引數 含義
lengthFieldOffset 訊息長度所在欄位偏移量
lengthFieldLength 長度欄位的長度(位元組數)
lengthAdjustment 長度欄位的位置作為基準,還有多少位元組是內容,負數則是表明長度去除多少才是內容
initialBytesToStrip 從頭剝離幾個位元組

基於長度的編碼器使用例項:

package application.typical_problem;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class FieldBasedEncoder {
    public static void main(String[] args) {
        EmbeddedChannel ch = new EmbeddedChannel();
        // 訊息長度,長度偏移量,長度欄位的位元組數,調整位元組數,去除位元組數
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,4));
        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
        ByteBuf bf = ByteBufAllocator.DEFAULT.buffer();
        send(bf,"hello,god");
        ch.writeInbound(bf);
    }

    public static void send(ByteBuf bf,String s){
        int n = s.length();
        byte[] arr = s.getBytes();
        bf.writeInt(n);         // 預設以大端模式寫入int,寫入長度
        // 注:如果希望長度欄位後面再放入其他的欄位比如版本號等可以通過lengthFieldOffset控制其他欄位的長度
        bf.writeBytes(arr);
    }
}

實驗結果

17:11:59 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 9B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 2c 67 6f 64                      |hello,god       |
+--------+-------------------------------------------------+----------------+
17:11:59 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE

2 協議的設計與解析

2-1 Redis的通訊協議

Redis協議英文說明

In RESP, the type of some data depends on the first byte:

type of data first type
Simple Strings the first byte of the reply is "+"
Errors the reply is "-"
Integers the first byte of the reply is ":"
Bulk Strings the first byte of the reply is "$"
Arrays the first byte of the reply is "*"

In RESP different parts of the protocol are always terminated with "\r\n" (CRLF).(回車和換行進行分隔)

介面卡模式

Netty模擬Redis客戶端傳送命令: set name Amazing

package application.protocol_design;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.Charset;

// 使用Netty模擬redis客戶端傳送set name Amazing命令
public class TestRedis {
    /*
       set name zhangsan
       $3set
       $4name
       $8Amazing
     */
    static final byte[] LINE = {13,10};   // 回車和換行字元的ascii碼
    public static void main(String[] args) {
        NioEventLoopGroup g = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new io.netty.bootstrap.Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(g);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf)msg;
                    System.out.println(buf.toString(Charset.defaultCharset()));
                }
                // 這裡實現了向redis傳送陣列型別的資料為""*3\r\n$3\r\set\r\n$4\r\name\r\n$8\r\Amazing\r\n""
                // 該命令在redis服務端解析為 set name Amazing
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ByteBuf bf = ByteBufAllocator.DEFAULT.buffer();
                            bf.writeBytes("*3".getBytes());
                            bf.writeBytes(LINE);
                            bf.writeBytes("$3".getBytes());
                            bf.writeBytes(LINE);
                            bf.writeBytes("set".getBytes());
                            bf.writeBytes(LINE);
                            bf.writeBytes("$4".getBytes());
                            bf.writeBytes(LINE);
                            bf.writeBytes("name".getBytes());
                            bf.writeBytes(LINE);
                            bf.writeBytes("$8".getBytes());
                            bf.writeBytes(LINE);
                            bf.writeBytes("Amazing".getBytes());
                            bf.writeBytes(LINE);
                            ctx.writeAndFlush(bf);
                        }
                    });

                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost",6379).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2-2 Netty的Http協議編碼解碼器

public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
        implements HttpServerUpgradeHandler.SourceCodec
  • HttpServerCodec是一個Http編碼解碼器,實現了入站和出站的handler的介面

模擬一個能夠獲取http報文資訊的伺服器:

package application.protocol_design;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http2.InboundHttpToHttp2Adapter;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



public class HttpServer {
    static final Logger log = LoggerFactory.getLogger(HttpServer.class);
    void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new HttpServerCodec());  // http協議的編碼/解碼器
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                           log.debug("{}",msg.getClass());
                        }
                    });
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080);
            channelFuture.sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            log.debug("stoped");
        }
    }

    public static void main(String[] args) {
        new HttpServer().start();

    }
}

瀏覽器訪問http://localhost:8080/index

輸出結果如下

  • 可以看到netty的HttpServerCodec()能夠解析出採用http協議的文字資訊
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 69 6e 64 65 78 20 48 54 54 50 2f |GET /index HTTP/|
|00000010| 31 2e 31 0d 0a 48 6f 73 74 3a 20 6c 6f 63 61 6c |1.1..Host: local|
|00000020| 68 6f 73 74 3a 38 30 38 30 0d 0a 43 6f 6e 6e 65 |host:8080..Conne|
|00000030| 63 74 69 6f 6e 3a 20 6b 65 65 70 2d 61 6c 69 76 |ction: keep-aliv|
|00000040| 65 0d 0a 43 61 63 68 65 2d 43 6f 6e 74 72 6f 6c |e..Cache-Control|
|00000050| 3a 20 6d 61 78 2d 61 67 65 3d 30 0d 0a 73 65 63 |: max-age=0..sec|
|00000060| 2d 63 68 2d 75 61 3a 20 22 47 6f 6f 67 6c 65 20 |-ch-ua: "Google |
|00000070| 43 68 72 6f 6d 65 22 3b 76 3d 22 39 35 22 2c 20 |Chrome";v="95", |
|00000080| 22 43 68 72 6f 6d 69 75 6d 22 3b 76 3d 22 39 35 |"Chromium";v="95|
|00000090| 22 2c 20 22 3b 4e 6f 74 20 41 20 42 72 61 6e 64 |", ";Not A Brand|
|000000a0| 22 3b 76 3d 22 39 39 22 0d 0a 73 65 63 2d 63 68 |";v="99"..sec-ch|
|000000b0| 2d 75 61 2d 6d 6f 62 69 6c 65 3a 20 3f 30 0d 0a |-ua-mobile: ?0..|
|000000c0| 73 65 63 2d 63 68 2d 75 61 2d 70 6c 61 74 66 6f |sec-ch-ua-platfo|
|000000d0| 72 6d 3a 20 22 57 69 6e 64 6f 77 73 22 0d 0a 55 |rm: "Windows"..U|
|000000e0| 70 67 72 61 64 65 2d 49 6e 73 65 63 75 72 65 2d |pgrade-Insecure-|
|000000f0| 52 65 71 75 65 73 74 73 3a 20 31 0d 0a 55 73 65 |Requests: 1..Use|
|00000100| 72 2d 41 67 65 6e 74 3a 20 4d 6f 7a 69 6c 6c 61 |r-Agent: Mozilla|
|00000110| 2f 35 2e 30 20 28 57 69 6e 64 6f 77 73 20 4e 54 |/5.0 (Windows NT|
|00000120| 20 31 30 2e 30 3b 20 57 69 6e 36 34 3b 20 78 36 | 10.0; Win64; x6|
|00000130| 34 29 20 41 70 70 6c 65 57 65 62 4b 69 74 2f 35 |4) AppleWebKit/5|
|00000140| 33 37 2e 33 36 20 28 4b 48 54 4d 4c 2c 20 6c 69 |37.36 (KHTML, li|
|00000150| 6b 65 20 47 65 63 6b 6f 29 20 43 68 72 6f 6d 65 |ke Gecko) Chrome|
|00000160| 2f 39 35 2e 30 2e 34 36 33 38 2e 36 39 20 53 61 |/95.0.4638.69 Sa|
|00000170| 66 61 72 69 2f 35 33 37 2e 33 36 0d 0a 41 63 63 |fari/537.36..Acc|
|00000180| 65 70 74 3a 20 74 65 78 74 2f 68 74 6d 6c 2c 61 |ept: text/html,a|
|00000190| 70 70 6c 69 63 61 74 69 6f 6e 2f 78 68 74 6d 6c |pplication/xhtml|
|000001a0| 2b 78 6d 6c 2c 61 70 70 6c 69 63 61 74 69 6f 6e |+xml,application|
|000001b0| 2f 78 6d 6c 3b 71 3d 30 2e 39 2c 69 6d 61 67 65 |/xml;q=0.9,image|
|000001c0| 2f 61 76 69 66 2c 69 6d 61 67 65 2f 77 65 62 70 |/avif,image/webp|
|000001d0| 2c 69 6d 61 67 65 2f 61 70 6e 67 2c 2a 2f 2a 3b |,image/apng,*/*;|
|000001e0| 71 3d 30 2e 38 2c 61 70 70 6c 69 63 61 74 69 6f |q=0.8,applicatio|
|000001f0| 6e 2f 73 69 67 6e 65 64 2d 65 78 63 68 61 6e 67 |n/signed-exchang|
|00000200| 65 3b 76 3d 62 33 3b 71 3d 30 2e 39 0d 0a 53 65 |e;v=b3;q=0.9..Se|
|00000210| 63 2d 46 65 74 63 68 2d 53 69 74 65 3a 20 6e 6f |c-Fetch-Site: no|
|00000220| 6e 65 0d 0a 53 65 63 2d 46 65 74 63 68 2d 4d 6f |ne..Sec-Fetch-Mo|
|00000230| 64 65 3a 20 6e 61 76 69 67 61 74 65 0d 0a 53 65 |de: navigate..Se|
|00000240| 63 2d 46 65 74 63 68 2d 55 73 65 72 3a 20 3f 31 |c-Fetch-User: ?1|
|00000250| 0d 0a 53 65 63 2d 46 65 74 63 68 2d 44 65 73 74 |..Sec-Fetch-Dest|
|00000260| 3a 20 64 6f 63 75 6d 65 6e 74 0d 0a 41 63 63 65 |: document..Acce|
|00000270| 70 74 2d 45 6e 63 6f 64 69 6e 67 3a 20 67 7a 69 |pt-Encoding: gzi|
|00000280| 70 2c 20 64 65 66 6c 61 74 65 2c 20 62 72 0d 0a |p, deflate, br..|
|00000290| 41 63 63 65 70 74 2d 4c 61 6e 67 75 61 67 65 3a |Accept-Language:|
|000002a0| 20 7a 68 2d 43 4e 2c 7a 68 3b 71 3d 30 2e 39 2c | zh-CN,zh;q=0.9,|
|000002b0| 65 6e 3b 71 3d 30 2e 38 2c 7a 68 2d 54 57 3b 71 |en;q=0.8,zh-TW;q|
|000002c0| 3d 30 2e 37 0d 0a 0d 0a                         |=0.7....        |
+--------+-------------------------------------------------+----------------+
23:39:10 [DEBUG] [nioEventLoopGroup-2-1] a.p.HttpServer - class io.netty.handler.codec.http.DefaultHttpRequest
Head
23:39:10 [DEBUG] [nioEventLoopGroup-2-1] a.p.HttpServer - class io.netty.handler.codec.http.LastHttpContent
body
23:39:10 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xb501161e, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:12789] READ COMPLETE

從日誌可以看出,channelRead方法被呼叫了兩次,兩次訊息的型別不同,分別是:

  • io.netty.handler.codec.http.DefaultHttpRequest
  • io.netty.handler.codec.http.LastHttpContent

這兩個型別分別代表http的請求頭和請求體


模擬一個能夠獲取http報文資訊並反饋hello world的伺服器:

  • 注意該程式碼中通過
package application.protocol_design;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;


public class HttpServer {
    static final Logger log = LoggerFactory.getLogger(HttpServer.class);


    void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new HttpServerCodec());  // http協議的編碼/解碼器
                    /*
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            log.debug("{}",msg.getClass());
                            if(msg instanceof HttpRequest) System.out.println("Head");
                            else System.out.println("body");
                        }
                    });*/
                    // SimpleChannelInboundHandler:能夠處理特定型別的訊息,訊息型別與泛型一種
                    ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                            log.debug(msg.uri());
                            DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(),
                                    HttpResponseStatus.OK);
                            byte[] bytes = "<h1>hello world</h1>".getBytes();
                            response.headers().setInt(CONTENT_LENGTH, bytes.length);
                            response.content().writeBytes(bytes);
                            ctx.writeAndFlush(response);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8080);
            channelFuture.sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            log.debug("stoped");
        }
    }

    public static void main(String[] args) {
        new HttpServer().start();
    }
}

瀏覽器訪問http://localhost:8080/index後的服務端日誌輸出

15:16:17 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x07b5e461, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:13010] REGISTERED
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x07b5e461, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:13010] ACTIVE
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x5557ee13, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:13011] REGISTERED
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x5557ee13, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:13011] ACTIVE
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x5557ee13, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:13011] READ: 686B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 69 6e 64 65 78 20 48 54 54 50 2f |GET /index HTTP/|
|00000010| 31 2e 31 0d 0a 48 6f 73 74 3a 20 6c 6f 63 61 6c |1.1..Host: local|
|00000020| 68 6f 73 74 3a 38 30 38 30 0d 0a 43 6f 6e 6e 65 |host:8080..Conne|
|00000030| 63 74 69 6f 6e 3a 20 6b 65 65 70 2d 61 6c 69 76 |ction: keep-aliv|
|00000040| 65 0d 0a 73 65 63 2d 63 68 2d 75 61 3a 20 22 47 |e..sec-ch-ua: "G|
|00000050| 6f 6f 67 6c 65 20 43 68 72 6f 6d 65 22 3b 76 3d |oogle Chrome";v=|
|00000060| 22 39 35 22 2c 20 22 43 68 72 6f 6d 69 75 6d 22 |"95", "Chromium"|
|00000070| 3b 76 3d 22 39 35 22 2c 20 22 3b 4e 6f 74 20 41 |;v="95", ";Not A|
|00000080| 20 42 72 61 6e 64 22 3b 76 3d 22 39 39 22 0d 0a | Brand";v="99"..|
|00000090| 73 65 63 2d 63 68 2d 75 61 2d 6d 6f 62 69 6c 65 |sec-ch-ua-mobile|
|000000a0| 3a 20 3f 30 0d 0a 73 65 63 2d 63 68 2d 75 61 2d |: ?0..sec-ch-ua-|
|000000b0| 70 6c 61 74 66 6f 72 6d 3a 20 22 57 69 6e 64 6f |platform: "Windo|
|000000c0| 77 73 22 0d 0a 55 70 67 72 61 64 65 2d 49 6e 73 |ws"..Upgrade-Ins|
|000000d0| 65 63 75 72 65 2d 52 65 71 75 65 73 74 73 3a 20 |ecure-Requests: |
|000000e0| 31 0d 0a 55 73 65 72 2d 41 67 65 6e 74 3a 20 4d |1..User-Agent: M|
|000000f0| 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 57 69 6e 64 |ozilla/5.0 (Wind|
|00000100| 6f 77 73 20 4e 54 20 31 30 2e 30 3b 20 57 69 6e |ows NT 10.0; Win|
|00000110| 36 34 3b 20 78 36 34 29 20 41 70 70 6c 65 57 65 |64; x64) AppleWe|
|00000120| 62 4b 69 74 2f 35 33 37 2e 33 36 20 28 4b 48 54 |bKit/537.36 (KHT|
|00000130| 4d 4c 2c 20 6c 69 6b 65 20 47 65 63 6b 6f 29 20 |ML, like Gecko) |
|00000140| 43 68 72 6f 6d 65 2f 39 35 2e 30 2e 34 36 33 38 |Chrome/95.0.4638|
|00000150| 2e 36 39 20 53 61 66 61 72 69 2f 35 33 37 2e 33 |.69 Safari/537.3|
|00000160| 36 0d 0a 41 63 63 65 70 74 3a 20 74 65 78 74 2f |6..Accept: text/|
|00000170| 68 74 6d 6c 2c 61 70 70 6c 69 63 61 74 69 6f 6e |html,application|
|00000180| 2f 78 68 74 6d 6c 2b 78 6d 6c 2c 61 70 70 6c 69 |/xhtml+xml,appli|
|00000190| 63 61 74 69 6f 6e 2f 78 6d 6c 3b 71 3d 30 2e 39 |cation/xml;q=0.9|
|000001a0| 2c 69 6d 61 67 65 2f 61 76 69 66 2c 69 6d 61 67 |,image/avif,imag|
|000001b0| 65 2f 77 65 62 70 2c 69 6d 61 67 65 2f 61 70 6e |e/webp,image/apn|
|000001c0| 67 2c 2a 2f 2a 3b 71 3d 30 2e 38 2c 61 70 70 6c |g,*/*;q=0.8,appl|
|000001d0| 69 63 61 74 69 6f 6e 2f 73 69 67 6e 65 64 2d 65 |ication/signed-e|
|000001e0| 78 63 68 61 6e 67 65 3b 76 3d 62 33 3b 71 3d 30 |xchange;v=b3;q=0|
|000001f0| 2e 39 0d 0a 53 65 63 2d 46 65 74 63 68 2d 53 69 |.9..Sec-Fetch-Si|
|00000200| 74 65 3a 20 6e 6f 6e 65 0d 0a 53 65 63 2d 46 65 |te: none..Sec-Fe|
|00000210| 74 63 68 2d 4d 6f 64 65 3a 20 6e 61 76 69 67 61 |tch-Mode: naviga|
|00000220| 74 65 0d 0a 53 65 63 2d 46 65 74 63 68 2d 55 73 |te..Sec-Fetch-Us|
|00000230| 65 72 3a 20 3f 31 0d 0a 53 65 63 2d 46 65 74 63 |er: ?1..Sec-Fetc|
|00000240| 68 2d 44 65 73 74 3a 20 64 6f 63 75 6d 65 6e 74 |h-Dest: document|
|00000250| 0d 0a 41 63 63 65 70 74 2d 45 6e 63 6f 64 69 6e |..Accept-Encodin|
|00000260| 67 3a 20 67 7a 69 70 2c 20 64 65 66 6c 61 74 65 |g: gzip, deflate|
|00000270| 2c 20 62 72 0d 0a 41 63 63 65 70 74 2d 4c 61 6e |, br..Accept-Lan|
|00000280| 67 75 61 67 65 3a 20 7a 68 2d 43 4e 2c 7a 68 3b |guage: zh-CN,zh;|
|00000290| 71 3d 30 2e 39 2c 65 6e 3b 71 3d 30 2e 38 2c 7a |q=0.9,en;q=0.8,z|
|000002a0| 68 2d 54 57 3b 71 3d 30 2e 37 0d 0a 0d 0a       |h-TW;q=0.7....  |
+--------+-------------------------------------------------+----------------+
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] a.p.HttpServer - /index
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x5557ee13, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:13011] WRITE: 59B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
|00000020| 20 32 30 0d 0a 0d 0a 3c 68 31 3e 68 65 6c 6c 6f | 20....<h1>hello|
|00000030| 20 77 6f 72 6c 64 3c 2f 68 31 3e                | world</h1>     |
+--------+-------------------------------------------------+----------------+
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x5557ee13, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:13011] FLUSH
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x5557ee13, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:13011] READ COMPLETE
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x5557ee13, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:13011] READ: 612B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:|
|00000020| 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080.|
|00000030| 0a 43 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 |.Connection: kee|
|00000040| 70 2d 61 6c 69 76 65 0d 0a 73 65 63 2d 63 68 2d |p-alive..sec-ch-|
|00000050| 75 61 3a 20 22 47 6f 6f 67 6c 65 20 43 68 72 6f |ua: "Google Chro|
|00000060| 6d 65 22 3b 76 3d 22 39 35 22 2c 20 22 43 68 72 |me";v="95", "Chr|
|00000070| 6f 6d 69 75 6d 22 3b 76 3d 22 39 35 22 2c 20 22 |omium";v="95", "|
|00000080| 3b 4e 6f 74 20 41 20 42 72 61 6e 64 22 3b 76 3d |;Not A Brand";v=|
|00000090| 22 39 39 22 0d 0a 73 65 63 2d 63 68 2d 75 61 2d |"99"..sec-ch-ua-|
|000000a0| 6d 6f 62 69 6c 65 3a 20 3f 30 0d 0a 55 73 65 72 |mobile: ?0..User|
|000000b0| 2d 41 67 65 6e 74 3a 20 4d 6f 7a 69 6c 6c 61 2f |-Agent: Mozilla/|
|000000c0| 35 2e 30 20 28 57 69 6e 64 6f 77 73 20 4e 54 20 |5.0 (Windows NT |
|000000d0| 31 30 2e 30 3b 20 57 69 6e 36 34 3b 20 78 36 34 |10.0; Win64; x64|
|000000e0| 29 20 41 70 70 6c 65 57 65 62 4b 69 74 2f 35 33 |) AppleWebKit/53|
|000000f0| 37 2e 33 36 20 28 4b 48 54 4d 4c 2c 20 6c 69 6b |7.36 (KHTML, lik|
|00000100| 65 20 47 65 63 6b 6f 29 20 43 68 72 6f 6d 65 2f |e Gecko) Chrome/|
|00000110| 39 35 2e 30 2e 34 36 33 38 2e 36 39 20 53 61 66 |95.0.4638.69 Saf|
|00000120| 61 72 69 2f 35 33 37 2e 33 36 0d 0a 73 65 63 2d |ari/537.36..sec-|
|00000130| 63 68 2d 75 61 2d 70 6c 61 74 66 6f 72 6d 3a 20 |ch-ua-platform: |
|00000140| 22 57 69 6e 64 6f 77 73 22 0d 0a 41 63 63 65 70 |"Windows"..Accep|
|00000150| 74 3a 20 69 6d 61 67 65 2f 61 76 69 66 2c 69 6d |t: image/avif,im|
|00000160| 61 67 65 2f 77 65 62 70 2c 69 6d 61 67 65 2f 61 |age/webp,image/a|
|00000170| 70 6e 67 2c 69 6d 61 67 65 2f 73 76 67 2b 78 6d |png,image/svg+xm|
|00000180| 6c 2c 69 6d 61 67 65 2f 2a 2c 2a 2f 2a 3b 71 3d |l,image/*,*/*;q=|
|00000190| 30 2e 38 0d 0a 53 65 63 2d 46 65 74 63 68 2d 53 |0.8..Sec-Fetch-S|
|000001a0| 69 74 65 3a 20 73 61 6d 65 2d 6f 72 69 67 69 6e |ite: same-origin|
|000001b0| 0d 0a 53 65 63 2d 46 65 74 63 68 2d 4d 6f 64 65 |..Sec-Fetch-Mode|
|000001c0| 3a 20 6e 6f 2d 63 6f 72 73 0d 0a 53 65 63 2d 46 |: no-cors..Sec-F|
|000001d0| 65 74 63 68 2d 44 65 73 74 3a 20 69 6d 61 67 65 |etch-Dest: image|
|000001e0| 0d 0a 52 65 66 65 72 65 72 3a 20 68 74 74 70 3a |..Referer: http:|
|000001f0| 2f 2f 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 |//localhost:8080|
|00000200| 2f 69 6e 64 65 78 0d 0a 41 63 63 65 70 74 2d 45 |/index..Accept-E|
|00000210| 6e 63 6f 64 69 6e 67 3a 20 67 7a 69 70 2c 20 64 |ncoding: gzip, d|
|00000220| 65 66 6c 61 74 65 2c 20 62 72 0d 0a 41 63 63 65 |eflate, br..Acce|
|00000230| 70 74 2d 4c 61 6e 67 75 61 67 65 3a 20 7a 68 2d |pt-Language: zh-|
|00000240| 43 4e 2c 7a 68 3b 71 3d 30 2e 39 2c 65 6e 3b 71 |CN,zh;q=0.9,en;q|
|00000250| 3d 30 2e 38 2c 7a 68 2d 54 57 3b 71 3d 30 2e 37 |=0.8,zh-TW;q=0.7|
|00000260| 0d 0a 0d 0a                                     |....            |
+--------+-------------------------------------------------+----------------+
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] a.p.HttpServer - /favicon.ico
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x5557ee13, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:13011] WRITE: 59B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
|00000020| 20 32 30 0d 0a 0d 0a 3c 68 31 3e 68 65 6c 6c 6f | 20....<h1>hello|
|00000030| 20 77 6f 72 6c 64 3c 2f 68 31 3e                | world</h1>     |
+--------+-------------------------------------------------+----------------+
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x5557ee13, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:13011] FLUSH
15:16:17 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x5557ee13, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:13011] READ COMPLETE

從日誌資訊可以看出瀏覽器和服務端通過http協議的互動流程如下:

主體 行為
瀏覽器 GET /index
客戶端 HTTP/1.1 200 OK .content-length: 20....

hello world

瀏覽器 GET /favicon.ico
客戶端 HTTP/1.1 200 OK .content-length: 20....

hello world

  • 第二次瀏覽器自己主動請求GET網站的圖示

2-3 自定義協議

常規的自定義協議包含元素
  • 魔數,用來在第一時間判定是否是無效資料包
JVM位元組碼檔案中首部四個位元組也是魔數用於標識當前檔案型別
  • 版本號,可以支援協議的升級
  • 序列化演算法,訊息正文到底採用哪種序列化反序列化方式,可以由此擴充套件,例如:json、protobuf、hessian、jdk
protobuf、hessian是二進位制的序列化演算法
  • 指令型別,是登入、註冊、單聊、群聊... 跟業務相關
  • 請求序號,為了雙工通訊,提供非同步能力
  • 正文長度
  • 訊息正文

自定義訊息類

根據上述要求可以定義為自己協議定義訊息類如下

  • 訊息類包含:序列ID,訊息型別,儲存<key,value>為<訊息標識,訊息的class物件>的HashMap

靜態程式碼塊在位元組碼層面的體現

該訊息類在建立的時候,通過靜態程式碼塊在class物件載入後初始化HashMap
package application.protocol_design.message;

import lombok.Data;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Data
public abstract class Message implements Serializable {

    /**
     * 根據訊息型別位元組,獲得對應的訊息 class
     * @param messageType 訊息型別位元組
     * @return 訊息 class
     */
    public static Class<? extends Message> getMessageClass(int messageType) {
        return messageClasses.get(messageType);
    }

    private int sequenceId;
    private int messageType;

    public abstract int getMessageType();

    public static final int LoginRequestMessage = 0;
    public static final int LoginResponseMessage = 1;
    public static final int ChatRequestMessage = 2;
    public static final int ChatResponseMessage = 3;
    public static final int GroupCreateRequestMessage = 4;
    public static final int GroupCreateResponseMessage = 5;
    public static final int GroupJoinRequestMessage = 6;
    public static final int GroupJoinResponseMessage = 7;
    public static final int GroupQuitRequestMessage = 8;
    public static final int GroupQuitResponseMessage = 9;
    public static final int GroupChatRequestMessage = 10;
    public static final int GroupChatResponseMessage = 11;
    public static final int GroupMembersRequestMessage = 12;
    public static final int GroupMembersResponseMessage = 13;
    public static final int PingMessage = 14;
    public static final int PongMessage = 15;
    /**
     * 請求型別 byte 值
     */
    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    /**
     * 響應型別 byte 值
     */
    public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;

    private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
    static {
        messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
        messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
        messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
        messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
        messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
        messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
        messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
        messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
        messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
        messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
        messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
        messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
        messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
        messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }
}

自定義協議的訊息編碼和解碼器(handler)
  • 在netty中通過繼承ByteToMessageCodec類並重寫相關方法實現自定義的編碼/解碼handler
package application.protocol_design.protocol;

import application.protocol_design.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;

@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
    static byte[] magicNum = {'l','u','c','k'};

    // 編碼目標:將訊息msg變化為遵循協議的位元組陣列放入到ByteBuf out
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        Byte version = 1;            // 1位元組的協議版本
        Byte serialWay = 0;          // 1位元組的序列化方式: 0表示JDK,1表示json

        // 總位元組數目 = 16(如果不是2的冪可以填充)
        out.writeBytes(magicNum);             // 4位元組的協議魔數
        out.writeByte(version);                // 1位元組的協議版本
        out.writeByte(serialWay);              // 1位元組的序列化方式: 0表示JDK,1表示json
        out.writeByte(msg.getMessageType());   //  1位元組指令型別
        out.writeInt(msg.getSequenceId());     // 4位元組序列號

        //objectOutputStream:把物件轉成位元組資料的輸出到檔案中儲存,物件的輸出過程稱為序列化,可實現物件的持久儲存
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] content = bos.toByteArray();

        out.writeInt(content.length);              // 寫入物件序列化的後的位元組陣列長度
        out.writeByte(0xff);                       //  填充字元,湊滿2的冪為16
        out.writeBytes(content);                   // 寫入物件序列化陣列
    }

    // 解碼目標:將位元組陣列轉化為物件放入到List<Object> out
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serialType = in.readByte();

        byte messageType = in.readByte();
        int sequenceId = in.readInt();

        int length = in.readInt();
        byte padding = in.readByte();
        byte[] arr = new byte[length];
        in.readBytes(arr,0,length);

        // 這裡可以加入根據序列化協議呼叫不同序列化工具的類
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(arr));
        Message message = (Message)ois.readObject();
        out.add(message);
        log.debug("魔數:{} 版本:{} 序列化方式:{} 訊息型別:{} 序列ID:長度:{} {}",magicNum,version,serialType,messageType,sequenceId,length);
        log.debug("{}",message);
    }
}

上述程式碼中的自定義協議的訊息組成如下

名稱 魔數 協議版本 序列化協議 訊息型別 序列ID 內容長度 填充字元 內容
所佔位元組數 4 1 1 1 4 4 1
  • 填充字元讓訊息頭大小變為為16位元組,恰好是2的冪,比較規整。

  • 上述程式碼中encode方法是將類按照自定義協議轉化為byte陣列,decode方法是將byte陣列中提取類的例項物件

自定義協議測試
package application.protocol_design.protocol;

import application.protocol_design.message.LoginRequestMessage;
import application.typical_problem.FieldBasedEncoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;

// 測試編碼/解碼的handler
public class TestMessageCodec {
    public static void main(String[] args) throws Exception {
        EmbeddedChannel ch = new EmbeddedChannel();
        // 列印日誌的handler
        ch.pipeline().addLast(new LoggingHandler());
        // 基於長度的handler解決傳輸過程中的粘包和半包問題
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,11,4,1,0));
        // 自定義的協議的編碼/解碼器handler
        ch.pipeline().addLast(new MessageCodec());

        // 測試出站編碼

        System.out.println("\n測試自定義協議的編碼\n");


        LoginRequestMessage m = new LoginRequestMessage("god","dog");
        ch.writeOutbound(m);

        System.out.println("\n測試自定義協議的解碼\n");
        // 測試入站解碼
        ByteBuf bf = ByteBufAllocator.DEFAULT.buffer();
        new MessageCodec().encode(null,m,bf);
        // 這裡將一條訊息分兩次傳送,如果沒有基於長度欄位的編碼器LengthFieldBasedFrameDecoder,那麼會報錯
        ByteBuf b1 = bf.slice(0,100);
        ByteBuf b2 = bf.slice(100,bf.readableBytes()-100); b1.retain();
        ch.writeInbound(b1); ch.writeInbound(b2);
    }
}

測試日誌

測試自定義協議的編碼

20:52:12 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 245B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6c 75 63 6b 01 00 00 00 00 00 00 00 00 00 e5 ff |luck............|
|00000010| ac ed 00 05 73 72 00 37 61 70 70 6c 69 63 61 74 |....sr.7applicat|
|00000020| 69 6f 6e 2e 70 72 6f 74 6f 63 6f 6c 5f 64 65 73 |ion.protocol_des|
|00000030| 69 67 6e 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 69 |ign.message.Logi|
|00000040| 6e 52 65 71 75 65 73 74 4d 65 73 73 61 67 65 ca |nRequestMessage.|
|00000050| ab b7 04 6f 14 ef a6 02 00 02 4c 00 08 70 61 73 |...o......L..pas|
|00000060| 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 2f 6c 61 |swordt..Ljava/la|
|00000070| 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 75 73 65 |ng/String;L..use|
|00000080| 72 6e 61 6d 65 71 00 7e 00 01 78 72 00 2b 61 70 |rnameq.~..xr.+ap|
|00000090| 70 6c 69 63 61 74 69 6f 6e 2e 70 72 6f 74 6f 63 |plication.protoc|
|000000a0| 6f 6c 5f 64 65 73 69 67 6e 2e 6d 65 73 73 61 67 |ol_design.messag|
|000000b0| 65 2e 4d 65 73 73 61 67 65 c9 91 c8 73 58 b1 e2 |e.Message...sX..|
|000000c0| 08 02 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 |....I..messageTy|
|000000d0| 70 65 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 |peI..sequenceIdx|
|000000e0| 70 00 00 00 00 00 00 00 00 74 00 03 64 6f 67 74 |p........t..dogt|
|000000f0| 00 03 67 6f 64                                  |..god           |
+--------+-------------------------------------------------+----------------+
20:52:12 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH

測試自定義協議的解碼

20:52:12 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 100B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 6c 75 63 6b 01 00 00 00 00 00 00 00 00 00 e5 ff |luck............|
|00000010| ac ed 00 05 73 72 00 37 61 70 70 6c 69 63 61 74 |....sr.7applicat|
|00000020| 69 6f 6e 2e 70 72 6f 74 6f 63 6f 6c 5f 64 65 73 |ion.protocol_des|
|00000030| 69 67 6e 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 69 |ign.message.Logi|
|00000040| 6e 52 65 71 75 65 73 74 4d 65 73 73 61 67 65 ca |nRequestMessage.|
|00000050| ab b7 04 6f 14 ef a6 02 00 02 4c 00 08 70 61 73 |...o......L..pas|
|00000060| 73 77 6f 72                                     |swor            |
+--------+-------------------------------------------------+----------------+
20:52:12 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
20:52:12 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 145B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 64 74 00 12 4c 6a 61 76 61 2f 6c 61 6e 67 2f 53 |dt..Ljava/lang/S|
|00000010| 74 72 69 6e 67 3b 4c 00 08 75 73 65 72 6e 61 6d |tring;L..usernam|
|00000020| 65 71 00 7e 00 01 78 72 00 2b 61 70 70 6c 69 63 |eq.~..xr.+applic|
|00000030| 61 74 69 6f 6e 2e 70 72 6f 74 6f 63 6f 6c 5f 64 |ation.protocol_d|
|00000040| 65 73 69 67 6e 2e 6d 65 73 73 61 67 65 2e 4d 65 |esign.message.Me|
|00000050| 73 73 61 67 65 c9 91 c8 73 58 b1 e2 08 02 00 02 |ssage...sX......|
|00000060| 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 49 00 |I..messageTypeI.|
|00000070| 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 00 00 |.sequenceIdxp...|
|00000080| 00 00 00 00 00 74 00 03 64 6f 67 74 00 03 67 6f |.....t..dogt..go|
|00000090| 64                                              |d               |
+--------+-------------------------------------------------+----------------+
20:52:12 [DEBUG] [main] a.p.p.MessageCodec - 魔數:1819632491 版本:1 序列化方式:0 訊息型別:0 序列ID:長度:0 229
20:52:12 [DEBUG] [main] a.p.p.MessageCodec - LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=god, password=dog)
20:52:12 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE

Process finished with exit code 0
// 基於長度的handler解決傳輸過程中的粘包和半包問題
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,11,4,1,0));
// 自定義的協議的編碼/解碼器handler
ch.pipeline().addLast(new MessageCodec());
  • 程式碼中LengthFieldBasedFrameDecoder和MessageCodec這兩個handler形成配合,通過Netty的基於欄位的編碼器(handler)解決粘包和半包問題,所有資料經過LengthFieldBasedFrameDecoder會得到一個完整訊息的byte陣列,然後再按照MessageCodec中自定義協議解碼

2-4 handler例項共享問題

問題:channel例項中資料流的處理需要通過handler例項實現,多個channel例項能夠共享相同的handler例項?

策略:需要具體問題具體分析,基本原則是無狀態的handler例項可以共享使用,netty官方提供的handler中可以通過@Sharable註解判別該handler是否能夠被多個channel共享使用。

  • 比如LoggingHandler提供日誌輸出功能,官方原始碼中有@Sharable註解,因此該handler能夠被多個channel使用。
@Sharable
@SuppressWarnings({ "StringConcatenationInsideStringBufferAppend", "StringBufferReplaceableByString" })
public class LoggingHandler extends ChannelDuplexHandler {

    private static final LogLevel DEFAULT_LEVEL = LogLevel.DEBUG;

    protected final InternalLogger logger;
    protected final InternalLogLevel internalLevel;

    private final LogLevel level;

    /**
     * Creates a new instance whose logger name is the fully qualified class
     * name of the instance with hex dump enabled.
     */
    public LoggingHandler() {
        this(DEFAULT_LEVEL);
    }
...
  • 比如LengthFieldBasedFrameDecoder這個解碼器handler的單個例項不能夠被多個channel共享使用。
ublic class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {

    private final ByteOrder byteOrder;
    private final int maxFrameLength;
    private final int lengthFieldOffset;
    private final int lengthFieldLength;
    private final int lengthFieldEndOffset;
    private final int lengthAdjustment;
    private final int initialBytesToStrip;
    private final boolean failFast;
    private boolean discardingTooLongFrame;
    private long tooLongFrameLength;
    private long bytesToDiscard;

需求:將自定義的編解碼器定義為可共享的handler供其他人使用

  • 方法:這裡沒有繼承2-3中ByteToMessageCodec,官方約定該類的子類無法加上@Sharable註解,繼承MessageToMessageCodec,這樣就能夠加上@Sharable註解
package application.protocol_design.protocol;

import application.protocol_design.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
/*
    該處理的前置handler必須是LengthFieldBasedDecoder
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    static byte[] magicNum = {'l','u','c','k'};
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        Byte version = 1;            // 1位元組的協議版本
        Byte serialWay = 0;          // 1位元組的序列化方式: 0表示JDK,1表示json

        // 總位元組數目 = 16(如果不是2的冪可以填充)
        out.writeBytes(magicNum);             // 4位元組的協議魔數
        out.writeByte(version);                // 1位元組的協議版本
        out.writeByte(serialWay);              // 1位元組的序列化方式: 0表示JDK,1表示json
        out.writeByte(msg.getMessageType());   //  1位元組指令型別
        out.writeInt(msg.getSequenceId());     // 4位元組序列號

        //objectOutputStream:把物件轉成位元組資料的輸出到檔案中儲存,物件的輸出過程稱為序列化,可實現物件的持久儲存
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] content = bos.toByteArray();

        out.writeInt(content.length);              // 寫入物件序列化的後的位元組陣列長度
        out.writeByte(0xff);                       //  填充字元,湊滿2的冪為16
        out.writeBytes(content);                   // 寫入物件序列化陣列

        outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serialType = in.readByte();

        byte messageType = in.readByte();
        int sequenceId = in.readInt();

        int length = in.readInt();
        byte padding = in.readByte();
        byte[] arr = new byte[length];
        in.readBytes(arr,0,length);


        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(arr));
        Message message = (Message)ois.readObject();
        out.add(message);
        log.debug("魔數:{} 版本:{} 序列化方式:{} 訊息型別:{} 序列ID:長度:{} {}",magicNum,version,serialType,messageType,sequenceId,length);
        log.debug("{}",message);
    }
}

測試程式碼中將可複用的handler例項單獨提出來

package application.protocol_design.protocol;

import application.protocol_design.message.LoginRequestMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;

import java.util.logging.Handler;

// 測試編碼/解碼的handler,這裡將可以複用的handler例項提取出來
public class TestMessageCodec_version {
    public static void main(String[] args) throws Exception {
        // 能夠服用的例項物件單獨定義
        LoggingHandler LOGIN_HANDLER = new LoggingHandler();
        MessageCodecSharable PROTOCOL = new MessageCodecSharable();
        EmbeddedChannel ch = new EmbeddedChannel();
        // 列印日誌的handler
        ch.pipeline().addLast(LOGIN_HANDLER);
        // 基於長度的handler解決傳輸過程中的粘包和半包問題
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,11,4,1,0));
        // 自定義的協議的編碼/解碼器handler
        ch.pipeline().addLast(PROTOCOL);

        // 測試出站編碼
        System.out.println("\n測試自定義協議的編碼\n");


        LoginRequestMessage m = new LoginRequestMessage("god","dog");
        ch.writeOutbound(m);

        System.out.println("\n測試自定義協議的解碼\n");
        // 測試入站解碼
        ByteBuf bf = ByteBufAllocator.DEFAULT.buffer();
        new MessageCodec().encode(null,m,bf);
        // 這裡將一條訊息分兩次傳送,如果沒有基於長度欄位的編碼器LengthFieldBasedFrameDecoder,那麼會報錯
        ByteBuf b1 = bf.slice(0,100);
        ByteBuf b2 = bf.slice(100,bf.readableBytes()-100); b1.retain();
        ch.writeInbound(b1); ch.writeInbound(b2);
    }
}

參考資料

Netty基礎視訊教程