java架構之路-(netty專題)netty的編解碼(出入戰)與粘包拆包
上次迴歸:
上次部落格我們主要說了netty的基本使用,都是一些固定的模式去寫的,我們只需要關注我們的攔截器怎麼去寫就可以了,然後我們用我們的基礎示例,改造了一個簡單的聊天室程式,可以看到內部加了一個StringEncoder和StringDecoder,這個就是用來編解碼我們字串的,這次我們就來說說這個編解碼。
編碼&解碼:
上次我們寫的那個簡單的聊天室程式大家還記得吧,內部加了兩個類似攔截器的玩意。
ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new StringDecoder());
一個是編碼的,一個是解碼的,也是藉著這個東西,來個大家說一下我們的ChannelPipline,上次只是簡單了說了一下我們的ChannelPipline內部存放了ChannelHandler,而且是雙向連結串列的結構來儲存的,我們這次來細化一下我們這個攔截器是怎麼工作的。
內部大概是這個樣子的,每次放入的時候有順序的放置,暫時先說有順序,後面我會詳細解釋這個什麼時候順序生效,記住是由由頭開始放置,這裡涉及到兩個概念就是入站和出站。
就是說我們從客戶端傳送資料到服務端,叫做出站,會經過一系列的ChannelOutboundHandler,可以方便記憶為一系列的出站攔截器,我們想出站,就要經過出站攔截器。
反之我們的入站就是和出站相對應的,是由服務端傳送過來的資料,經由我們的一系列ChannelInboundHandler,到達我們的客戶端,其實出站入站你站在客戶端的角度來看就很好理解了,我們客戶端想發出去資料,就是出站,想進來資料(接收資料),就是入站,出站會經過out攔截器,入站會經過in攔截器。切記,是雙向的,客戶端和服務端不是共有一個ChannelPipline,而且這個出站和入站都是相對的,可能還是有一點抽象,我們來拿著我們聊天室的例子來看一下。
我們需要先明確我們的StringEncoder和StringDecoder是ChannelInboundHandler還是ChannelOutboundHandler。
StringEncoder是ChannelOutboundHandler,StringDecoder是ChannelInboundHandler,這回我們按照我們的程式碼畫一下圖。先弄一個客戶端傳送訊息的。
簡單解釋一下,我們的服務端和客戶端都有自己的ChannelPipline,我們的客戶端要傳送訊息,相當於客戶端是出站操作,我們要傳送,資料外流,顯然是資料要出去,出站操作啊, 出站要經過Encoder然後是我們自己的Clienthandler,走你,進入網路傳輸,對於我們的服務端來說,要接收資料,資料要進來,一定是入站操作啊,經過我們的Decoder,然後經過我們自己的Serverhandler,到達我們的服務端。
客戶端往外發送訊息,客戶端是出站操作,經過Encoder,然後經過我們ServerHandler,進入網路,我們客戶端是入站操作,經過Decoder,經過我們的ClientHandler,到達我們的服務端。這樣說應該就理解了吧,你可以自己在decode和encode方法上打斷點,自己除錯一下,後面我會說原始碼,自己也是先熟悉一下原始碼。出站一定是從尾到頭,入站一定是從頭到尾,別問我為什麼,我自己寫了測試類,測試一下午了.....
我們可以看到Handler是按照順序執行的,這個順序只是對於相同型別的Handlery有效果的,像我們的Decoder和Encoder,一個是入站的Handler,一個是出站的Handler,他倆誰在前,誰在後無所謂的。
粘包&拆包:
粘包和拆包比上面那個出入戰好理解很多很多,我們先來看一段程式碼。
package com.xiaocai.packing; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.CharsetUtil; public class NettyClient { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup();// 開啟工作執行緒組 try { Bootstrap bootstrap = new Bootstrap(); //建立一個和服務端相對應的server bootstrap.group(group) //設定執行緒組 .channel(NioSocketChannel.class) //使用NioSocketChannel作為客戶端的通道實現 .handler(new ChannelInitializer<SocketChannel>() {//設定回撥函式 @Override protected void initChannel(SocketChannel ch) { } }); ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();//啟動客戶端去連線伺服器端 //對通道關閉進行監聽 System.out.println("netty client start。。準備開始傳送資料"); for (int i = 0; i < 2000; i++) { ByteBuf buf = Unpooled.copiedBuffer("hello,xiaocaiJAVA!".getBytes(CharsetUtil.UTF_8)); cf.channel().writeAndFlush(buf); } System.out.println("傳送資料完畢"); cf.channel().closeFuture().sync(); } finally { group.shutdownGracefully();//關閉執行緒組 } } }
就是什麼意思呢?我們建立一個客戶端連線,然後我們多次向我們的服務端傳送訊息,理論上我們每次收到的訊息都應該是hello,xiaocaiJAVA!,我們來看一下結論。
我們可以看到,有部分是正常的,有一部分是hello,xiaocaiJAVA!hello,xiaocaiJAVA!有的還是o,什麼什麼,這個明顯錯誤的,也就是我們的粘包拆包,為什麼會出現這個呢?netty收到我們的訊息不是馬上傳送出去,大概會等待一個瞬間,然後再發送我們的訊息,在等待的瞬間再次進來的訊息,他會一次性的傳送出去,但是netty自身並不知道我們的訊息該從何位置截斷,所以就出現了我們看到的粘包拆包問題,我們來看一下解決方法。
我們每次可以把資料發過去,而且把資料的長度帶過去就OK了,然後客戶端每次優先判斷一下資料的長度就可以了,看一下我這的解決方案。
package com.xiaocai.packing; /** * 自定義協議包 */ public class MyMessageProtocol { //定義一次傳送包體長度 private int len; //一次傳送包體內容 private byte[] content; public int getLen() { return len; } public void setLen(int len) { this.len = len; } public byte[] getContent() { return content; } public void setContent(byte[] content) { this.content = content; } }
package com.xiaocai.packing; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> { @Override protected void encode(ChannelHandlerContext ctx, MyMessageProtocol msg, ByteBuf out) throws Exception { out.writeInt(msg.getLen()); out.writeBytes(msg.getContent()); } }
package com.xiaocai.packing; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class MyMessageDecoder extends ByteToMessageDecoder { int length = 0; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if(in.readableBytes() >= 4) { if (length == 0){ length = in.readInt(); } if (in.readableBytes() < length) { System.out.println("當前可讀資料不夠,繼續等待。。"); return; } byte[] content = new byte[length]; if (in.readableBytes() >= length){ in.readBytes(content); //封裝成MyMessageProtocol物件,傳遞到下一個handler業務處理 MyMessageProtocol messageProtocol = new MyMessageProtocol(); messageProtocol.setLen(length); messageProtocol.setContent(content); out.add(messageProtocol); } length = 0; } } }
package com.xiaocai.packing; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; public class PackingServerHandler extends SimpleChannelInboundHandler<MyMessageProtocol> { private int count; @Override protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception { System.out.println("====服務端接收到訊息如下===="); System.out.println("長度=" + msg.getLen()); System.out.println("內容=" + new String(msg.getContent(), CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
package com.xiaocai.packing; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.CharsetUtil; public class PackingClient { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new MyMessageEncoder()); } }); ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync(); for(int i = 0; i< 200; i++) { String msg = "你好,小菜JAVA!"; //建立協議包物件 MyMessageProtocol messageProtocol = new MyMessageProtocol(); messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length); messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8)); cf.channel().writeAndFlush(messageProtocol); } cf.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
簡單來說就是我們封裝一個協議包,包含我們的內容和長度,我們要客戶端傳訊息之前,資訊進行處理,給予長度,然後我客戶端接收資料時優先判斷長度,長度不夠繼續等待,這樣就解決了我們的拆包粘包問題,有的人還會提出用什麼特殊符號的方法,也是可行的,但是你的資料中一定不要包含那個特殊符號,而且每次來一個新的開發人員,都要了解你們的特殊符號協議,還是比較麻煩的。
總結:
這次我們主要說了ChannelPipline內部的結構和addList時的放置順序,netty的入戰出戰,是相對的,出站走out攔截器,入站走in攔截器,入站一定是從頭到尾的,出站一定是從尾到頭的,切記~!!!
最進弄了一個公眾號,小菜技術,歡迎大家的加入
&n