tcp粘包拆包問題
客戶端像服務端傳送訊息,服務端不知道客戶端每次傳送訊息的資料大小,服務端可能出現把一個數據包拆成兩個資料包進行讀取這種被稱為拆包,也有可能把兩個資料包當成一個數據包讀取這種被稱為粘包
如下圖所示,客戶端像服務端傳送了兩個資料包dataA和dataB,但服務端實際收到可能有四種情況
- 一次性讀到dataA+dataB,這裡就是粘包了
- 服務端讀到兩個資料包,分別為dataA_1和dataA_2+dataB,dataA先發送一部分 剩下一部分跟隨dataB一起傳送,該情況稱為拆包
- 服務端讀到兩個資料包,分別為dataA+dataB_1和dataB_2,dataA傳送的時候順帶把dataB也傳送了一部分,dataB剩下一部分單獨傳送,和2一樣也是拆包
- 服務端讀到兩個資料包,,分別為dataA和dataB 正常情況
解決辦法:自定義資料協議,讓伺服器知道客戶端每次傳送的資料大小為多少,這樣服務端就會按照客戶端定義的資料大小來讀取了
先定義訊息協議
/** * 自定義訊息協議 */ @Data @Accessors(chain = true) public class MessageProtocol { /** * 訊息的長度 */ private int length; /** * 訊息的內容 */ private byte[] content; }
定義自定義解碼器(inboundHandler)和自定義編碼器(outboundHandler)
解碼器:需要繼承ByteToMessageDecoder並重寫decode方法,解碼器是需要放在pipeline的前面,後續的自定義的邏輯inbound需放在解碼器後面,讀取資料順序是 先解碼後處理
ByteBuf: netty自己的buffer,類似於nio中的ByteBuffer,功能還是一樣的
out: 我們看它的資料介面為List<Object>,後續通過迴圈它 把資料挨個向後傳遞,如果out.add(messageProtocol) 操作兩次,那麼後續的handler就會讀取到兩次
public class MessageDecoder extends ByteToMessageDecoder { @Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 切記這裡不能用in.readableBytes() 來獲取接收的長度 這裡返回的是ByteBuf中陣列可讀的長度writerIndex - readerIndex 但不代表實際資料傳來的長度 猜想:ByteBuf初始化預設writerIndex就很大 int length = in.readInt(); byte[] content = new byte[length]; in.readBytes(content); MessageProtocol messageProtocol = new MessageProtocol().setLength(length).setContent(content); out.add(messageProtocol); } }
編碼器:需要繼承MessageToByteEncoder<T>指定資料型別列舉 並重寫encode方法, 前面講過出站是從後向前執行的,所以我們需要把編碼器放到自定義handler的前面 這樣才能進行編碼
public class MessageEncoder extends MessageToByteEncoder<MessageProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception { out.writeInt(msg.getLength()); out.writeBytes(msg.getContent()); } }
解碼器和編碼器的新增順序,先新增解碼器 再新增編碼器 我們自定義處理的handler放在後面
.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MessageDecoder()) .addLast(new MessageEncoder()) .addLast(new ClientChannelHandler()); } });
下面通過一個案例來演示一下
需求:分別啟動一個服務端和客戶端,客戶端連線到服務端後 像服務端傳送訊息,服務端進行讀取後回寫一個UUID給客戶端
服務端:
public class TcpServer { public static void main(String[] args) { // 用來處理連線事件 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 用來處理具體io事件 預設執行緒數為CPU核心*2 EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap() .group(bossGroup, workGroup) // 加入事件組 .channel(NioServerSocketChannel.class) // 指定服務端通道型別 .option(ChannelOption.SO_BACKLOG, 100) // 設定執行緒佇列得到的連線數 .childOption(ChannelOption.SO_KEEPALIVE, true) // 設定保持活動連線狀態 .handler(new LoggingHandler(LogLevel.INFO)) // 給ServerSocketChannel用的 對應bossgroup .childHandler(new ChannelInitializer<SocketChannel>() { // 對應workgroup @Override protected void initChannel(SocketChannel ch) throws Exception { // 給SocketChannel新增處理器 ch.pipeline().addLast(new MessageDecoder()) .addLast(new MessageEncoder()) .addLast(new ServerChannelHandler()); } }); // 繫結埠不進行阻塞 這裡通過非同步來操作 會返回一個非同步執行結果 ChannelFuture cf = serverBootstrap.bind(8080).sync(); // 監聽關閉事件 cf.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
服務端handler:
public class ServerChannelHandler extends SimpleChannelInboundHandler<MessageProtocol> { @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { System.out.println(String.format("伺服器收到訊息,長度:%d,msg:%s", msg.getLength(), new String(msg.getContent()))); // 回寫給客戶端 UUID uuid = UUID.randomUUID(); byte[] bytes = uuid.toString().getBytes(); MessageProtocol response = new MessageProtocol().setLength(bytes.length).setContent(bytes); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客戶端:
public class TcpClient { public static void main(String[] args) { EventLoopGroup clientGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap().group(clientGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MessageDecoder()) .addLast(new MessageEncoder()) .addLast(new ClientChannelHandler()); } }); ChannelFuture cf = bootstrap.connect("127.0.0.1", 8080).sync(); cf.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { clientGroup.shutdownGracefully(); } } }
客戶端handler:
public class ClientChannelHandler extends SimpleChannelInboundHandler<MessageProtocol> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 5 ; i++) { String msg = "你好啊,伺服器,我正在向你傳送訊息:"+i; MessageProtocol messageProtocol = new MessageProtocol().setLength(msg.getBytes().length).setContent(msg.getBytes()); ctx.writeAndFlush(messageProtocol); } } @Override protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception { System.out.println(String.format("客戶端收到訊息,長度:%d,msg:%s", msg.getLength(), new String(msg.getContent()))); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
服務端收到的訊息:
客戶端收到的訊息:
下面再給大家演示下不採用自定義訊息協議的情況
服務端:一共分四次接受
客戶端:分一次接受