1. 程式人生 > >基於Protostuff實現的Netty編解碼器

基於Protostuff實現的Netty編解碼器

發送數據 gets log har not check finally chm sch

在設計netty的編解碼器過程中,有許多組件可以選擇,這裏由於咱對Protostuff比較熟悉,所以就用這個組件了。由於數據要在網絡上傳輸,所以在發送方需要將類對象轉換成二進制,接收方接收到數據後,需要將二進制轉換成類對象,由於這個操作在之前的文章中有講解過:網絡傳輸數據序列化工具Protostuff,所以可以翻看我之前的文章來查看具體的實踐方法:

public class SerializeUtil {

    private static class SerializeData{
        private Object target;
    }

    @SuppressWarnings("unchecked"
) public static byte[] serialize(Object object) { SerializeData serializeData = new SerializeData(); serializeData.target = object; Class<SerializeData> serializeDataClass = (Class<SerializeData>) serializeData.getClass(); LinkedBuffer linkedBuffer = LinkedBuffer.allocate(1024 * 4); try
{ Schema<SerializeData> schema = RuntimeSchema.getSchema(serializeDataClass); return ProtostuffIOUtil.toByteArray(serializeData, schema, linkedBuffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally
{ linkedBuffer.clear(); } } @SuppressWarnings("unchecked") public static <T> T deserialize(byte[] data, Class<T> clazz) { try { Schema<SerializeData> schema = RuntimeSchema.getSchema(SerializeData.class); SerializeData serializeData = schema.newMessage(); ProtostuffIOUtil.mergeFrom(data, serializeData, schema); return (T) serializeData.target; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } }

但是,上面只是普通的操作Util,如何讓數據能夠在netty上進行傳輸呢?

在netty中,如果想發送數據出去,那麽需要將數據轉換成二進制,然後通過網絡傳送出去,他提供了MessageToByteEncoder的操作類,用戶需要繼承此類,然後實現encode方法就可以了。來看看我們如何將我們寫好的SerializeUtil操作類集成進去:

public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) throws Exception {
        out.writeBytes(SerializeUtil.serialize(msg));
    }
}

如上代碼所示,我們就準備好了一個基於Protostuff組件實現的編碼類了。編碼後的數據,被添加到ByteBuf緩沖區後,被發送出去。

那麽如何來實現解碼器呢?

public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder{

    public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }

    @Override
    public  Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        try {
            byte[] dstBytes = new byte[in.readableBytes()];
            //in.getBytes(in.readerIndex(), dstBytes);
            //切記這裏一定要用readBytes,不能用getBytes,否則會導致readIndex不能向後移動,從而導致netty did not read anything but decoded a message.錯誤
            in.readBytes(dstBytes,0,in.readableBytes());
            NettyMessage nettyMessage = SerializeUtil.deserialize(dstBytes, NettyMessage.class);
            return nettyMessage;
        } catch (Exception e) {
            System.out.println("exception when decoding: " + e);
            return null;
        }
    }
}

如上代碼所示。一般情況下,需要繼承netty中的ByteToMessageDecoder操作類來實現,但是考慮到這樣的話需要用戶自己來處理粘包拆包問題,比較麻煩,所以我們就繼承自netty中為我們準備好的LengthFieldBasedFrameDecoder來進行,由於此decoder具有處理粘包拆包的功能,而且其繼承自ByteToMessageDecoder類,所以就省去了我們處理粘包拆包的邏輯。

需要註意的是,在進行解碼的過程中,我們首先需要從緩沖區讀取數據到byte數組中,然後需要將readerIndex標記往後移動,如果讀完後不移動的話,會報netty did not read anything but decoded a message的錯誤,而且這個錯誤在你運行的時候並不會拋出來,非常隱蔽,要不是細細的調試客戶端,根本不能發覺此錯誤的存在。

所以從上面代碼可以看出,ByteBuf.getBytes,只是單純的讀取緩存區數據,並不會將readerIndex後移。但是ByteBuf.readBytes則會將readerIndex後移。這點必須重視。

最後,我們將這兩個實現類放到handler執行容器中即可。

   channel.pipeline().addLast("nettyMessageDecoder", new NettyMessageDecoder(1024 * 1024, 4, 4));
   channel.pipeline().addLast("nettyMessageEncoder", new NettyMessageEncoder());
   channel.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
   channel.pipeline().addLast("loginAuthResponseHandler", new LoginAuthResponseHandler());
   channel.pipeline().addLast("heartBeatHandler", new HeartBeatResponseHandler());

最後啟動服務,我們就可以看到我們的編解碼器正常跑起來了:

Login is ok: Netty Message [header=Header [crcCode=-1410399999,length=0,sessionId=0,type=4,priority=0,attachment={}]]
Client send heart beat message to server : ----> Netty Message [header=Header [crcCode=-1410399999,length=0,sessionId=1344,type=5,priority=0,attachment={}]]
Client receive server heartbeat message : ---> Netty Message [header=Header [crcCode=-1410399999,length=0,sessionId=0,type=6,priority=0,attachment={}]]

基於Protostuff實現的Netty編解碼器