1. 程式人生 > >tcp黏包

tcp黏包

轉載https://www.cnblogs.com/wade-luffy/p/6165671.html

 

無論是服務端還是客戶端,當我們讀取或者傳送訊息的時候,都需要考慮TCP底層的粘包/拆包機制。

回到頂部

TCP粘包/拆包

TCP是個“流”協議,所謂流,就是沒有界限的一串資料。大家可以想想河裡的流水,是連成一片的,其間並沒有分界線。TCP底層並不瞭解上層業務資料的具體含義,它會根據TCP緩衝區的實際情況進行包的劃分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送,這就是所謂的TCP粘包和拆包問題。

TCP粘包/拆包問題說明

假設客戶端分別傳送了兩個資料包D1和D2給服務端,由於服務端一次讀取到的位元組數是不確定的,故可能存在以下4種情況。

(1)服務端分兩次讀取到了兩個獨立的資料包,分別是D1和D2,沒有粘包和拆包;

(2)服務端一次接收到了兩個資料包,D1和D2粘合在一起,被稱為TCP粘包;

(3)服務端分兩次讀取到了兩個資料包,第一次讀取到了完整的D1包和D2包的部分內容,第二次讀取到了D2包的剩餘內容,這被稱為TCP拆包;

(4)服務端分兩次讀取到了兩個資料包,第一次讀取到了D1包的部分內容D1_1,第二次讀取到了D1包的剩餘內容D1_2和D2包的整包。

如果此時服務端TCP接收滑窗非常小,而資料包D1和D2比較大,很有可能會發生第五種可能,即服務端分多次才能將D1和D2包接收完全,期間發生多次拆包。

TCP粘包/拆包發生的原因

問題產生的原因有三個,分別如下。

(1)應用程式write寫入的位元組大小大於套介面傳送緩衝區大小;

(2)進行MSS大小的TCP分段;

(3)乙太網幀的payload大於MTU進行IP分片。

 

粘包問題的解決策略

由於底層的TCP無法理解上層的業務資料,所以在底層是無法保證資料包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,可以歸納如下。

(1)訊息定長,例如每個報文的大小為固定長度200位元組,如果不夠,空位補空格;

(2)在包尾增加回車換行符進行分割,例如FTP協議;

(3)將訊息分為訊息頭和訊息體,訊息頭中包含表示訊息總長度(或者訊息體長度)的欄位,通常設計思路為訊息頭的第一個欄位使用int32來表示訊息的總長度;

(4)更復雜的應用層協議。

回到頂部

未考慮TCP粘包導致功能異常案例 

在前面的時間伺服器例程中,我們多次強調並沒有考慮讀半包問題,這在功能測試時往往沒有問題,但是一旦壓力上來,或者傳送大報文之後,就會存在粘包/拆包問題。如果程式碼沒有考慮,往往就會出現解碼錯位或者錯誤,導致程式不能正常工作。以Netty 入門示例為例。

TimeServer的改造

複製程式碼
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;


public class TimeServerHandler extends ChannelHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
        System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
                new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}
複製程式碼

每讀到一條訊息後,就計一次數,然後傳送應答訊息給客戶端。按照設計,服務端接收到的訊息總數應該跟客戶端傳送的訊息總數相同,而且請求訊息刪除回車換行符後應該為"QUERY TIME ORDER"。

TimeClient的改造 

複製程式碼
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class TimeClientHandler extends ChannelHandlerAdapter {

    private int counter;

    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 釋放資源
        ctx.close();
    }
}
複製程式碼

客戶端跟服務端鏈路建立成功之後,迴圈傳送100條訊息,每傳送一條就重新整理一次,保證每條訊息都會被寫入Channel中。按照我們的設計,服務端應該接收到100條查詢時間指令的請求訊息。客戶端每接收到服務端一條應答訊息之後,就列印一次計數器。按照設計初衷,客戶端應該列印100次服務端的系統時間。

執行結果:

服務端執行結果如下。

The time server receive order : QUERY TIME ORDER

QUERY TIME ORDER

......................

QUERY TIME ORDER ; the counter is : 1

The time server receive order :

QUERY TIME ORDER

............

QUERY TIME ORDER ; the counter is : 2

服務端執行結果表明它只接收到了兩條訊息,第一條包含57條“QUERY TIME ORDER”指令,第二條包含了43條“QUERY TIME ORDER”指令,總數正好是100條。我們期待的是收到100條訊息,每條包含一條“QUERY TIME ORDER”指令。這說明發生了TCP粘包。

客戶端執行結果如下。

Now is : BAD ORDER

BAD ORDER

; the counter is : 1

按照設計初衷,客戶端應該收到100條當前系統時間的訊息,但實際上只收到了一條。這不難理解,因為服務端只收到了2條請求訊息,所以實際服務端只發送了2條應答,由於請求訊息不滿足查詢條件,所以返回了2條“BAD ORDER”應答訊息。但是實際上客戶端只收到了一條包含2條“BAD ORDER”指令的訊息,說明服務端返回的應答訊息也發生了粘包。由於上面的例程沒有考慮TCP的粘包/拆包,所以當發生TCP粘包時,我們的程式就不能正常工作。

回到頂部

利用LineBasedFrameDecoder解決TCP粘包問題

為了解決TCP粘包/拆包導致的半包讀寫問題,Netty預設提供了多種編解碼器用於處理半包,只要能熟練掌握這些類庫的使用,TCP粘包問題從此會變得非常容易,你甚至不需要關心它們,這也是其他NIO框架和JDK原生的NIO API所無法匹敵的。

服務端程式碼:

複製程式碼
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class TimeServer {

    public void bind(int port) throws Exception {
     // 配置服務端的NIO執行緒組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // 繫結埠,同步等待成功
            ChannelFuture f = b.bind(port).sync();

            // 等待服務端監聽埠關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放執行緒池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer {
        @Override
        protected void initChannel(Channel arg0) throws Exception {
            arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
            arg0.pipeline().addLast(new StringDecoder());
            arg0.pipeline().addLast(new TimeServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用預設值
            }
        }
        new TimeServer().bind(port);
    }
}

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;


public class TimeServerHandler extends ChannelHandlerAdapter {

    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String body = (String) msg;
        System.out.println("The time server receive order : " + body  + " ; the counter is : " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ?
                new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}
複製程式碼

客戶端程式碼:

複製程式碼
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;


public class TimeClient {

    public void connect(int port, String host) throws Exception {
// 配置客戶端NIO執行緒組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer() {
                        @Override
                        public void initChannel(Channel ch)
                                throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            // 發起非同步連線操作
            ChannelFuture f = b.connect(host, port).sync();

            // 等待客戶端鏈路關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放NIO執行緒組
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用預設值
            }
        }
        new TimeClient().connect(port, "127.0.0.1");
    }
}


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;


public class TimeClientHandler extends ChannelHandlerAdapter {

    private int counter;

    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator"))
                .getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String body = (String) msg;
        System.out.println("Now is : " + body + " ; the counter is : "  + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 釋放資源
        ctx.close();
    }
}
複製程式碼

兩個變化:

  1. 拿到的msg已經是解碼成字串之後的應答訊息
  2. 新增了兩個解碼器:第一個是LineBasedFrameDecoder,第二個是StringDecoder。

執行結果:

服務端執行結果如下。

The time server receive order : QUERY TIME ORDER ; the counter is : 1

.....................................

The time server receive order : QUERY TIME ORDER ; the counter is : 100

客戶端執行結果如下。

Now is : Thu Feb 20 00:00:14 CST 2014 ; the counter is : 1

......................................

Now is : Thu Feb 20 00:00:14 CST 2014 ; the counter is : 100

程式的執行結果完全符合預期,說明通過使用LineBasedFrameDecoder和StringDecoder成功解決了TCP粘包導致的讀半包問題。對於使用者來說,只要將支援半包解碼的handler新增到ChannelPipeline中即可,不需要寫額外的程式碼,使用者使用起來非常簡單。

LineBasedFrameDecoder和StringDecoder的原理分析

LineBasedFrameDecoder的工作原理是它依次遍歷ByteBuf中的可讀位元組,判斷看是否有“\n”或者“\r\n”,如果有,就以此位置為結束位置,從可讀索引到結束位置區間的位元組就組成了一行。它是以換行符為結束標誌的解碼器,支援攜帶結束符或者不攜帶結束符兩種解碼方式,同時支援配置單行的最大長度。如果連續讀取到最大長度後仍然沒有發現換行符,就會丟擲異常,同時忽略掉之前讀到的異常碼流。

StringDecoder的功能非常簡單,就是將接收到的物件轉換成字串,然後繼續呼叫後面的handler。LineBasedFrameDecoder + StringDecoder組合就是按行切換的文字解碼器,它被設計用來支援TCP的粘包和拆包。

如果傳送的訊息不是以換行符結束的該怎麼辦呢?或者沒有回車換行符,靠訊息頭中的長度欄位來分包怎麼辦?是不是需要自己寫半包解碼器?答案是否定的,Netty提供了多種支援TCP粘包/拆包的解碼器,用來滿足使用者的不同訴求。