Netty (3): The Road to Encoding and Decoding
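In day-to-day development we often make a Java class implement Serializable so that its objects can be serialized and deserialized for network transport. The process encodes a Java object into a byte array or a ByteBuffer for transmission; when the remote service reads that byte array or ByteBuffer, it has to decode it back into a Java object. This is what codec (encoding and decoding) technology is about.
Java serialization is only one kind of codec technology, and it has three drawbacks: 1. it is not cross-language; 2. the serialized stream is relatively large (the output of java.io.Serializable is roughly 5 times the size of a ByteBuffer-based binary encoding); 3. its performance is relatively poor (Java serialization reaches only about 6% of the speed of binary encoding). As a result, the industry has produced many codec frameworks. Here is what to consider when choosing one: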
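1. Whether it is cross-language, and how many languages it supports (cross-system and cross-language interaction is everywhere these days);
2. The size of the encoded stream (the larger the stream, the more storage it takes, the more network bandwidth it uses, and the lower the throughput);
3. Encoding and decoding performance;
4. Whether the library is small and the API is easy to use;
5. The amount and difficulty of the hand-written code the user has to develop.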
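To get a feel for the stream-size criterion, here is a minimal sketch that encodes the same object once with JDK serialization and once with a hand-written ByteBuffer layout. The UserInfo class and its fields are made up for illustration and are not from any framework; the exact ratio depends on the object, so treat the printed numbers as indicative only.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CodecSizeDemo {

    /** Hypothetical sample type used only for this comparison. */
    static class UserInfo implements Serializable {
        private static final long serialVersionUID = 1L;
        final String userName;
        final int userId;

        UserInfo(String userName, int userId) {
            this.userName = userName;
            this.userId = userId;
        }

        /** Hand-written binary layout: 4-byte name length, name bytes, 4-byte id. */
        byte[] encode() {
            byte[] name = userName.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buffer = ByteBuffer.allocate(4 + name.length + 4);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(userId);
            return buffer.array();
        }
    }

    /** Serializes an object with the JDK's built-in mechanism. */
    static byte[] jdkSerialize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        UserInfo info = new UserInfo("ljh", 100);
        System.out.println("JDK serialization:   " + jdkSerialize(info).length + " bytes");
        System.out.println("ByteBuffer encoding: " + info.encode().length + " bytes");
    }
}
```

The JDK stream carries a stream header and a full class descriptor in addition to the field values, which is where most of the extra bytes come from.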
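The rest of this post walks through three codec frameworks that are popular in the industry and supported by Netty, each wired into a Netty program.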
I. MessagePack codec: MessagePack is an efficient binary serialization format. It lets you exchange data among multiple languages like JSON. But it's faster and smaller. Small integers are encoded into a single byte, and typical short strings require only one extra byte in addition to the strings themselves.
The key points of the MessagePack codec are: 1. implement the encoder and the decoder with MessagePack as ChannelHandlerAdapter subclasses; 2. on both the server and the client, add the codec ChannelHandlers to the pipeline in initChannel; 3. handle sticky packets and half packets (TCP coalescing or splitting messages) by adding LengthFieldBasedFrameDecoder and LengthFieldPrepender. The key code is as follows:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;

import java.util.List;

/**
 * @author liujiahan
 * @Title: MsgpackDecoder
 * @Copyright: Copyright (c) 2018
 * @Description: Decodes with MessagePack. MessageToMessageDecoder<I> extends ChannelHandlerAdapter
 * @Created on 2018/10/24
 * @ModifiedBy:
 */
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // Copy the readable bytes of the frame into a byte array
        final int length = byteBuf.readableBytes();
        final byte[] array = new byte[length];
        byteBuf.getBytes(byteBuf.readerIndex(), array, 0, length);
        // Deserialize and hand the result to the next handler
        MessagePack messagePack = new MessagePack();
        list.add(messagePack.read(array));
    }
}

/**
 * @author liujiahan
 * @Title: MsgpackEncoder
 * @Copyright: Copyright (c) 2018
 * @Description: Encodes with MessagePack. MessageToByteEncoder<I> extends ChannelHandlerAdapter
 * @Created on 2018/10/24
 * @ModifiedBy:
 */
public class MsgpackEncoder extends MessageToByteEncoder<Object> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        MessagePack messagePack = new MessagePack();
        // Serialize
        byte[] raw = messagePack.write(o);
        byteBuf.writeBytes(raw);
    }
}

/**
 * Channel handlers on the server and client side
 */
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // Handles sticky and half packets on the inbound side
        socketChannel.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
        // Decode
        socketChannel.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
        // Handles sticky and half packets on the outbound side
        socketChannel.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
        // Encode
        socketChannel.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
        socketChannel.pipeline().addLast(new EchoClientHandler());
    }
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
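To show what the two framing handlers buy us, here is a minimal JDK-only sketch of the same idea: a 2-byte big-endian length is prepended to every message, and the receiving side buffers bytes until a whole frame is available, then strips the header. This is my own toy model of the technique, not Netty's implementation; the class and method names (LengthFieldFramingDemo, frame, FrameDecoder, feed) are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LengthFieldFramingDemo {

    /** Prepends a 2-byte big-endian length to the body. */
    static byte[] frame(byte[] body) {
        if (body.length > 0xFFFF) {
            throw new IllegalArgumentException("body too long for a 2-byte length field");
        }
        byte[] out = new byte[2 + body.length];
        out[0] = (byte) (body.length >>> 8);
        out[1] = (byte) body.length;
        System.arraycopy(body, 0, out, 2, body.length);
        return out;
    }

    /** Accumulates incoming chunks and emits complete bodies with the header stripped. */
    static class FrameDecoder {
        private byte[] pending = new byte[0];

        List<byte[]> feed(byte[] chunk) {
            // Append the new chunk to whatever was left over from earlier reads
            byte[] merged = Arrays.copyOf(pending, pending.length + chunk.length);
            System.arraycopy(chunk, 0, merged, pending.length, chunk.length);

            List<byte[]> frames = new ArrayList<>();
            int pos = 0;
            while (merged.length - pos >= 2) {
                int length = ((merged[pos] & 0xFF) << 8) | (merged[pos + 1] & 0xFF);
                if (merged.length - pos - 2 < length) {
                    break; // half packet: wait for more bytes
                }
                frames.add(Arrays.copyOfRange(merged, pos + 2, pos + 2 + length));
                pos += 2 + length;
            }
            pending = Arrays.copyOfRange(merged, pos, merged.length);
            return frames;
        }
    }

    public static void main(String[] args) {
        byte[] a = frame("hello".getBytes(StandardCharsets.UTF_8));
        byte[] b = frame("netty".getBytes(StandardCharsets.UTF_8));
        byte[] stream = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, stream, a.length, b.length);

        // Simulate TCP delivering the two messages in arbitrarily split chunks
        FrameDecoder decoder = new FrameDecoder();
        for (byte[] chunk : new byte[][] {
                Arrays.copyOfRange(stream, 0, 10),
                Arrays.copyOfRange(stream, 10, stream.length)}) {
            for (byte[] body : decoder.feed(chunk)) {
                System.out.println(new String(body, StandardCharsets.UTF_8));
            }
        }
    }
}
```

The first chunk contains one full frame plus the start of the next one, so the decoder emits the first message immediately and holds the leftover bytes until the second chunk completes the frame.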
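II. Google Protobuf codec (https://developers.google.com/protocol-buffers/docs/javatutorial): its advantages are a. it has been used at Google for a long time, so the product is mature; b. it is cross-language and supports many languages, including C++, Java, and Python; c. the encoded messages are smaller, which helps both storage and transmission; d. encoding and decoding performance is very high; e. it supports forward compatibility between protocol versions; f. it supports defining optional and required fields.
The steps are: 1. install protobuf; 2. write the .proto data files; 3. generate the classes for the target language with the protoc command (run protoc --help to list the options; the one that generates Java is: --java_out=OUT_DIR Generate Java source file.); 4. write the code.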
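Here is the complete code for this example:
1. First come the two .proto data files that define the request and the response. The corresponding Java classes are then generated with protoc; the generated code is long, so it is not shown here.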
// Request message definition
package netty;
option java_package = "com.ljh.netty.protobuf";
option java_outer_classname = "SubscribeReqProto";
message SubscribeReq {
    required int32 subReqId = 1;
    required string userName = 2;
    required string productName = 3;
    required string address = 4;
}

// Response message definition
package netty;
option java_package = "com.ljh.netty.protobuf";
option java_outer_classname = "SubscribeRespProto";
message SubscribeResp {
    required int32 subReqId = 1;
    required int32 subRespCode = 2;
    required string desc = 3;
}
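To see why the encoded messages come out so small, here is a sketch that hand-encodes a SubscribeReq following the protobuf wire format: every field is written as a key, (field_number << 3) | wire_type, followed by the value, where integers are base-128 varints and strings are length-delimited. The class and helper names below are hypothetical and exist only for illustration; in real code you would use the classes generated by protoc, whose toByteArray() output should follow the same layout.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class ProtoWireFormatDemo {
    static final int WIRETYPE_VARINT = 0;
    static final int WIRETYPE_LENGTH_DELIMITED = 2;

    /** Writes a base-128 varint: 7 bits per byte, high bit set on all but the last byte. */
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    /** Writes an int32 field: key, then the value as a varint (sign-extended to 64 bits). */
    static void writeInt32Field(ByteArrayOutputStream out, int fieldNumber, int value) {
        writeVarint(out, (fieldNumber << 3) | WIRETYPE_VARINT);
        writeVarint(out, value);
    }

    /** Writes a string field: key, UTF-8 byte length as a varint, then the bytes. */
    static void writeStringField(ByteArrayOutputStream out, int fieldNumber, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, (fieldNumber << 3) | WIRETYPE_LENGTH_DELIMITED);
        writeVarint(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    /** Encodes the four SubscribeReq fields in field-number order. */
    static byte[] encodeSubscribeReq(int subReqId, String userName, String productName, String address) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeInt32Field(out, 1, subReqId);
        writeStringField(out, 2, userName);
        writeStringField(out, 3, productName);
        writeStringField(out, 4, address);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = encodeSubscribeReq(1, "ljh", "netty-book", "beijing");
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(bytes.length + " bytes: " + hex.toString().trim());
    }
}
```

No field names or type descriptors travel on the wire, only field numbers and values, which is a large part of the size advantage over JDK serialization.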
2. Server and client code:
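import com.ljh.netty.protobuf.SubscribeReqProto;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SubReqServer {
    public void bind(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // Inbound: split the byte stream into frames, then decode each frame into a SubscribeReq
                    socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                    // Outbound: encode the message, then prepend its length
                    socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    socketChannel.pipeline().addLast(new ProtobufEncoder());
                    socketChannel.pipeline().addLast(new SubReqServerHandler());
                }
            });
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8091;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Keep the default port when the argument is not a number
            }
        }
        new SubReqServer().bind(port);
    }
}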
public class SubReqServerHandler extends ChannelHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The ProtobufDecoder has already turned the frame into a SubscribeReq
        SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
        if ("ljh".equalsIgnoreCase(req.getUserName())) {
            System.out.println("server accept client subscribe req : [" + req.toString() + "]");
            ctx.writeAndFlush(resp(req.getSubReqId()));
        }
    }

    private SubscribeRespProto.SubscribeResp resp(int subReqId) {
        SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
        builder.setSubReqId(subReqId);
        builder.setSubRespCode(0);
        builder.setDesc("netty book is very good");
        return builder.build();
    }
}
public class SubReqClient {
    public void connect(int port, String host) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // Careful here: this part cost me an hour of debugging
                    socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                    socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                    socketChannel.pipeline().addLast(new ProtobufEncoder());
                    socketChannel.pipeline().addLast(new SubReqClientHandle());
                }
            });
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8091;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Keep the default port when the argument is not a number
            }
        }
        new SubReqClient().connect(port, "127.0.0.1");
    }
}
public class SubReqClientHandle extends ChannelHandlerAdapter {
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Queue ten requests with distinct ids, then flush them in one go
        for (int i = 0; i < 10; i++) {
            ctx.write(subReq(i));
        }
        ctx.flush();
    }

    private SubscribeReqProto.SubscribeReq subReq(int i) {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqId(i);
        builder.setUserName("ljh");
        builder.setProductName("netty-book");
        builder.setAddress("beijing");
        return builder.build();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("receive server response :【" + msg + "】");
    }
}
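The client writes ten requests back to back, so the receiver needs a way to tell where each message ends. In the pipelines above that job falls to ProtobufVarint32LengthFieldPrepender and ProtobufVarint32FrameDecoder. Below is a rough JDK-only sketch of the idea as I understand it: the body length is written as a base-128 varint in front of the message, and the reader backs off until the whole body has arrived. The class and method names (Varint32FramingDemo, prependLength, tryReadFrame) are hypothetical, and this is not Netty's actual code.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class Varint32FramingDemo {

    /** Prepends the body length as a base-128 varint (1 to 5 bytes). */
    static byte[] prependLength(byte[] body) {
        int length = body.length;
        byte[] header = new byte[5];
        int n = 0;
        while ((length & ~0x7F) != 0) {
            header[n++] = (byte) ((length & 0x7F) | 0x80);
            length >>>= 7;
        }
        header[n++] = (byte) length;
        byte[] frame = Arrays.copyOf(header, n + body.length);
        System.arraycopy(body, 0, frame, n, body.length);
        return frame;
    }

    /**
     * Tries to read one frame. Returns the body, or null when the buffer does not
     * yet hold a complete frame (the buffer position is restored in that case).
     */
    static byte[] tryReadFrame(ByteBuffer in) {
        in.mark();
        int length = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            if (!in.hasRemaining()) {
                in.reset(); // length header itself is incomplete
                return null;
            }
            byte b = in.get();
            length |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                if (length < 0) {
                    throw new IllegalArgumentException("negative frame length");
                }
                if (in.remaining() < length) {
                    in.reset(); // half packet: wait for more bytes
                    return null;
                }
                byte[] body = new byte[length];
                in.get(body);
                return body;
            }
        }
        throw new IllegalArgumentException("length varint longer than 5 bytes");
    }

    public static void main(String[] args) {
        byte[] frame = prependLength(new byte[300]);
        System.out.println("header bytes: " + (frame.length - 300));
        System.out.println("partial read: " + tryReadFrame(ByteBuffer.wrap(frame, 0, 100)));
        System.out.println("full read:    " + tryReadFrame(ByteBuffer.wrap(frame)).length + " bytes");
    }
}
```

Compared with a fixed 2-byte or 4-byte length field, the varint header costs a single byte for any message shorter than 128 bytes, which suits small protobuf messages like the ones in this example.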
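III. JBoss Marshalling codec (http://jbossmarshalling.jboss.org/): this is another Java object serialization library. It optimizes the JDK's default serialization framework while staying compatible with the java.io.Serializable interface, and it adds tunable parameters and extra features that can be configured through factory classes.
The key points of the code are: 1. use MarshallerFactory and MarshallingConfiguration to create the UnmarshallerProvider and MarshallerProvider instances, then build the MarshallingDecoder and MarshallingEncoder channel handlers from them; 2. on both the server and the client, add the codec ChannelHandlers to the pipeline in initChannel.
The key code is as follows: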
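import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * @author liujiahan
 * @Title: MarshallingCodeFactory
 * @Copyright: Copyright (c) 2018
 * @Description: MarshallingCodeFactory builds the MarshallingDecoder and the MarshallingEncoder through a MarshallerFactory
 * @Created on 2018/10/26
 * @ModifiedBy:
 */
public final class MarshallingCodeFactory {
    /**
     * Creates the JBoss Marshalling decoder, MarshallingDecoder
     *
     * @return the configured MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        // "serial" selects the factory that is compatible with Java serialization
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        // 1024 is the maximum size, in bytes, of a single serialized object
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * Creates the JBoss Marshalling encoder, MarshallingEncoder
     *
     * @return the configured MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}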
// Snippet
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // Decode
        socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder());
        // Encode
        socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder());
        socketChannel.pipeline().addLast(new SubReqServerHandler());
    }
});
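To sum up, Netty supports many mature, efficient codec frameworks, which makes the encoding and decoding side of network programming very convenient to handle.
After working through a few examples, the patterns of Netty programming start to show and you gradually get the hang of it. I will keep studying, and I expect the understanding to deepen along the way.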