1. 程式人生 > 其它 >網路通訊中的粘包問題

網路通訊中的粘包問題

什麼是粘包?

在網路傳輸層有TCP和UDP兩種協議;

如果使用TCP進行通訊,則在大多數場景下是不存在丟包和包亂序問題的,因為TCP通訊是可靠的通訊方式,TCP棧通過序列號和包重傳確認機制保證資料包的有序和一定被正確傳送到目的地;

如果使用UDP進行通訊,且不允許少量丟包,就要自己在UDP的基礎上實現類似TCP這種有序和可靠的傳輸機制(如:RTP、RUDP);

網路通訊時,除了上面的丟包,亂序問題,還有一種粘包的問題;

粘包就是連續向對端傳送兩個或者兩個以上的資料包,對端在一次收取中收到的資料包數量可能大於1個,當大於1個時,可能是幾個(包括一個)包加上某個包的部分,或者乾脆幾個完整的包在一起;也可能存在收到的資料只是一個包的部分,這種情況一般也叫作半包;

如下圖所示

下面以TCP為例子說明;

TCP中的粘包/拆包

TCP是一個流協議,所謂流就是沒有界限的一串資料;TCP的底層並不瞭解上層業務資料的具體含義,它會根據TCP緩衝區的實際情況進行包的劃分,所以在業務上認為一個完整的包可能被TCP拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送,這就是所謂的TCP的粘包和拆包問題;

粘包的例子

客戶端

檢視程式碼
public class NettyDumpSendClient {

    private final int serverPort;
    private final String serverIp;
    Bootstrap b = new Bootstrap();
    private final static Logger logger = LoggerFactory.getLogger(NettyDumpSendClient.class);


    public NettyDumpSendClient(String ip, int port) {
        this.serverPort = port;
        this.serverIp = ip;
    }

    public void runClient() {
        //建立reactor 執行緒組
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 設定reactor 執行緒組
            b.group(workerLoopGroup);
            //2 設定nio型別的channel
            b.channel(NioSocketChannel.class);
            //3 設定監聽埠
            b.remoteAddress(serverIp, serverPort);
            //4 設定通道的引數
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.option(ChannelOption.TCP_NODELAY, Boolean.TRUE);

            //5 裝配子通道流水線
            b.handler(new ChannelInitializer<SocketChannel>() {
                //有連線到達時會建立一個channel
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水線新增一個handler處理器
                    ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE);
                }
            });
            ChannelFuture f = b.connect();
            f.addListener((ChannelFuture futureListener) ->
            {
                if (futureListener.isSuccess()) {
                    logger.info("EchoClient客戶端連線成功!");

                } else {
                    logger.info("EchoClient客戶端連線失敗!");
                }

            });

            // 阻塞,直到連線完成
            f.sync();
            Channel channel = f.channel();

            //6傳送大量的文字
            byte[] bytes = "傳送Echo Client.".getBytes(StandardCharsets.UTF_8);
            for (int i = 0; i < 1000; i++) {
                //傳送ByteBuf
                ByteBuf buffer = channel.alloc().buffer();
                buffer.writeBytes(bytes);
                channel.writeAndFlush(buffer);
            }


            // 7 等待通道關閉的非同步任務結束
            // 服務監聽通道會一直等待通道關閉的非同步任務結束
            ChannelFuture closeFuture =channel.closeFuture();
            closeFuture.sync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 優雅關閉EventLoopGroup,
            // 釋放掉所有資源包括建立的執行緒
            workerLoopGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port = 11111;
        String ip = "127.0.0.1";
        new NettyDumpSendClient(ip, port).runClient();
    }
}
檢視程式碼
@ChannelHandler.Sharable
public class NettyEchoClientHandler extends ChannelInboundHandlerAdapter {

    private final static Logger logger = LoggerFactory.getLogger(NettyEchoClientHandler.class);
    public static final NettyEchoClientHandler INSTANCE = new NettyEchoClientHandler();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        int len = in.readableBytes();
        byte[] arr = new byte[len];
        in.getBytes(0, arr);
        logger.info("client received: {}", new String(arr, StandardCharsets.UTF_8));
        in.release();

    }
}

服務端

檢視程式碼
public class NettyEchoServer {

    private final int serverPort;
    ServerBootstrap b = new ServerBootstrap();
    private final static Logger logger = LoggerFactory.getLogger(NettyEchoServer.class);

    public NettyEchoServer(int port) {
        this.serverPort = port;
    }

    public void runServer() {
        //建立reactor 執行緒組
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 設定reactor 執行緒組
            b.group(bossLoopGroup, workerLoopGroup);
            //2 設定nio型別的channel
            b.channel(NioServerSocketChannel.class);
            //3 設定監聽埠
            b.localAddress(serverPort);
            //4 設定通道的引數
            b.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.SO_KEEPALIVE, true);

            //5 裝配子通道流水線
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                //有連線到達時會建立一個channel
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // pipeline管理子通道channel中的Handler
                    ch.pipeline().addLast(NettyEchoServerHandler.INSTANCE);
                }
            });
            // 6 開始繫結server
            // 通過呼叫sync同步方法阻塞直到繫結成功
            ChannelFuture channelFuture = b.bind();
            channelFuture.addListener((future)->{
                if(future.isSuccess())
                {
                    logger.info(" ========》反應器執行緒 回撥 伺服器啟動成功,監聽埠: " +
                            channelFuture.channel().localAddress());

                }
            });

            // 7 等待通道關閉的非同步任務結束
            // 服務監聽通道會一直等待通道關閉的非同步任務結束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8 優雅關閉EventLoopGroup,
            // 釋放掉所有資源包括建立的執行緒
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        int port = 11111;
        new NettyEchoServer(port).runServer();
    }
}
檢視程式碼
public class NettyEchoServerHandler extends ChannelInboundHandlerAdapter {
    public static final NettyEchoServerHandler INSTANCE = new NettyEchoServerHandler();
    private final static Logger logger = LoggerFactory.getLogger(NettyEchoServerHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf in = (ByteBuf) msg;

        logger.info("msg type: {}", (in.hasArray()?"堆記憶體":"直接記憶體"));

        int len = in.readableBytes();
        byte[] arr = new byte[len];
        in.getBytes(0, arr);
        logger.info("server received: {}", new String(arr, "UTF-8"));

        //寫回資料,非同步任務
        logger.info("寫回前,msg.refCnt:{}", (in.refCnt()));

        ChannelFuture f = ctx.writeAndFlush(msg);
        f.addListener((ChannelFuture futureListener) -> {
            logger.info("寫回後,msg.refCnt:{}", in.refCnt());
        });
    }
}

對於服務端可能會出現三種類型的輸出:

  • 一個完整的客戶端傳送的ByteBuf資料;
  • 讀到多個客戶端傳送的ByteBuf資料,但是資料是粘在一起;
  • 讀到客戶端傳送的部分ByteBuf資料的內容,並且有亂碼;

TCP粘包/拆包問題

假設客戶端分別傳送了兩個資料包D1和D2給服務端,由於服務端一次讀取到的位元組數是不確定的,故可能存在以下4種情況;

  • 服務端分兩次讀取到了兩個獨立的資料包,分別是D1和D2,沒有粘包和拆包;
  • 服務端一次接收到了兩個資料包,D1和D2粘合在一起,這種為TCP粘包;
  • 服務端分兩次讀取到了兩個資料包,第一次讀取到了完整的D1包和D2包的部分內容,第二次讀取到了D2包的剩餘內容,這種為TCP拆包;
  • 服務端分兩次讀取到了兩個資料包,第一次讀取到了D1包的部分內容D1_1,第二次讀取到了D1包剩餘的內容D1_2和D2的整包;

對於上面示例的服務端輸出,第一種情況,接收到完整的ByteBuf資料,這種為全包;第二種為粘包;第三種不是完整的資料包,這種為半包;

TCP粘包/拆包發生的原因

TCP報文封裝封裝

應用程式資料在傳送到物理網路之前,將沿著協議棧從上往下依次傳遞,每層協議都將在上層資料的基礎上加上自己的頭部資訊(有時還包括尾部資訊),以實現該層的功能,這個過程稱為封裝;如下圖;

經過TCP封裝後的資料稱為TCP報文段或TCP段,這部分資料的TCP頭部資訊和TCP核心緩衝區(傳送緩衝區或接收快取區)資料一起構成了TCP報文段;如下圖;

當傳送端應用程式使用send或write函式向一個TCP連線寫入資料時,核心中TCP模組首先把這些資料複製到與該連線對應的TCP核心傳送緩衝區中,然後TCP模組呼叫IP模組提供的服務,傳遞的引數包括TCP頭部資訊和TCP傳送緩衝區中的資料,即TCP的報文段

資料報的限制

經過IP封裝後的資料稱為IP資料報;IP資料報也包括頭部資訊和資料部分,其中資料部分就是一個TCP報文段,UDP資料報或ICMP報文;

經過資料鏈路層封裝的資料稱為幀,傳輸媒介不同,幀的型別也不同,如在乙太網上傳輸的是乙太網幀;幀的最大傳輸單元(Max Transmit Unit,MTU),即幀最多能攜帶多少上層協議資料(如:IP資料報),通常受到網路型別的限制;

乙太網幀的MTU是1500位元組當IP資料報的長度超過幀的MTU時,它將被分片傳輸

乙太網幀的MTU是1500位元組(可以通過ifconfig命令或netstat命令檢視),因此他攜帶的IP資料報的資料部分最多是1480位元組(IPv4頭部佔20位元組);

TCP連線初始化時,通訊雙方使用該選項來協商最大報文段長度(Max Segement Size,MSS);TCP模組通常將MSS設定為(MTU - 40)位元組(減掉的這40位元組包括20位元組的TCP頭部和20位元組的IPv4的頭部,IPv4為例),這樣攜帶TCP報文段的資料報長度就不會超過MTU(假設TCP頭部和IP頭部都不包括選項欄位,這是一般情況),從而避免了發生了IP分片,對於乙太網而言,MSS的值是1460(1500 - 40)位元組;

參考:https://en.wikipedia.org/wiki/IPv4

   https://en.wikipedia.org/wiki/IPv6

   https://docs.oracle.com/cd/E19253-01/819-7058/ipv6-ref-2/index.html

   https://en.wikipedia.org/wiki/Maximum_segment_size

假設用IP資料報封裝一個長度為1481位元組的ICMP報文(包括8位元組的ICMP頭部,所以其資料部分的長度為1473位元組),則該資料報在使用乙太網幀傳輸時必須被分片;如下圖:

長度為1501位元組的IP資料報被拆分成兩個IP分片,第一個IP分片長度為1500位元組,第二個IP分片的長度為21位元組;每個IP分片都包含自己的IP頭部(20位元組),且第一個IP分片的IP頭部設定了MF標誌,而第二個IP分片的IP頭部則沒有設定改標誌,因為它已經是最後一個分片了;原始IP資料報中的ICMP頭部內容被完整地複製到第一個IP分片中,第二個IP分片不包含ICMP頭部資訊,因為IP模組重組該ICMP報文的時候只需要一份ICMP頭部資訊,重複傳送這個資訊沒有任何益處;1473位元組的ICMP報文資料的前1472位元組被IP模組複製到第一個IP分片中,使其總長度為1500位元組,從而滿足MTU的要求,而多出的最後1位元組則被複制到第二個IP分片中;

TCP粘包/拆包產生

TCP粘包/拆包產生的原因如下:

  • 應用程式write寫入的位元組大小大於套接字傳送緩衝區大小(SO_SNDBUF);
  • 進行MSS大小的TCP分段;
  • 乙太網幀的payload(乙太網資料鏈路層的有效資料)大於MTU,從而進行IP分片;

參考:https://man7.org/linux/man-pages/man7/socket.7.html

粘包問題的解決方式

  • 固定包長的資料包,固定包長,即每個協議包的長度都是固定的;

假如使用者規定每個協議包的大小都是64位元組,每收滿64位元組,就取出來解析(如果不夠,就先存起來),則這種通訊協議的格式簡單但靈活性差;如果包的內容長度小於指定的位元組數,對剩餘的空間就需要填充特殊的資訊,例如"\0"(如果不填充特殊的內容,那麼如何區分包裡面的正常內容與填充資訊呢);如果包的內容超過指定的位元組數,又得分包分片,則需要增加額外的處理邏輯,在傳送端進行分包分片,在接收端重新組裝包片;

  • 以指定的字元(串)為包的結束標誌;

這種協議包比較常見,即在位元組流中遇到特殊的符號值時就認為到一個包的末尾;例如 FTP或SMTP,在一個命令或者一段資料後面加上"\r\n"(即 CRLF)表示一個包的結束;對端收到資料後,每遇到一個"\r\n",就把之前的資料當作一個數據包;這種協議一般用於一些包含各種命令控制的應用中,其不足之處就是如果協議資料包的內容部分需要使用包結束標誌字元,就需要對這些字元做轉碼或者轉義操作,以免被接收方錯誤地當成包結束標誌而誤解析;

  • 包頭+包體格式;

這種格式的包一般分為兩部分,即包頭和包體,包頭是固定大小的,且包頭必須包含一個欄位來說明接下來的包體有多大;