【轉】Netty之解決TCP粘包拆包(自定義協議)
https://www.cnblogs.com/sidesky/p/6913109.html
1、什麼是粘包/拆包
一般所謂的TCP粘包是在一次接收資料不能完全地體現一個完整的訊息資料。TCP通訊為何存在粘包呢?主要原因是TCP是以流的方式來處理資料,再加上網路上MTU的往往小於在應用處理的訊息資料,所以就會引發一次接收的資料無法滿足訊息的需要,導致粘包的存在。處理粘包的唯一方法就是制定應用層的資料通訊協議,通過協議來規範現有接收的資料是否滿足訊息資料的需要。
2、解決辦法
2.1、訊息定長,報文大小固定長度,不夠空格補全,傳送和接收方遵循相同的約定,這樣即使粘包了通過接收方程式設計實現獲取定長報文也能區分。
2.2、包尾新增特殊分隔符,例如每條報文結束都添加回車換行符(例如FTP協議)或者指定特殊字元作為報文分隔符,接收方通過特殊分隔符切分報文區分。
2.3、將訊息分為訊息頭和訊息體,訊息頭中包含表示資訊的總長度(或者訊息體長度)的欄位
3、自定義協議,來實現TCP的粘包/拆包問題
3.0 自定義協議,開始標記
3.1 自定義協議的介紹
3.2 自定義協議的類的封裝
3.3 自定義協議的編碼器
3.4 自定義協議的解碼器
4、協議相關的實現
4.1 協議的封裝
[java] view plain copy
- import java.util.Arrays;
- /**
- * <pre>
- * 自己定義的協議
- * 資料包格式
- * +——----——+——-----——+——----——+
- * |協議開始標誌| 長度 | 資料 |
- * +——----——+——-----——+——----——+
- * 1.協議開始標誌head_data,為int型別的資料,16進製表示為0X76
- * 2.傳輸資料的長度contentLength,int型別
- * 3.要傳輸的資料
- * </pre>
- */
- public class SmartCarProtocol {
- /**
- * 訊息的開頭的資訊標誌
- */
- private int head_data = ConstantValue.HEAD_DATA;
- /**
- * 訊息的長度
- */
- private int contentLength;
- /**
- * 訊息的內容
- */
- private byte[] content;
- /**
- * 用於初始化,SmartCarProtocol
- *
- * @param contentLength
- * 協議裡面,訊息資料的長度
- * @param content
- * 協議裡面,訊息的資料
- */
- public SmartCarProtocol(int contentLength, byte[] content) {
- this.contentLength = contentLength;
- this.content = content;
- }
- public int getHead_data() {
- return head_data;
- }
- public int getContentLength() {
- return contentLength;
- }
- public void setContentLength(int contentLength) {
- this.contentLength = contentLength;
- }
- public byte[] getContent() {
- return content;
- }
- public void setContent(byte[] content) {
- this.content = content;
- }
- @Override
- public String toString() {
- return "SmartCarProtocol [head_data=" + head_data + ", contentLength="
- + contentLength + ", content=" + Arrays.toString(content) + "]";
- }
- }
4.2 協議的編碼器
[java] view plain copy
- /**
- * <pre>
- * 自己定義的協議
- * 資料包格式
- * +——----——+——-----——+——----——+
- * |協議開始標誌| 長度 | 資料 |
- * +——----——+——-----——+——----——+
- * 1.協議開始標誌head_data,為int型別的資料,16進製表示為0X76
- * 2.傳輸資料的長度contentLength,int型別
- * 3.要傳輸的資料
- * </pre>
- */
- public class SmartCarEncoder extends MessageToByteEncoder<SmartCarProtocol> {
- @Override
- protected void encode(ChannelHandlerContext tcx, SmartCarProtocol msg,
- ByteBuf out) throws Exception {
- // 寫入訊息SmartCar的具體內容
- // 1.寫入訊息的開頭的資訊標誌(int型別)
- out.writeInt(msg.getHead_data());
- // 2.寫入訊息的長度(int 型別)
- out.writeInt(msg.getContentLength());
- // 3.寫入訊息的內容(byte[]型別)
- out.writeBytes(msg.getContent());
- }
- }
4.3 協議的解碼器
[java] view plain copy
- import java.util.List;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- /**
- * <pre>
- * 自己定義的協議
- * 資料包格式
- * +——----——+——-----——+——----——+
- * |協議開始標誌| 長度 | 資料 |
- * +——----——+——-----——+——----——+
- * 1.協議開始標誌head_data,為int型別的資料,16進製表示為0X76
- * 2.傳輸資料的長度contentLength,int型別
- * 3.要傳輸的資料,長度不應該超過2048,防止socket流的攻擊
- * </pre>
- */
- public class SmartCarDecoder extends ByteToMessageDecoder {
- /**
- * <pre>
- * 協議開始的標準head_data,int型別,佔據4個位元組.
- * 表示資料的長度contentLength,int型別,佔據4個位元組.
- * </pre>
- */
- public final int BASE_LENGTH = 4 + 4;
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,
- List<Object> out) throws Exception {
- // 可讀長度必須大於基本長度
- if (buffer.readableBytes() >= BASE_LENGTH) {
- // 防止socket位元組流攻擊
- // 防止,客戶端傳來的資料過大
- // 因為,太大的資料,是不合理的
- if (buffer.readableBytes() > 2048) {
- buffer.skipBytes(buffer.readableBytes());
- }
- // 記錄包頭開始的index
- int beginReader;
- while (true) {
- // 獲取包頭開始的index
- beginReader = buffer.readerIndex();
- // 標記包頭開始的index
- buffer.markReaderIndex();
- // 讀到了協議的開始標誌,結束while迴圈
- if (buffer.readInt() == ConstantValue.HEAD_DATA) {
- break;
- }
- // 未讀到包頭,略過一個位元組
- // 每次略過,一個位元組,去讀取,包頭資訊的開始標記
- buffer.resetReaderIndex();
- buffer.readByte();
- // 當略過,一個位元組之後,
- // 資料包的長度,又變得不滿足
- // 此時,應該結束。等待後面的資料到達
- if (buffer.readableBytes() < BASE_LENGTH) {
- return;
- }
- }
- // 訊息的長度
- int length = buffer.readInt();
- // 判斷請求資料包資料是否到齊
- if (buffer.readableBytes() < length) {
- // 還原讀指標
- buffer.readerIndex(beginReader);
- return;
- }
- // 讀取data資料
- byte[] data = new byte[length];
- buffer.readBytes(data);
- SmartCarProtocol protocol = new SmartCarProtocol(data.length, data);
- out.add(protocol);
- }
- }
- }
4.4 服務端加入協議的編/解碼器
4.5 客戶端加入協議的編/解碼器
5、服務端的實現
[java] view plain copy
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- public class Server {
- public Server() {
- }
- public void bind(int port) throws Exception {
- // 配置NIO執行緒組
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- // 伺服器輔助啟動類配置
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChildChannelHandler())//
- .option(ChannelOption.SO_BACKLOG, 1024) // 設定tcp緩衝區 // (5)
- .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
- // 繫結埠 同步等待繫結成功
- ChannelFuture f = b.bind(port).sync(); // (7)
- // 等到服務端監聽埠關閉
- f.channel().closeFuture().sync();
- } finally {
- // 優雅釋放執行緒資源
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- /**
- * 網路事件處理器
- */
- private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // 新增自定義協議的編解碼工具
- ch.pipeline().addLast(new SmartCarEncoder());
- ch.pipeline().addLast(new SmartCarDecoder());
- // 處理網路IO
- ch.pipeline().addLast(new ServerHandler());
- }
- }
- public static void main(String[] args) throws Exception {
- new Server().bind(9999);
- }
- }
6、服務端Handler的實現
[java] view plain copy
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
- public class ServerHandler extends ChannelHandlerAdapter {
- // 用於獲取客戶端傳送的資訊
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
- // 用於獲取客戶端發來的資料資訊
- SmartCarProtocol body = (SmartCarProtocol) msg;
- System.out.println("Server接受的客戶端的資訊 :" + body.toString());
- // 會寫資料給客戶端
- String str = "Hi I am Server ...";
- SmartCarProtocol response = new SmartCarProtocol(str.getBytes().length,
- str.getBytes());
- // 當服務端完成寫操作後,關閉與客戶端的連線
- ctx.writeAndFlush(response);
- // .addListener(ChannelFutureListener.CLOSE);
- // 當有寫操作時,不需要手動釋放msg的引用
- // 當只有讀操作時,才需要手動釋放msg的引用
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- // cause.printStackTrace();
- ctx.close();
- }
- }
7、客戶端的實現
[java] view plain copy
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- public class Client {
- /**
- * 連線伺服器
- *
- * @param port
- * @param host
- * @throws Exception
- */
- public void connect(int port, String host) throws Exception {
- // 配置客戶端NIO執行緒組
- EventLoopGroup group = new NioEventLoopGroup();
- try {
- // 客戶端輔助啟動類 對客戶端配置
- Bootstrap b = new Bootstrap();
- b.group(group)//
- .channel(NioSocketChannel.class)//
- .option(ChannelOption.TCP_NODELAY, true)//
- .handler(new MyChannelHandler());//
- // 非同步連結伺服器 同步等待連結成功
- ChannelFuture f = b.connect(host, port).sync();
- // 等待連結關閉
- f.channel().closeFuture().sync();
- } finally {
- group.shutdownGracefully();
- System.out.println("客戶端優雅的釋放了執行緒資源...");
- }
- }
- /**
- * 網路事件處理器
- */
- private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- // 新增自定義協議的編解碼工具
- ch.pipeline().addLast(new SmartCarEncoder());
- ch.pipeline().addLast(new SmartCarDecoder());
- // 處理網路IO
- ch.pipeline().addLast(new ClientHandler());
- }
- }
- public static void main(String[] args) throws Exception {
- new Client().connect(9999, "127.0.0.1");
- }
- }
8、客戶端Handler的實現
[java] view plain copy
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.util.ReferenceCountUtil;
- //用於讀取客戶端發來的資訊
- public class ClientHandler extends ChannelHandlerAdapter {
- // 客戶端與服務端,連線成功的售後
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- // 傳送SmartCar協議的訊息
- // 要傳送的資訊
- String data = "I am client ...";
- // 獲得要傳送資訊的位元組陣列
- byte[] content = data.getBytes();
- // 要傳送資訊的長度
- int contentLength = content.length;
- SmartCarProtocol protocol = new SmartCarProtocol(contentLength, content);
- ctx.writeAndFlush(protocol);
- }
- // 只是讀資料,沒有寫資料的話
- // 需要自己手動的釋放的訊息
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
- try {
- // 用於獲取客戶端發來的資料資訊
- SmartCarProtocol body = (SmartCarProtocol) msg;
- System.out.println("Client接受的客戶端的資訊 :" + body.toString());
- } finally {
- ReferenceCountUtil.release(msg);
- }
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- ctx.close();
- }
- }