Netty2:粘包/拆包問題與使用LineBasedFrameDecoder的解決方案
什麽是粘包、拆包
粘包、拆包是Socket編程中最常遇見的一個問題,本文來研究一下Netty是如何解決粘包、拆包的,首先我們從什麽是粘包、拆包開始說起:
TCP是個"流"協議,所謂流,就是沒有界限的一串數據,TCP底層並不了解上層業務的具體含義,它會根據TCP緩沖區的實際情況進行包的劃分,所以在業務上:
- 一個完整的包可能會被TCP拆分為多個包進行發送(拆包)
- 多個小的包也有可能被封裝成一個大的包進行發送(粘包)
這就是所謂的TCP粘包與拆包
下圖演示了粘包、拆包的場景:
基本上有四種情況:
- Data1、Data2都分開發送到了Server端,沒有產生粘包與拆包的情況
- Data1、Data2數據粘在了一起,打成了一個大的包發送到了Server端,這種情況就是粘包
- Data1被分成Data1_1與Data1_2,Data1_1先到服務端,Data1_2與Data2再到服務端,這種情況就是拆包
- Data2被分成Data2_1與Data2_2,Data1與Data2_1先到服務端,Data2_2再到服務端,同上,這也是一種拆包的場景
粘包、拆包產生的原因
上面我們詳細了解了TCP粘包與拆包,那麽粘包與拆包為什麽會發生呢,大致上有三種原因:
- 應用程序寫入的字節大小大於Socket發送緩沖區大小
- 進行MSS大小的TCP,MSS是最大報文段長度的縮寫,是TCP報文段中的數據字段最大長度,MSS=TCP報文段長度-TCP首部長度
- 以太網的Payload大於MTU,進行IP分片,MTU是最大傳輸單元的縮寫,以太網的MTU為1500字節
粘包、拆包解決策略
由於底層的TCP無法理解上層的業務數據,所以在底層是無法保證數據包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,可以歸納如下:
- 消息定長,例如每個報文的大小固定為200字節,如果不夠空位補空格
- 包尾增加回車換行符進行分割,例如FTP協議
- 將消息分為消息頭和消息體,消息頭中包含表示長度的字段,通常涉及思路為消息頭的第一個字段使用int32來表示消息的總長度
- 更復雜的應用層協議
未考慮TCP粘包導致功能異常演示
基於Netty的第一篇文章《Netty1:初識Netty》,TimeServer與TimeClient不變,簡單修改一下TimeServerHandler與TimeClientHandler即可以模擬出TCP粘包的情況,首先修改TimeClientHandler:
1 public class TimeClientHandler extends ChannelHandlerAdapter { 2 3 private static final Logger LOGGER = LoggerFactory.getLogger(TimeClientHandler.class); 4 5 private int counter; 6 7 private byte[] req; 8 9 public TimeClientHandler() { 10 req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); 11 } 12 13 @Override 14 public void channelActive(ChannelHandlerContext ctx) throws Exception { 15 ByteBuf message = null; 16 for (int i = 0; i < 100; i++) { 17 message = Unpooled.buffer(req.length); 18 message.writeBytes(req); 19 ctx.writeAndFlush(message); 20 } 21 } 22 23 @Override 24 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 25 ByteBuf buf = (ByteBuf)msg; 26 byte[] req = new byte[buf.readableBytes()]; 27 buf.readBytes(req); 28 29 String body = new String(req, "UTF-8"); 30 System.out.println("Now is:" + body + "; the counter is:" + ++counter); 31 } 32 33 @Override 34 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 35 LOGGER.warn("Unexcepted exception from downstream:" + cause.getMessage()); 36 ctx.close(); 37 } 38 39 }
TimeClientHandler的變化是,之前是發送一次"QUERY TIME ORDER"到服務端,現在變為發送100次"QUERY TIME ORDER"+標準換行符到服務端,並在客戶端增加一個計數器,記錄從服務端收到的響應次數。
服務單TimeServerHandler也簡單改造一下,增加一個計數器記錄一下從客戶端收到的請求次數:
1 public class TimeServerHandler extends ChannelHandlerAdapter { 2 3 private int counter; 4 5 @Override 6 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 7 ByteBuf buf = (ByteBuf)msg; 8 byte[] req = new byte[buf.readableBytes()]; 9 buf.readBytes(req); 10 11 String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length()); 12 System.out.println("The time server receive order:" + body + "; the counter is:" + ++counter); 13 14 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 15 currentTime = currentTime + System.getProperty("line.separator"); 16 17 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 18 ctx.writeAndFlush(resp); 19 } 20 21 @Override 22 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 23 ctx.close(); 24 } 25 26 }
按照設計,服務端應該會打印出100次"Time time server...",客戶端應當會打印出100次"Now is ...",因為客戶端向服務端發送了100次"QUERY TIME ORDER"的請求,實際運行起來呢?先看一下服務端的打印:
The time server receive order:QUERY TIME ORDER QUERY TIME ORDER ...省略,這裏有55個 QUERY TIME ORD; the counter is:1 The time server receive order: ...省略,這裏有42個 QUERY TIME ORDER; the counter is:2
counter最終等於2,表明服務端實際上只收到了2條請求,很顯然這裏發生了粘包,即多個客戶端的包合成了一個發送到了服務端,服務端每收到一個包的大小為1024字節。
接著看一下客戶端的打印:
Now is:BAD ORDER BAD ORDER ; the counter is:1
因為服務端只收到了2條消息,因此客戶端也只會收到2條消息,因為服務端兩次收到的內容都不滿足"QUERY TIME ORDER",因此返回"BAD ORDER"到客戶端,但是為什麽客戶端的counter=1呢?回過頭來仔細想想,因此服務端發送給客戶端的消息也發生了粘包。因此這裏簡單得出一個結論:粘包/拆包不僅僅發生在客戶端給服務端發送數據,服務端回數據給客戶端同樣有可能發生粘包/拆包。
上面的例子演示了粘包,拆包其實一樣的,既然可以知道服務端每收到一個包的大小為1024字節,那客戶端每次發送一個大於1024字節的數據給服務端就可以了,有興趣的朋友可以自己嘗試一下。
利用LineBasedFrameDecoder解決粘包問題
為了解決TCP粘包/拆包導致的半包讀寫問題,Netty默認提供了多種編解碼器用於處理半包,針對上面發送"QUERY TIME ORDER"+標準換行符的這種場景,簡單使用LineBasedFrameDecoder就可以解決上面發生的粘包問題。
首先對TimeServer進行改造,加入LineBasedFrameDecoder與StringDecoder:
1 public class TimeServer { 2 3 public void bind(int port) throws Exception { 4 // NIO線程組 5 EventLoopGroup bossGroup = new NioEventLoopGroup(); 6 EventLoopGroup workerGroup = new NioEventLoopGroup(); 7 8 try { 9 ServerBootstrap b = new ServerBootstrap(); 10 b.group(bossGroup, workerGroup) 11 .channel(NioServerSocketChannel.class) 12 .option(ChannelOption.SO_BACKLOG, 1024) 13 .childHandler(new ChildChannelHandler()); 14 15 // 綁定端口,同步等待成功 16 ChannelFuture f = b.bind(port).sync(); 17 // 等待服務端監聽端口關閉 18 f.channel().closeFuture().sync(); 19 } finally { 20 // 優雅退出,釋放線程池資源 21 bossGroup.shutdownGracefully(); 22 workerGroup.shutdownGracefully(); 23 } 24 } 25 26 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 27 @Override 28 protected void initChannel(SocketChannel arg0) throws Exception { 29 arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); 30 arg0.pipeline().addLast(new StringDecoder()); 31 arg0.pipeline().addLast(new TimeServerHandler()); 32 } 33 } 34 35 }
改造點就在29行、30行兩行,加入了LineBasedFrameDecoder與StringDecoder,同時TimeServerHandler也需要相應改造:
1 public class TimeServerHandler extends ChannelHandlerAdapter { 2 3 private int counter; 4 5 @Override 6 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 7 String body = (String)msg; 8 System.out.println("The time server receive order:" + body + "; the counter is:" + ++counter); 9 10 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 11 currentTime = currentTime + System.getProperty("line.separator"); 12 13 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 14 ctx.writeAndFlush(resp); 15 } 16 17 @Override 18 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 19 ctx.close(); 20 } 21 22 }
改造點在第7行,由於使用了StringDecoder,因此channelRead的第二個參數msg不再是ByteBuf類型而是String類型,因此這裏只需要做一次String強轉即可。
TimeClient改造類似:
1 public class TimeClient { 2 3 public void connect(int port, String host) throws Exception { 4 EventLoopGroup group = new NioEventLoopGroup(); 5 try { 6 Bootstrap b = new Bootstrap(); 7 8 b.group(group) 9 .channel(NioSocketChannel.class) 10 .option(ChannelOption.TCP_NODELAY, true) 11 .handler(new ChannelInitializer<SocketChannel>() { 12 protected void initChannel(SocketChannel ch) throws Exception { 13 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); 14 ch.pipeline().addLast(new StringDecoder()); 15 ch.pipeline().addLast(new TimeClientHandler()); 16 }; 17 }); 18 19 // 發起異步連接操作 20 ChannelFuture f = b.connect(host, port).sync(); 21 // 等待客戶端連接關閉 22 f.channel().closeFuture().sync(); 23 } finally { 24 // 優雅退出,釋放NIO線程組 25 group.shutdownGracefully(); 26 } 27 } 28 29 }
第13行、第14行這兩行加入了LineBasedFrameDecoder與StringDecoder,TimeClientHandler相應改造:
1 public class TimeClientHandler extends ChannelHandlerAdapter { 2 3 private static final Logger LOGGER = LoggerFactory.getLogger(TimeClientHandler.class); 4 5 private int counter; 6 7 private byte[] req; 8 9 public TimeClientHandler() { 10 req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes(); 11 } 12 13 @Override 14 public void channelActive(ChannelHandlerContext ctx) throws Exception { 15 ByteBuf message = null; 16 for (int i = 0; i < 100; i++) { 17 message = Unpooled.buffer(req.length); 18 message.writeBytes(req); 19 ctx.writeAndFlush(message); 20 } 21 } 22 23 @Override 24 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 25 String body = (String)msg; 26 System.out.println("Now is:" + body + "; the counter is:" + ++counter); 27 } 28 29 @Override 30 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 31 LOGGER.warn("Unexcepted exception from downstream:" + cause.getMessage()); 32 ctx.close(); 33 } 34 35 }
第25行這裏使用String進行強轉即可。接下來看一下服務端的打印:
The time server receive order:QUERY TIME ORDER; the counter is:1 The time server receive order:QUERY TIME ORDER; the counter is:2 The time server receive order:QUERY TIME ORDER; the counter is:3 The time server receive order:QUERY TIME ORDER; the counter is:4 The time server receive order:QUERY TIME ORDER; the counter is:5 ... The time server receive order:QUERY TIME ORDER; the counter is:98 The time server receive order:QUERY TIME ORDER; the counter is:99 The time server receive order:QUERY TIME ORDER; the counter is:100
看到服務端正常counter從1打印到了100,即收到了100個完整的客戶端請求,客戶端的打印如下:
Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:1 Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:2 Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:3 Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:4 Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:5 ... Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:98 Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:99 Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:100
看到同樣的客戶端也正常counter從1打印到了100,即收到了100個完整的服務端響應,至此,使用LineBasedFrameDecoder與StringDecoder解決了上述粘包問題。
整個LineBasedFrameDecoder的原理也比較簡單:
LineBasedFrameDecoder依次遍歷ByteBuf中的可讀字節,判斷是否有"\n"或者"\r\n",如果有就以此位置為結束位置,從可讀索引到結束位置區間的字節就組成了一行,它是以換行符為結束標誌的解碼器,支持攜帶結束符或者不攜帶結束符兩種解碼方式,同時支持配置單行的最大長度,如果連續讀到最大長度後仍然沒有發現換行符,就會拋出異常,同時忽略掉之前讀到的異常碼流。
StringDecoder的功能非常簡單,就是將接收到的對象轉換為字符串,然後繼續調用後面的Handler
LineBasedFrameDecoder+StringDecoder就是按行切換的文本解碼器,被設計用於支持TCP的粘包和拆包
Netty2:粘包/拆包問題與使用LineBasedFrameDecoder的解決方案