Netty中的拆包粘包問題
TCP傳輸協議是基於資料流傳輸的,而基於流化的資料是沒有界限的,當客戶端向服務端傳送資料時,可能會把一個完整的資料報文拆分成多個小報文進行傳送,也可能將多個報文合併成一個大報文進行傳送。
在這樣的情況下,有可能會出現圖3-1所示的情況。
- 服務端恰巧讀到了兩個完整的資料包A和B,沒有出現拆包/粘包問題;
- 服務端接收到A和B粘在一起的資料包,服務端需要解析出A和B;
- 服務端收到完整的A和B的一部分資料包B-1,服務端需要解析出完整的A,並等待讀取完整的B資料包;
- 服務端接收到A的一部分資料包A-1,此時需要等待接收到完整的A資料包;
- 資料包A較大,服務端需要多次才可以接收完資料包A。
由於存在拆包/粘包問題,接收方很難界定資料包的邊界在哪裡,所以可能會讀取到不完整的資料導致資料解析出現問題。
應用層定義通訊協議
如何解決拆包和粘包問題呢?
一般我們會在應用層定義通訊協議。其實思想也很簡單,就是通訊雙方約定一個通訊報文協議,服務端收到報文之後,按照約定的協議進行解碼,從而避免出現粘包和拆包問題。
其實把這個問題往深度思考一下就不難發現,之所以在拆包粘包之後導致收到訊息端的內容解析出現錯誤,是因為程式無法識別一個完整訊息,也就是不知道如何把拆包之後的訊息組合成一個完整訊息,以及將粘包的資料按照某個規則拆分形成多個完整訊息。所以基於這個角度思考,我們只需要針對訊息做一個通訊雙方約定的識別規則即可。
訊息長度固定
每個資料報文都需要一個固定的長度,當接收方累計讀取到固定長度的報文後,就認為已經獲得了一個完整的訊息,當傳送方的資料小於固定長度時,則需要空位補齊.
如圖下圖所示,假設我們固定訊息長度是4,那麼沒有達到長度的報文,需要通過一個空位來補齊,從而使得訊息能夠形成一個整體。
這種方式很簡單,但是缺點也很明顯,對於沒有固定長度的訊息,不清楚如何設定長度,而且如果長度設定過大會造成位元組浪費,長度太小又會影響訊息傳輸,所以一般情況下不會採用這種方式。
特定分隔符
既然沒辦法通過固定長度來分割訊息,那能不能在訊息報文中增加一個分割符呢?然後接收方根據特定的分隔符來進行訊息拆分。比如我們採用\r\n來進行分割,如圖下圖所示。
對於特定分隔符的使用場景中,需要注意分隔符和訊息體中的字元不要存在衝突,否則會出現訊息拆分錯誤的問題。
訊息長度加訊息內容加分隔符
基於訊息長度+訊息內容+分隔符的方式進行資料通訊,如redis的報文協議定義如下。
*3\r\n$3\r\nSET\r\n$4\r\nname\r\n$3\r\n
可以發現訊息報文包含三個維度
-
訊息長度
-
訊息分隔符
-
訊息內容
這種方式在專案中是非常常見的協議,首先通過訊息頭中的總長度來判斷當前一個完整訊息所攜帶的引數個數。然後在訊息體中,再通過訊息內容長度以及訊息體作為一個組合,最後通過\r\n進行分割。服務端收到這個訊息後,就可以按照該規則進行解析得到一個完整的命令進行執行。
Zookeeper中的訊息協議
在Zookeeper中使用了Jute協議,這是zookeeper自定義訊息協議,請求協議定義如圖下圖所示。
xid用於記錄客戶端請求發起的先後序號,用來確保單個客戶端請求的響應順序。type代表請求的操作型別,常見的包括建立節點、刪除節點和獲取節點資料等。協議的請求體部分是指請求的主體內容部分,包含了請求的所有操作內容。不同的請求型別,其請求體部分的結構是不同的。
響應協議定義如下圖所示。
協議的響應頭中的xid和上文中提到的請求頭中的xid是一致的,響應中只是將請求中的xid原值返回。zxid代表ZooKeeper伺服器上當前最新的事務ID。err則是一個錯誤碼,當請求處理過程中出現異常情況時,會在這個錯誤碼中標識出來。協議的響應體部分是指響應的主體內容部分,包含了響應的所有返回資料。不同的響應型別,其響應體部分的結構是不同的。
Netty中的編解碼器
在Netty中,預設幫我們提供了一些常用的編解碼器用來解決拆包粘包的問題。下面演示幾種解碼器的使用。
FixedLengthFrameDecoder解碼器
固定長度解碼器FixedLengthFrameDecoder的原理很簡單,就是通過構造方法設定一個固定訊息大小frameLength,無論接收方一次收到多大的資料,都會嚴格按照frameLength進行解碼。
如果累計讀取的長度大小為frameLength的訊息,那麼解碼器會認為已經獲取到了一個完整的訊息,如果訊息長度小於frameLength,那麼該解碼器會一直等待後續資料包的達到,直到獲得指定長度後返回。
使用方法如下,在Server端,增加一個FixedLengthFrameDecoder,長度為10。
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
//指定epoll模型
.channel(NioServerSocketChannel.class)
//具體的工作處理類,負責處理相關SocketChannel的IO就緒事件
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new FixedLengthFrameDecoder(10))
.addLast(new SimpleServerHandler());
}
});
DelimiterBasedFrameDecoder解碼器
特殊分隔符解碼器: DelimiterBasedFrameDecoder,它有以下幾個屬性
-
delimiters,delimiters指定特殊分隔符,引數型別是ByteBuf,ByteBuf可以傳遞一個數組,意味著我們可以同時指定多個分隔符,但最終會選擇長度最短的分隔符進行拆分。
比如接收方收到的訊息體為
helloinworld\r\n
此時指定多個分隔符\n和\r\n,那麼最終會選擇最短的分隔符解碼,得到如下資料
hello | world |
-
maxLength,表示報文的最大長度限制,如果超過maxLength還沒檢測到指定分隔符,將會丟擲TooLongFrameException。
-
failFast,表示容錯機制,它與maxLength配合使用。如果failFast=true,當超過maxLength後會立刻丟擲TooLongFrameException,不再進行解碼。如果failFast=false,那麼會等到解碼出一個完整的訊息後才會丟擲TooLongFrameException
-
stripDelimiter,它的作用是判斷解碼後的訊息是否去除分隔符,如果stripDelimiter=false,而制定的特定分隔符是\n,那麼資料解碼的方式如下。
hello\nworld\r\n
當stripDelimiter=false時,解碼後得到
hello\n | world\r\n
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
//指定epoll模型
.channel(NioServerSocketChannel.class)
//具體的工作處理類,負責處理相關SocketChannel的IO就緒事件
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("&".getBytes());
ch.pipeline()
.addLast(new DelimiterBasedFrameDecoder
(10, true, true, delimiter))
.addLast(new SimpleServerHandler());
}
});
LengthFieldBasedFrameDecoder解碼器
LengthFieldBasedFrameDecoder是長度域解碼器,它是解決拆包粘包最常用的解碼器,基本上能覆蓋大部分基於長度拆包的場景。其中開源的訊息中介軟體RocketMQ就是使用該解碼器進行解碼的。
首先來說明一下該解碼器的核心引數
- lengthFieldOffset,長度欄位的偏移量,也就是存放長度資料的起始位置
- lengthFieldLength,長度欄位所佔用的位元組數
- lengthAdjustment,在一些較為複雜的協議設計中,長度域不僅僅包含訊息的長度,還包含其他資料比如版本號、資料型別、資料狀態等,這個時候我們可以使用lengthAdjustment進行修正,它的值=包體的長度值-長度域的值
- initialBytesToStrip,解碼後需要跳過的初始位元組數,也就是訊息內容欄位的起始位置
- lengthFieldEndOffset,長度欄位結束的偏移量,該屬性的值=lengthFieldOffset+lengthFieldLength
- failFast, 一般設定為true,當這個引數為true時,Netty一旦讀到Length欄位,並判斷Length超過maxFrameLength,就立即丟擲異常。
上面這些引數理解起來比較難,我們通過幾個案例來說明一下。
訊息長度+訊息內容的解碼
假設存在圖3-6所示的由長度和訊息內容組成的資料包,其中length表示報文長度,用16進製表示,共佔用2個位元組,那麼該協議對應的編解碼器引數設定如下。
- lengthFieldOffset=0,因為Length欄位就在報文的開始位置
- lengthFieldLength=2,協議設計的固定長度為2個位元組
- lengthAdjustment=0,Length欄位指包含訊息長度,不需要做修正
- initialBytesToStrip=0,解碼內容是Length+content,不需要跳過任何初始位元組。
截斷解碼結果
如果我們希望解碼後的結果中只包含訊息內容,其他部分不變,如下圖所示。對應解碼器引數組合如下
- lengthFieldOffset=0,因為Length欄位就在報文開始位置
- lengthFieldLength=2,協議設計的固定長度
- lengthAdjustment=0,Length欄位只包含訊息長度,不需要做任何修正
- initialBytesToStrip=2,跳過length欄位的位元組長度,解碼後ByteBuf只包含Content欄位。
長度欄位包含訊息內容
如圖3-8所示,如果Length欄位中包含Length欄位自身的長度以及Content欄位所佔用的位元組數,那麼Length的值為0x00d(2+11=13位元組),在這種情況下解碼器的引數組合如下
- lengthFieldOffset=0,因為Length欄位就在報文開始的位置
- lengthFieldLength=2,協議設計的固定長度
- lengthAdjustment= -2,長度欄位為13位元組,需要減2才是拆包所需要的長度。
- initialBytesToStrip=0,解碼後內容依然是Length+Content,不需要跳過任何初始位元組
基於長度欄位偏移的解碼
如圖3-9所示,Length欄位已經不再是報文的起始位置,Length欄位的值是0x000b,表示content欄位佔11個位元組,那麼此時解碼器的引數配置如下:
- lengthFieldOffset=2,需要跳過Header所佔用的2個位元組,才是Length的起始位置
- lengthFieldLength=2,協議設計的固定長度
- lengthAdjustment=0,Length欄位只包含訊息長度,不需要做任何修正
- initialBytesToStrip=0,解碼後內容依然是Length+Content,不需要跳過任何初始位元組
基於長度偏移和長度修正解碼
如下圖所示,Length欄位前後分別有hdr1和hdr2欄位,各佔據1個位元組,所以需要做長度欄位的偏移,還需要做lengthAdjustment的修正,相關引數配置如下。
- lengthFieldOffset=1,需要跳過hdr1所佔用的1個位元組,才是Length的起始位置
- lengthFieldLength=2,協議設計的固定長度
- lengthAdjustment=1,由於hdr2+content一共佔了1+11=12位元組,所以Length欄位值(11位元組)加上lengthAdjustment(1)才能得到hdr2+Content的內容(12位元組)
- initialBytesTostrip=3,解碼後跳過hdr1和length欄位,共3個位元組
解碼器實戰
比如我們定義如下訊息頭,客戶端通過該訊息協議傳送資料,服務端收到該訊息後需要進行解碼
先定義客戶端,其中Length部分,可以使用Netty自帶的LengthFieldPrepender來實現,它可以計算當前傳送訊息的二進位制位元組長度,然後把該長度新增到ByteBuf的緩衝區頭中。
//PacketNettyClient.class
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//表示傳輸訊息的時候,在訊息報文中增加4個位元組的length。->傳送的ByteBuf
.addLast(new LengthFieldPrepender(4,0,false))
.addLast(new StringEncoder())
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("i am first request");
ctx.writeAndFlush("i am second request");
}
});
}
});
//PackageNettyServer.class
ServerBootstrap serverBootstrp=new ServerBootstrap();
serverBootstrp.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(
new LengthFieldBasedFrameDecoder(
1024*1024,
0,2,
0,2))
.addLast(new StringDecoder())
.addLast(new SimpleServerHandler());
}
});
public class SimpleServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("伺服器端收到訊息:"+msg);
}
}