1. 程式人生 > >Netty(三)——編解碼之路

Netty(三)——編解碼之路

       我們在開發中經常會把Java類進行implements Serializable用來網路傳輸的序列化和反序列化,過程其實就是將Java物件轉編碼為位元組陣列或者ByteBuffer物件進行傳輸,當遠端服務讀取到ByteBuffer物件或者位元組陣列時,需要將其解碼為Java物件。這也就是編解碼技術。

       Java的序列化只是編解碼技術中的一種,但是由於其1,不支援跨語言;2,序列化後的碼流相對較大(Java.io.Serializable序列化相對於基於BuyeBuffer的二進位制編碼的碼流大小大概為5:1);3,效能相對來說較差(Java序列化只有二進位制編碼的大概6%)。業界出現了很多編解碼的框架,我們來看看我們選擇一款編解碼框架時需要考慮的東西:

       1,是否支援跨語言,支援的語言種類是否豐富(現在跨系統、跨語言的互動太多了);

       2,編碼後的碼流大小(碼流越大,儲存佔空間、網路傳輸更佔頻寬、吞吐量越低);

       3,編解碼的效能;

       4,類庫是否小巧,API使用是否方便;

       5,使用者需要手工開發的工作量和難度。

       好,下邊講述三種業界比較流行,而且Netty支援,通過Netty程式設計來實現的編解碼框架;

       一,MessagePack編解碼:MessagePack is an efficient binary serialization format. It lets you exchange data among multiple languages like JSON. But it's faster and smaller. Small integers are encoded into a single byte, and typical short strings require only one extra byte in addition to the strings themselves.

(官方解釋)。它具有a,編解碼高效、效能高;b,序列化之後碼流小;c,支援跨語言(Java、Python、Ruby、C#、Lua、Go、C、C++……)

         好,看下MessagePack編解碼的幾個重要點:1,利用MessagePack實現編碼和解碼的ChannelHandlerAdapter;2,server和client端,initChannel時將編解碼ChannelHandler新增到pipeline;3,支援粘包半包操作:LengthFieldBasedFrameDecoder、LengthFieldPrepender的加入。重點程式碼如下:

/**
 * @author liujiahan
 * @Title: MsgpackDecoder
 * @Copyright: Copyright (c) 2018
 * @Description: 利用MessagePack解碼 MessageToMessageDecoder<I> extends ChannelHandlerAdapter
 * @Created on 2018/10/24
 * @ModifiedBy:
 */
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        final byte[] array;
        final int length = byteBuf.readableBytes();
        array = new byte[length];
        byteBuf.getBytes(byteBuf.readerIndex(),array,0,length);
        MessagePack messagePack = new MessagePack();
        list.add(messagePack.read(array));
    }
}



/**
 * @author liujiahan
 * @Title: MsgpackEncoder
 * @Copyright: Copyright (c) 2018
 * @Description: 利用MessagePack編碼 MessageToByteEncoder<I> extends ChannelHandlerAdapter
 * @Created on 2018/10/24
 * @ModifiedBy:
 */
public class MsgpackEncoder extends MessageToByteEncoder<Object> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        MessagePack messagePack =new MessagePack();
        //Serialize
        byte[] raw = messagePack.write(o);
        byteBuf.writeBytes(raw);

    }
}


/**
*  server和client端channel handler
*/
Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    //處理解決粘包半包
                    socketChannel.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535,0,2,0,2));
                    //解碼
                    socketChannel.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
                    //處理解決粘包半包
                    socketChannel.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
                    //編碼
                    socketChannel.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
                    socketChannel.pipeline().addLast(new EchoClientHandler());
                }
            });
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();

       二,Google Protobuf編解碼(https://developers.google.com/protocol-buffers/docs/javatutorial):它的優點a,在谷歌長期使用,產品成熟度高;b,跨語言、支援多種語言,包括C++、Java和Python等;c,編碼後的訊息更小,更加有利於儲存和傳輸;d,編解碼的效能非常高;e,支援不同協議版本的前向相容;f,支援定義可選和必選欄位。

       首先,1,需要安裝protobuf;2,編寫proto資料檔案;3,通過protoc的命令(通過protoc -help看命令,這裡看下生成Java的:  --java_out=OUT_DIR          Generate Java source file.)進行生成對應語言的類 ;4,編寫程式碼。

      這個例子看下全的程式碼:

      1,首先是兩個request和response的proto資料檔案的定義,然後通過protoc生成對應的java實體,程式碼多不展示實體程式碼。

package netty;
option java_package = "com.ljh.netty.protobuf";
option java_outer_classname = "SubscribeReqProto";

message SubscribeReq{
    required int32 subReqId = 1;
    required string userName = 2;
    required string productName = 3;
    required string address = 4;

}



package netty;
option java_package = "com.ljh.netty.protobuf";
option java_outer_classname = "SubscribeRespProto";

message SubscribeResp{
    required int32 subReqId = 1;
    required int32 subRespCode = 2;
    required string desc = 3;
}

    2,server和client端程式碼:

public class SubReqServer {

    public void bind(int port){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap b =new ServerBootstrap();
            b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                    socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    socketChannel.pipeline().addLast(new ProtobufEncoder());
                    socketChannel.pipeline().addLast(new SubReqServerHandler());
                }
            });
            ChannelFuture f =b.bind(port).sync();
            f.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port =8091;
        if(args !=null && args.length >0){
            try {
                port = Integer.valueOf(args[0]);
            }catch (NumberFormatException e){

            }
        }

        new SubReqServer().bind(port);
    }
}




public class SubReqServerHandler extends ChannelHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
        if ("ljh".equalsIgnoreCase(req.getUserName())) {
            System.out.println("server accept client subscribe req : [" + req.toString() + "]");
            ctx.writeAndFlush(resp(req.getSubReqId()));
        }
    }

    private SubscribeRespProto.SubscribeResp resp(int subReqId) {
        SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
        builder.setSubReqId(subReqId);
        builder.setSubRespCode(0);
        builder.setDesc("netty book is very good");
        return builder.build();

    }
}



public class SubReqClient {
    public void connect(int port,String host){
        EventLoopGroup group = new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    //操,搞了我一個小時
                    socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                    socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    socketChannel.pipeline().addLast(new ProtobufEncoder());
                    socketChannel.pipeline().addLast(new SubReqClientHandle());
                }
            });
            ChannelFuture f = b.connect(host,port).sync();
            f.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port =8091;
        if(args !=null && args.length >0){
            try {
                port = Integer.valueOf(args[0]);
            }catch (NumberFormatException e){

            }
        }

        new SubReqClient().connect(port,"127.0.0.1");
    }
}



public class SubReqClientHandle extends ChannelHandlerAdapter {

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i = 0 ;i<10 ;i++){
            ctx.write(subReq(1));
        }
        ctx.flush();
    }

    private SubscribeReqProto.SubscribeReq subReq(int i){
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqId(i);
        builder.setUserName("ljh");
        builder.setProductName("netty-book");
        builder.setAddress("beijing");
        return builder.build();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("receive server response :【" + msg + "】");
    }
}

       三,Jboss Marshalling編解碼(http://jbossmarshalling.jboss.org/):也是一個Java物件序列包,對JDK預設的序列化框架進行了優化,又保持了跟Java.io.Serializable介面的相容,同時增加了一些可調的引數和附加的特性,這些引數和特性可通過工廠類進行配置。

       編碼的重點看下吧,1,通過MarshallerFactory、MarshallingConfiguration建立UnmarshallerProvider例項,並建立MarshallingDecoder的channelHandler和MarshallingEncoder的channelHandler。2,server和client的initChannel時將編解碼ChannelHandler新增到pipeline。

      好,下邊看下重點程式碼:

/**
 * @author liujiahan
 * @Title: MarshallingCodeFactory
 * @Copyright: Copyright (c) 2018
 * @Description:  MarshallingCodeFactory 通過marshallerFactory 建立MarshallingDecoder decoder,MarshallingEncoder encoder
 * @Created on 2018/10/26
 * @ModifiedBy:
 */
public final class MarshallingCodeFactory {

    /**
     * 建立JBoss Marshalling解碼器MarshallingDecoder
     *
     * @return
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * 建立 JBoss Marshalling編碼器MarshallingEncoder
     *
     * @return
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}



//片段
ServerBootstrap b =new ServerBootstrap();
            b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    //解碼
                    socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder());
                    //編碼
                    socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder());
                    socketChannel.pipeline().addLast(new SubReqServerHandler());
                }
            });

       好,Netty支援很多成熟的高效的編解碼框架,用來解決網路程式設計的編解碼問題非常方便。

       其實經過幾個例子後,就會發現、總結出Netty程式設計的套路,慢慢就能找到它的門道,接下來會繼續學習,相信也會理解更加深刻。