1. 程式人生 > 實用技巧 >Netty筆記(6) - 粘包拆包問題及解決方案

Netty筆記(6) - 粘包拆包問題及解決方案

Netty 中 TCP 粘包拆包問題

資訊通過tcp傳輸過程中出現的狀況 .

TCP是個“流”協議,所謂流,就是沒有界限的一串資料。TCP底層並不瞭解上層業務資料的具體含義,它會根據TCP緩衝區的實際情況進行包的劃分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送

產生粘包和拆包問題的主要原因是,作業系統在傳送TCP資料的時候,底層會有一個緩衝區,例如1024個位元組大小,如果一次請求傳送的資料量比較小,沒達到緩衝區大小,TCP則會將多個請求合併為同一個請求進行傳送,這就形成了粘包問題;如果一次請求傳送的資料量比較大,超過了緩衝區大小,TCP就會將其拆分為多次傳送,這就是拆包,也就是將一個大的包拆分為多個小包進行傳送。

入圖所示:

上圖中演示了粘包和拆包的三種情況:

  • D1和D2兩個包都剛好滿足TCP緩衝區的大小,或者說其等待時間已經達到TCP等待時長,從而還是使用兩個獨立的包進行傳送;
  • D1和D2兩次請求間隔時間內較短,並且資料包較小,因而合併為同一個包傳送給服務端;
  • 某一個包比較大,因而將其拆分為兩個包D*_1和D*_2進行傳送,而這裡由於拆分後的某一個包比較小,其又與另一個包合併在一起傳送。

發生這種情況的程式碼:

客戶端傳送資料 快速的傳送 10條資料 :

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客戶端傳送10條資料 hello,server 編號
        for(int i= 0; i< 10; ++i) {
            ByteBuf buffer = Unpooled.copiedBuffer("hello,server " +i, Charset.forName("utf-8"));
            ctx.writeAndFlush(buffer);
        }
    }

}

服務端接受列印:

伺服器接收到資料 hello,server 0
伺服器接收到資料 hello,server 1
伺服器接收到資料 hello,server 2hello,server 3
伺服器接收到資料 hello,server 4hello,server 5
伺服器接收到資料 hello,server 6
伺服器接收到資料 hello,server 7hello,server 8
伺服器接收到資料 hello,server 9

很明顯 其中有三條記錄被粘在其他資料上,這就是TCP的粘包拆包現象

怎麼解決:

  1. Netty自帶的 解決方案:

    • 固定長度的拆包器 FixedLengthFrameDecoder,每個應用層資料包的都拆分成都是固定長度的大小

    • 行拆包器 LineBasedFrameDecoder,每個應用層資料包,都以換行符作為分隔符,進行分割拆分

    • 分隔符拆包器 DelimiterBasedFrameDecoder,每個應用層資料包,都通過自定義的分隔符,進行分割拆分

    • 基於資料包長度的拆包器 LengthFieldBasedFrameDecoder,將應用層資料包的長度,作為接收端應用層資料包的拆分依據。按照應用層資料包的大小,拆包。這個拆包器,有一個要求,就是應用層協議中包含資料包的長度

FixedLengthFrameDecoder 解碼器

服務端 新增 FixedLengthFrameDecoder 解碼器 並指定長度

public class EchoServer {



  public static void main(String[] args) throws InterruptedException {

      EventLoopGroup bossGroup = new NioEventLoopGroup();
      EventLoopGroup workerGroup = new NioEventLoopGroup();
      try {
          ServerBootstrap bootstrap = new ServerBootstrap();
          bootstrap.group(bossGroup, workerGroup)
                  .channel(NioServerSocketChannel.class)
                  .option(ChannelOption.SO_BACKLOG, 1024)
                  .childHandler(new ChannelInitializer<SocketChannel>() {
                      @Override
                      protected void initChannel(SocketChannel ch) throws Exception {
                          //指定長度為9 則每次擷取長度為9的位元組 
                          ch.pipeline().addLast(new FixedLengthFrameDecoder(9));
                         // 將 每次擷取的位元組編碼為字串
                          ch.pipeline().addLast(new StringDecoder());
						//自定義處理類列印
                          ch.pipeline().addLast(new EchoServerHandler());
                      }
                  });

          ChannelFuture future = bootstrap.bind(8000).sync();
          future.channel().closeFuture().sync();
      } finally {
          bossGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
      }
  }
}

自定義服務端Handler 列印字串:

public class EchoServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    System.out.println("message: " + msg.trim());
  }
}

客戶端傳送資訊 並新增字串編碼器 將資訊已字串的形式編碼:

public class EchoClient {



  public static void main(String[] args) throws InterruptedException {
      EventLoopGroup group = new NioEventLoopGroup();
      try {
          Bootstrap bootstrap = new Bootstrap();
          bootstrap.group(group)
                  .channel(NioSocketChannel.class)
                  .option(ChannelOption.TCP_NODELAY, true)
                  .handler(new ChannelInitializer<SocketChannel>() {
                      @Override
                      protected void initChannel(SocketChannel ch) throws Exception {
                          
                          ch.pipeline().addLast(new StringEncoder());
                          ch.pipeline().addLast(new EchoClientHandler());
                      }
                  });

          ChannelFuture future = bootstrap.connect("127.0.0.1", 8000).sync();
          future.channel().closeFuture().sync();
      } finally {
          group.shutdownGracefully();
      }
  }
}

客戶端Handler 傳送資訊 剛好長度為9 :

public class EchoClientHandler extends SimpleChannelInboundHandler<String> {

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush("123456789");
  }
}

總結: FixedLengthFrameDecoder 解碼器 將按照指定長度擷取位元組 並新增到List中向後傳遞 , 以本案例為例,如果位元組數剛好為9,則全部列印,如果 位元組數為18, 則拆分列印兩次,如果為19 則最後一個位元組不列印,如果不足9 則什麼都不列印.

LineBasedFrameDecoder 行拆分器

通過行換行符 \n 或者 \r\n 進行分割,

將上面案例的FixedLengthFrameDecoder 解碼器 換成 LineBasedFrameDecoder

並指定 擷取每段的最大長度 (超過報錯 不往後傳遞)

...
    
.childHandler(new ChannelInitializer<SocketChannel>() {
                      @Override
                      protected void initChannel(SocketChannel ch) throws Exception {

//                          ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
                          ch.pipeline().addLast(new                       LineBasedFrameDecoder(5));
                          // 將前一步解碼得到的資料轉碼為字串
                          ch.pipeline().addLast(new StringDecoder());
//                           最終的資料處理
                          ch.pipeline().addLast(new EchoServerHandler());
                      }
                  });

...

客戶端Handler 傳送字串, 最後的"1234" 不會列印,,

@Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush("1\n123456\r\n1234");
  }

服務端接收並列印結果 分別列印了 "1" 和 "1234" 而超過位元組長度5 的 "123456"則報出TooLongFrameException錯誤

server receives message: 1

An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.TooLongFrameException: frame length (6) exceeds the allowed maximum (5)

server receives message: 1234

DelimiterBasedFrameDecoder 自定義分割符

和行分割符類似, 此解碼器可以自定義分割符,常用構造方法:

 public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)

接收一個最大長度,和 任意個數的 分隔符(用ByteBuf的形式傳入),解碼器識別到任意一個 分割符 都會進行拆分

註冊解碼器:

傳入 "$" 和 "*" 作為分割符,並指定最大長度為 5個位元組

.childHandler(new ChannelInitializer<SocketChannel>() {
                      @Override
                      protected void initChannel(SocketChannel ch) throws Exception {

                          ch.pipeline().addLast(new DelimiterBasedFrameDecoder(5,
                              Unpooled.wrappedBuffer("$".getBytes()),Unpooled.wrappedBuffer("*".getBytes())));
                          // 將前一步解碼得到的資料轉碼為字串
                          ch.pipeline().addLast(new StringDecoder());
                          
//                           最終的資料處理
                          ch.pipeline().addLast(new EchoServerHandler());
                      }
                  });

客戶端 傳送資料:

@Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
          ctx.writeAndFlush("1$123456*1234$789$");
  }

服務端只打印了 "1" 當解析到 "123456" 時 就報錯了 後面就沒有再解析了,會快取著 等到該通道關閉 或者有後續資料傳送過來時 才繼續解析

LengthFieldBasedFrameDecoder

自定義資料長度,傳送的 位元組陣列中 包含 描述 資料長度的欄位 和 資料本身,

解碼過程

常用欄位:

  • maxFrameLength:指定了每個包所能傳遞的最大資料包大小,(上圖中的最大長度為11)
  • lengthFieldOffset:指定了長度欄位在位元組碼中的偏移量;(11這個描述長度的資料是在陣列的第幾位開始)
  • lengthFieldLength:指定了長度欄位所佔用的位元組長度;(11 佔 1個位元組)
  • lengthAdjustment: 長度域的偏移量矯正。 如果長度域的值,除了包含有效資料域的長度外,還包含了其他域(如長度域自身)長度,那麼,就需要進行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長。 ( 11 這個域 不光光描述 Hello,world, 一般設定為0,)
  • initialBytesToStrip : 丟棄的起始位元組數。丟棄處於有效資料前面的位元組數量。比如前面有1個節點的長度域,則它的值為1. ( 如果為0代表不丟棄,則將長度域也向後傳遞)

服務端新增 解碼器:

  • 最大長度 為 長度描述域 的值11 + 長度描述域本身佔用的長度 1 = 12
  • 長度描述域放在資料包的第一位, 沒有偏移 為0
  • 長度描述域 長度為1
  • 無需矯正
  • 一個位元組也不丟棄
.childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
            // 這裡將FixedLengthFrameDecoder新增到pipeline中,指定長度為20
            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(12,0,1,0,0));
            // 將前一步解碼得到的資料轉碼為字串

            ch.pipeline().addLast(new StringDecoder());
            // 最終的資料處理
            ch.pipeline().addLast(new EchoServerHandler());
          }
        });

客戶端傳送資料 傳送最Netty 底層操作 的ByteBuf物件 傳送時 無需任何編碼:

 @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ByteBuf buffer = Unpooled.buffer();
    buffer.writeByte(11);
    buffer.writeBytes("Hello,World".getBytes());
    ctx.writeAndFlush(buffer);
  }

服務端接收資料為 (11代表的製表符)Hello,World

這樣傳送 每次都要計算 資料長度,並手動新增到 資料的前面,很不方便 配合LengthFieldPrepender 使用,這個編碼碼器可以計算 長度,並自動新增到 資料的前面

改造客戶端 先攔截資料按字串編碼,再計算位元組長度 新增 長度描述欄位 並佔用一個位元組 (這個長度要與客戶端的解碼器 lengthFieldLength引數 值保持一致) :

 .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {

            ch.pipeline().addLast(new LengthFieldPrepender(1));
            ch.pipeline().addLast(new StringEncoder());
            // 客戶端傳送訊息給服務端,並且處理服務端響應的訊息
            ch.pipeline().addLast(new EchoClientHandler());
          }
        });

客戶端傳送 有字串編碼器 可以直接傳送字串:

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush("Hello,World");
  }

自定義協議

上面介紹的 各種解碼器 已經可以應付絕大多數場景, 如果遇到 特殊的狀況 我們也可以自定義協議

定義 協議物件:

//協議包
public class MessageProtocol {
    private int len; //關鍵
    private byte[] content;

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}

客戶端傳送:

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
       
        for(int i = 0; i< 5; i++) {
            String mes = "Hello,World";
            byte[] content = mes.getBytes(Charset.forName("utf-8"));
            int length = mes.getBytes(Charset.forName("utf-8")).length;

            //建立協議包物件
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);

        }
    }

該協議的 自定義 編碼器 將協議包傳送出去:

public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode 方法被呼叫");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}

將客戶端 傳送資料的Handler 和 編碼器 註冊 這裡就不寫了

服務端解碼器 讀取長度 並 判斷可讀資料的長度是否足夠 :

public class MyMessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        
        in.markReaderIndex();
        
        //讀取長度
        int length = in.readInt();
        //如果可讀長度大於 資料長度 說明資料完整
        if (in.readableBytes()>length){
            byte[] content = new byte[length];
            in.readBytes(content);
            //封裝成 MessageProtocol 物件,放入 out, 傳遞下一個handler業務處理
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            out.add(messageProtocol);
        }else{
            //如果資料不夠長 將已經讀過的的int 資料還原回去 留下次讀取
            in.resetReaderIndex();
        }
    }
}

服務端成功讀取:

本例中存在很多問題, 明白這個意思就行, 感興趣的話 可以 自己動手優化