netty編解碼之java原生序列化
netty序列化之java原生序列化
前幾天有空看了下netty使用java原生序列化、以及使用protobuf、jboss marshalling進行編解碼,各種技術之間差異挺大,使用的方法各自不同,效能上原生的效能優勢確實不大,但是另外兩種確實很有優勢,覺得有點意思,於是準備寫一篇文章記載一下,但是一篇文章篇幅太長,因此拆成了三篇文章分別來講。
java序列化程式碼
這個簡單的程式就是server和client互相通訊,因此程式就是以下幾個類,比較簡單,本文采用netty4來進行程式設計。
model類
請求model類SubscribeReq:
package cn.com.serialize;
import java.io.Serializable;
/**
* Created by xiaxuan on 17/11/27.
*/
public class SubscribeReq implements Serializable {
private int subReqID;
private String userName;
private String productName;
private String phoneNumber;
private String address;
....
}
中間get、set方法省略,以下SubscribeResp也是如此。
響應model類SubscribeResp:
package cn.com.serialize;
import java.io.Serializable;
/**
* Created by xiaxuan on 17/11/27.
*/
public class SubscribeResp implements Serializable {
private int subReqID;
private int respCode;
private String desc;
....
}
server類SubReqServer和client類SubReqClient
server類SubReqServer
package cn.com.serialize;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* Created by xiaxuan on 17/11/27.
*/
public class SubReqServer {
public void bind(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecoder(
1024 * 1024,
ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())
));
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new SubReqServerHandler());
}
});
//繫結埠,同步等待成功
ChannelFuture f = b.bind(port).sync();
//等待服務端監聽埠關閉
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//優雅退出,釋放執行緒組資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 8080;
new SubReqServer().bind(port);
}
}
client類SubReqClient
package cn.com.serialize;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
* Created by xiaxuan on 17/11/27.
*/
public class SubReqClient {
public void connect(int port, String host) {
//配置客戶端NIO執行緒組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecoder(
1024, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())
));
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new SubReqClientHandler());
}
});
//發起非同步連線操作
ChannelFuture f = b.connect(host, port).sync();
//等待客戶端鏈路關閉
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//優雅退出,釋放NIO執行緒組
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 8080;
new SubReqClient().connect(port, "127.0.0.1");
}
}
自己編寫java的序列化太過麻煩,但是netty中提供兩個類ObjectDecoder和ObjectEncoder來進行進行序列化。
ObjectDecoder有多種建構函式,支援不同的ClassResolver,在此使用了weakCachingConcurrentResolver建立執行緒安全的WeakReferenceMap對類載入器進行快取,並且當虛擬機器記憶體不足的時候,會釋放快取中的記憶體,防止記憶體洩漏,為了防止異常碼流和解碼錯位導致的記憶體溢位,這裡將單個物件最大序列化後的位元組陣列長度設定為1M,但是作為本例其實已經足夠了。
在server類和client類在啟動程式碼編寫完了後,都會有finally塊中將兩個EventLoopGroup優雅關閉的程式碼,這裡實際上是進行了hock的註冊,在程式直接關閉的時候並不是直接殺死程式,而是接收到kill事件之後一步步關閉程式,這個原理以後有空再單獨寫一篇文章講一下。
客戶端業務邏輯處理類SubReqClientHandler:
package cn.com.serialize;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Created by xiaxuan on 17/11/27.
*/
public class SubReqClientHandler extends ChannelInboundHandlerAdapter {
public SubReqClientHandler() {}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ctx.write(subReq(i));
}
ctx.flush();
}
private SubscribeReq subReq(int i) {
SubscribeReq req = new SubscribeReq();
req.setAddress("北京市");
req.setPhoneNumber("135xxxxxxxx");
req.setProductName("netty書籍");
req.setUserName("xiaxuan");
req.setSubReqID(i);
return req;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Receive server response : [" + msg + "]");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客戶端鏈路被啟用時構造訂購請求訊息傳送,客戶端一次構造十條訂購請求,最後一次性發送給服務端,然後打出服務端的響應。
server業務邏輯處理類SubReqServerHandler:
package cn.com.serialize;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Created by xiaxuan on 17/11/27.
*/
@ChannelHandler.Sharable
public class SubReqServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
SubscribeReq req = (SubscribeReq) msg;
if ("xiaxuan".equalsIgnoreCase(req.getUserName())) {
System.out.println("Service accept client subscribe req : [" + req.toString() + "]");
ctx.writeAndFlush(resp(req.getSubReqID()));
}
}
private SubscribeResp resp(int subReqID) {
SubscribeResp resp = new SubscribeResp();
resp.setSubReqID(subReqID);
resp.setRespCode(0);
resp.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
return resp;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
服務端邏輯處理類,在收到客戶端請求後,構造一個簡單的應答。
執行結果
分別啟動server和Client,結果如下:
server結果:
client結果:
執行成功。
總體還是比較簡單,主要還是序列化和反序列化交給了netty的相應類來完成了,編寫雖然簡單,實際上速度並不是多麼快,所以在之後使用了protobuf和marshalling,這個之後再說。