Netty筆記(6) - 粘包拆包問題及解決方案
Netty 中 TCP 粘包拆包問題
資訊通過tcp傳輸過程中出現的狀況 .
TCP是個“流”協議,所謂流,就是沒有界限的一串資料。TCP底層並不瞭解上層業務資料的具體含義,它會根據TCP緩衝區的實際情況進行包的劃分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送
產生粘包和拆包問題的主要原因是,作業系統在傳送TCP資料的時候,底層會有一個緩衝區,例如1024個位元組大小,如果一次請求傳送的資料量比較小,沒達到緩衝區大小,TCP則會將多個請求合併為同一個請求進行傳送,這就形成了粘包問題;如果一次請求傳送的資料量比較大,超過了緩衝區大小,TCP就會將其拆分為多次傳送,這就是拆包,也就是將一個大的包拆分為多個小包進行傳送。
入圖所示:
上圖中演示了粘包和拆包的三種情況:
- D1和D2兩個包都剛好滿足TCP緩衝區的大小,或者說其等待時間已經達到TCP等待時長,從而還是使用兩個獨立的包進行傳送;
- D1和D2兩次請求間隔時間內較短,並且資料包較小,因而合併為同一個包傳送給服務端;
- 某一個包比較大,因而將其拆分為兩個包D*_1和D*_2進行傳送,而這裡由於拆分後的某一個包比較小,其又與另一個包合併在一起傳送。
發生這種情況的程式碼:
客戶端傳送資料 快速的傳送 10條資料 :
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private int count; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //使用客戶端傳送10條資料 hello,server 編號 for(int i= 0; i< 10; ++i) { ByteBuf buffer = Unpooled.copiedBuffer("hello,server " +i, Charset.forName("utf-8")); ctx.writeAndFlush(buffer); } } }
服務端接受列印:
伺服器接收到資料 hello,server 0
伺服器接收到資料 hello,server 1
伺服器接收到資料 hello,server 2hello,server 3
伺服器接收到資料 hello,server 4hello,server 5
伺服器接收到資料 hello,server 6
伺服器接收到資料 hello,server 7hello,server 8
伺服器接收到資料 hello,server 9
很明顯 其中有三條記錄被粘在其他資料上,這就是TCP的粘包拆包現象
怎麼解決:
-
Netty自帶的 解決方案:
-
固定長度的拆包器 FixedLengthFrameDecoder,每個應用層資料包的都拆分成都是固定長度的大小
-
行拆包器 LineBasedFrameDecoder,每個應用層資料包,都以換行符作為分隔符,進行分割拆分
-
分隔符拆包器 DelimiterBasedFrameDecoder,每個應用層資料包,都通過自定義的分隔符,進行分割拆分
-
基於資料包長度的拆包器 LengthFieldBasedFrameDecoder,將應用層資料包的長度,作為接收端應用層資料包的拆分依據。按照應用層資料包的大小,拆包。這個拆包器,有一個要求,就是應用層協議中包含資料包的長度
-
FixedLengthFrameDecoder 解碼器
服務端 新增 FixedLengthFrameDecoder 解碼器 並指定長度
public class EchoServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//指定長度為9 則每次擷取長度為9的位元組
ch.pipeline().addLast(new FixedLengthFrameDecoder(9));
// 將 每次擷取的位元組編碼為字串
ch.pipeline().addLast(new StringDecoder());
//自定義處理類列印
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture future = bootstrap.bind(8000).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
自定義服務端Handler 列印字串:
public class EchoServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("message: " + msg.trim());
}
}
客戶端傳送資訊 並新增字串編碼器 將資訊已字串的形式編碼:
public class EchoClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 8000).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
客戶端Handler 傳送資訊 剛好長度為9 :
public class EchoClientHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("123456789");
}
}
總結: FixedLengthFrameDecoder 解碼器 將按照指定長度擷取位元組 並新增到List中向後傳遞 , 以本案例為例,如果位元組數剛好為9,則全部列印,如果 位元組數為18, 則拆分列印兩次,如果為19 則最後一個位元組不列印,如果不足9 則什麼都不列印.
LineBasedFrameDecoder 行拆分器
通過行換行符 \n 或者 \r\n 進行分割,
將上面案例的FixedLengthFrameDecoder 解碼器 換成 LineBasedFrameDecoder
並指定 擷取每段的最大長度 (超過報錯 不往後傳遞)
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new LineBasedFrameDecoder(5));
// 將前一步解碼得到的資料轉碼為字串
ch.pipeline().addLast(new StringDecoder());
// 最終的資料處理
ch.pipeline().addLast(new EchoServerHandler());
}
});
...
客戶端Handler 傳送字串, 最後的"1234" 不會列印,,
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("1\n123456\r\n1234");
}
服務端接收並列印結果 分別列印了 "1" 和 "1234" 而超過位元組長度5 的 "123456"則報出TooLongFrameException錯誤
server receives message: 1
An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.TooLongFrameException: frame length (6) exceeds the allowed maximum (5)
server receives message: 1234
DelimiterBasedFrameDecoder 自定義分割符
和行分割符類似, 此解碼器可以自定義分割符,常用構造方法:
public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)
接收一個最大長度,和 任意個數的 分隔符(用ByteBuf的形式傳入),解碼器識別到任意一個 分割符 都會進行拆分
註冊解碼器:
傳入 "$" 和 "*" 作為分割符,並指定最大長度為 5個位元組
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(5,
Unpooled.wrappedBuffer("$".getBytes()),Unpooled.wrappedBuffer("*".getBytes())));
// 將前一步解碼得到的資料轉碼為字串
ch.pipeline().addLast(new StringDecoder());
// 最終的資料處理
ch.pipeline().addLast(new EchoServerHandler());
}
});
客戶端 傳送資料:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("1$123456*1234$789$");
}
服務端只打印了 "1" 當解析到 "123456" 時 就報錯了 後面就沒有再解析了,會快取著 等到該通道關閉 或者有後續資料傳送過來時 才繼續解析
LengthFieldBasedFrameDecoder
自定義資料長度,傳送的 位元組陣列中 包含 描述 資料長度的欄位 和 資料本身,
解碼過程
常用欄位:
- maxFrameLength:指定了每個包所能傳遞的最大資料包大小,(上圖中的最大長度為11)
- lengthFieldOffset:指定了長度欄位在位元組碼中的偏移量;(11這個描述長度的資料是在陣列的第幾位開始)
- lengthFieldLength:指定了長度欄位所佔用的位元組長度;(11 佔 1個位元組)
- lengthAdjustment: 長度域的偏移量矯正。 如果長度域的值,除了包含有效資料域的長度外,還包含了其他域(如長度域自身)長度,那麼,就需要進行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長。 ( 11 這個域 不光光描述 Hello,world, 一般設定為0,)
- initialBytesToStrip : 丟棄的起始位元組數。丟棄處於有效資料前面的位元組數量。比如前面有1個節點的長度域,則它的值為1. ( 如果為0代表不丟棄,則將長度域也向後傳遞)
服務端新增 解碼器:
- 最大長度 為 長度描述域 的值11 + 長度描述域本身佔用的長度 1 = 12
- 長度描述域放在資料包的第一位, 沒有偏移 為0
- 長度描述域 長度為1
- 無需矯正
- 一個位元組也不丟棄
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 這裡將FixedLengthFrameDecoder新增到pipeline中,指定長度為20
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(12,0,1,0,0));
// 將前一步解碼得到的資料轉碼為字串
ch.pipeline().addLast(new StringDecoder());
// 最終的資料處理
ch.pipeline().addLast(new EchoServerHandler());
}
});
客戶端傳送資料 傳送最Netty 底層操作 的ByteBuf物件 傳送時 無需任何編碼:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = Unpooled.buffer();
buffer.writeByte(11);
buffer.writeBytes("Hello,World".getBytes());
ctx.writeAndFlush(buffer);
}
服務端接收資料為 (11代表的製表符)Hello,World
這樣傳送 每次都要計算 資料長度,並手動新增到 資料的前面,很不方便 配合LengthFieldPrepender
使用,這個編碼碼器可以計算 長度,並自動新增到 資料的前面
改造客戶端 先攔截資料按字串編碼,再計算位元組長度 新增 長度描述欄位 並佔用一個位元組 (這個長度要與客戶端的解碼器 lengthFieldLength
引數 值保持一致) :
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldPrepender(1));
ch.pipeline().addLast(new StringEncoder());
// 客戶端傳送訊息給服務端,並且處理服務端響應的訊息
ch.pipeline().addLast(new EchoClientHandler());
}
});
客戶端傳送 有字串編碼器 可以直接傳送字串:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("Hello,World");
}
自定義協議
上面介紹的 各種解碼器 已經可以應付絕大多數場景, 如果遇到 特殊的狀況 我們也可以自定義協議
定義 協議物件:
//協議包
public class MessageProtocol {
private int len; //關鍵
private byte[] content;
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
客戶端傳送:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i = 0; i< 5; i++) {
String mes = "Hello,World";
byte[] content = mes.getBytes(Charset.forName("utf-8"));
int length = mes.getBytes(Charset.forName("utf-8")).length;
//建立協議包物件
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
ctx.writeAndFlush(messageProtocol);
}
}
該協議的 自定義 編碼器 將協議包傳送出去:
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder encode 方法被呼叫");
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
將客戶端 傳送資料的Handler 和 編碼器 註冊 這裡就不寫了
服務端解碼器 讀取長度 並 判斷可讀資料的長度是否足夠 :
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
in.markReaderIndex();
//讀取長度
int length = in.readInt();
//如果可讀長度大於 資料長度 說明資料完整
if (in.readableBytes()>length){
byte[] content = new byte[length];
in.readBytes(content);
//封裝成 MessageProtocol 物件,放入 out, 傳遞下一個handler業務處理
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
out.add(messageProtocol);
}else{
//如果資料不夠長 將已經讀過的的int 資料還原回去 留下次讀取
in.resetReaderIndex();
}
}
}
服務端成功讀取:
本例中存在很多問題, 明白這個意思就行, 感興趣的話 可以 自己動手優化