基於Protostuff實現的Netty編解碼器
在設計netty的編解碼器過程中,有許多組件可以選擇,這裏由於咱對Protostuff比較熟悉,所以就用這個組件了。由於數據要在網絡上傳輸,所以在發送方需要將類對象轉換成二進制,接收方接收到數據後,需要將二進制轉換成類對象,由於這個操作在之前的文章中有講解過:網絡傳輸數據序列化工具Protostuff,所以可以翻看我之前的文章來查看具體的實踐方法:
public class SerializeUtil { private static class SerializeData{ private Object target; } @SuppressWarnings("unchecked") public static byte[] serialize(Object object) { SerializeData serializeData = new SerializeData(); serializeData.target = object; Class<SerializeData> serializeDataClass = (Class<SerializeData>) serializeData.getClass(); LinkedBuffer linkedBuffer = LinkedBuffer.allocate(1024 * 4); try{ Schema<SerializeData> schema = RuntimeSchema.getSchema(serializeDataClass); return ProtostuffIOUtil.toByteArray(serializeData, schema, linkedBuffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally{ linkedBuffer.clear(); } } @SuppressWarnings("unchecked") public static <T> T deserialize(byte[] data, Class<T> clazz) { try { Schema<SerializeData> schema = RuntimeSchema.getSchema(SerializeData.class); SerializeData serializeData = schema.newMessage(); ProtostuffIOUtil.mergeFrom(data, serializeData, schema); return (T) serializeData.target; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } }
但是,上面只是普通的操作Util,如何讓數據能夠在netty上進行傳輸呢?
在netty中,如果想發送數據出去,那麽需要將數據轉換成二進制,然後通過網絡傳送出去,他提供了MessageToByteEncoder的操作類,用戶需要繼承此類,然後實現encode方法就可以了。來看看我們如何將我們寫好的SerializeUtil操作類集成進去:
public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> { @Override protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) throws Exception { out.writeBytes(SerializeUtil.serialize(msg)); } }
如上代碼所示,我們就準備好了一個基於Protostuff組件實現的編碼類了。編碼後的數據,被添加到ByteBuf緩沖區後,被發送出去。
那麽如何來實現解碼器呢?
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder{ public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) { super(maxFrameLength, lengthFieldOffset, lengthFieldLength); } @Override public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { try { byte[] dstBytes = new byte[in.readableBytes()]; //in.getBytes(in.readerIndex(), dstBytes); //切記這裏一定要用readBytes,不能用getBytes,否則會導致readIndex不能向後移動,從而導致netty did not read anything but decoded a message.錯誤 in.readBytes(dstBytes,0,in.readableBytes()); NettyMessage nettyMessage = SerializeUtil.deserialize(dstBytes, NettyMessage.class); return nettyMessage; } catch (Exception e) { System.out.println("exception when decoding: " + e); return null; } } }
如上代碼所示。一般情況下,需要繼承netty中的ByteToMessageDecoder操作類來實現,但是考慮到這樣的話需要用戶自己來處理粘包拆包問題,比較麻煩,所以我們就繼承自netty中為我們準備好的LengthFieldBasedFrameDecoder來進行,由於此decoder具有處理粘包拆包的功能,而且其繼承自ByteToMessageDecoder類,所以就省去了我們處理粘包拆包的邏輯。
需要註意的是,在進行解碼的過程中,我們首先需要從緩沖區讀取數據到byte數組中,然後需要將readerIndex標記往後移動,如果讀完後不移動的話,會報netty did not read anything but decoded a message的錯誤,而且這個錯誤在你運行的時候並不會拋出來,非常隱蔽,要不是細細的調試客戶端,根本不能發覺此錯誤的存在。
所以從上面代碼可以看出,ByteBuf.getBytes,只是單純的讀取緩存區數據,並不會將readerIndex後移。但是ByteBuf.readBytes則會將readerIndex後移。這點必須重視。
最後,我們將這兩個實現類放到handler執行容器中即可。
channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024 * 1024, 4, 4)); channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder()); channel.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50)); channel.pipeline().addLast("loginAuthResponseHandler", new LoginAuthResponseHandler()); channel.pipeline().addLast("heartBeatHandler", new HeartBeatResponseHandler());
最後啟動服務,我們就可以看到我們的編解碼器正常跑起來了:
Login is ok: Netty Message [header=Header [crcCode=-1410399999,length=0,sessionId=0,type=4,priority=0,attachment={}]]
Client send heart beat message to server : ----> Netty Message [header=Header [crcCode=-1410399999,length=0,sessionId=1344,type=5,priority=0,attachment={}]]
Client receive server heartbeat message : ---> Netty Message [header=Header [crcCode=-1410399999,length=0,sessionId=0,type=6,priority=0,attachment={}]]
基於Protostuff實現的Netty編解碼器