記一次解決netty半包問題的經歷
最近學習了netty,想寫一個簡單的rpc,結果發現傳送訊息時遇到難題了,網上搜了一下,這種情況是半包問題和粘包問題,主要是出現在併發高一些的時候。
talk is cheap
客戶端編碼:
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { encode0(channelHandlerContext,o,byteBuf); } private voidencode0(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { if(o instanceof UserInfo){ byte[] data = Serializition.serialize((UserInfo) o,UserInfo.class); byteBuf.writeBytes(data); } }
服務端解碼:
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
myDecode(channelHandlerContext,byteBuf,list);
}
public void myDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list){
int len = byteBuf.readableBytes();
byte[] data = new byte[len];
byteBuf.readBytes(data);
UserInfo userInfo = Serializition.deSerialize(data,UserInfo.class);
list.add(userInfo);
}
這是最初版本的,一開始以為只要讀出來反序列化成物件就ok了,進行了簡單的測試發現沒問題,但客戶端傳送頻繁一些服務端就開始報錯:
警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Reading from abyte array threw an IOException (should never happen).
分析一下發現對於來自同一個遠端連線來說,服務端只會分配一個bytebuf來接收訊息(這裡使用的是UnpooledDirectByteBuf),這個bytebuf容量是動態擴增的,如果當前的長度不夠用來儲存新的訊息就會自動擴充套件。當客戶端傳送不頻繁時,服務端有足夠的時間來做準備接收和處理訊息,不會出現問題。但客戶端頻繁傳送時就會出現問題了,如上,服務端的可讀的位元組超過了一個物件,讀取後下一個物件反序列化就會出現問題。
解決思路:
1.每次傳送定長的訊息,不夠就補全,服務端設定對應的長度(但這樣有問題:如果這樣做客戶端會發送很多無用資訊,浪費效能,而且不知道設定多大的長度合適)
2.使用netty自帶的編碼和解碼器,如使用/r/n標誌符解碼,這就要繼承MessageDecoder了,也就是字元解碼,即先將訊息在位元組--字串--物件將轉換(有點浪費效率,而且萬一內容中有對應的分隔符就會出問題)
3.每次傳送訊息前先獲取物件位元組陣列的長度(我最開始使用的方法,後來在網上也找到別人一樣的思路)
客戶端:
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { encode1(channelHandlerContext,o,byteBuf); } private void encode1(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { if(o instanceof UserInfo){ byte[] data = Serializition.serialize((UserInfo) o,UserInfo.class); byteBuf.writeInt(data.length); byteBuf.writeBytes(data); } }
服務端:
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { myDecode1(channelHandlerContext,byteBuf,list); } public void myDecode1(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list){ if(byteBuf.readableBytes()>4){ int len = byteBuf.readInt(); byte[] data = new byte[len]; byteBuf.readBytes(data); UserInfo userInfo = Serializition.deSerialize(data,UserInfo.class); list.add(userInfo); } }
這就看起來簡單了 資料流是 |int|bytes|int|bytes,但實際情況還是發生了問題,還是出現了一樣的問題。異常原因是服務端例項化陣列長度後可讀位元組不夠,原因是傳送時客戶端是分包傳送的。
因此我在這個方法的基礎上增加了一個條件:如果可讀位元組數不夠就儲存已建立好的位元組陣列,等下一次位元組數夠時使用
private volatile int len=0; protected void decode5(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { int length =len>0?len:(byteBuf.readableBytes()>=4?byteBuf.readInt():0); if(byteBuf.readableBytes()>=length&&length>0) { byte[] data = new byte[length]; byteBuf.readBytes(data); UserInfo userInfo = Serializition.deSerialize(data, UserInfo.class); list.add(userInfo); //bytes.put(length, data); len=0; }else { len = length; } }
經過測試,問題得到解決。