netty使用msgpack自定義編解碼器實現序列化操作
阿新 • • 發佈:2018-11-10
匯入依賴
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
客戶端
/** * 類說明:客戶端 */ public class ClientMsgPackEcho { private final String host; public ClientMsgPackEcho(String host) { this.host = host; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup();/*執行緒組*/ try { final Bootstrap b = new Bootstrap(); ;/*客戶端啟動必須*/ b.group(group)/*將執行緒組傳入*/ .channel(NioSocketChannel.class)/*指定使用NIO進行網路傳輸*/ /*配置要連線伺服器的ip地址和埠*/ .remoteAddress( new InetSocketAddress(host, ServerMsgPackEcho.PORT)) .handler(new ChannelInitializerImp()); ChannelFuture f = b.connect().sync(); System.out.println("已連線到伺服器....."); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { //這裡設定報文的包頭長度來避免粘包,設定為最大2個位元組長度,即是最大長度65536 ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2)); //對傳送的資料進行序列化 ch.pipeline().addLast(new MsgPackEncoder()); //處理伺服器的應答,因為伺服器傳來的是一個字串,所以可以使用LineBasedFrameDecoder ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ch.pipeline().addLast(new MsgPackClientHandler(5)); } } public static void main(String[] args) throws InterruptedException { new ClientMsgPackEcho("127.0.0.1").start(); } }
自定義序列化處理器
/** * 類說明:服務端反序列化處理器 */ public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { int length = msg.readableBytes(); byte[] array = new byte[length]; msg.getBytes(msg.readerIndex(), array, 0, length); MessagePack messagePack = new MessagePack(); //反序列化後將資料給下一個處理器處理 out.add(messagePack.read(array, User.class)); } }
客戶端處理器
/** * 類說明:客戶端處理器 */ public class MsgPackClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private final int sendNumber; public MsgPackClientHandler(int sendNumber) { this.sendNumber = sendNumber; } private AtomicInteger counter = new AtomicInteger(0); /*** 客戶端讀取到網路資料後的處理*/ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8) +"] and the counter is:"+counter.incrementAndGet()); } /*** 客戶端被通知channel活躍後,做事*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { User[] users = makeUsers(); //傳送資料 for(User user:users){ System.out.println("Send user:"+user); ctx.write(user); } ctx.flush(); } /*** 發生異常後的處理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /*生成使用者實體類的陣列,以供傳送*/ private User[] makeUsers(){ User[] users=new User[sendNumber]; User user =null; for(int i=0;i<sendNumber;i++){ user=new User(); user.setAge(i); String userName = "ABCDEFG --->"+i; user.setUserName(userName); user.setId("No:"+(sendNumber-i)); user.setUserContact( new UserContact(userName+"@xiangxue.com","133")); users[i]=user; } return users; } }
服務端
/**
* 類說明:服務端
*/
public class ServerMsgPackEcho {
public static final int PORT = 9995;
public static void main(String[] args) throws InterruptedException {
ServerMsgPackEcho serverMsgPackEcho = new ServerMsgPackEcho();
System.out.println("伺服器即將啟動");
serverMsgPackEcho.start();
}
public void start() throws InterruptedException {
final MsgPackServerHandler serverHandler = new MsgPackServerHandler();
EventLoopGroup group = new NioEventLoopGroup();/*執行緒組*/
try {
ServerBootstrap b = new ServerBootstrap();/*服務端啟動必須*/
b.group(group)/*將執行緒組傳入*/
.channel(NioServerSocketChannel.class)/*指定使用NIO進行網路傳輸*/
.localAddress(new InetSocketAddress(PORT))/*指定伺服器監聽埠*/
/*服務端每接收到一個連線請求,就會新啟一個socket通訊,也就是channel,
所以下面這段程式碼的作用就是為這個子channel增加handle*/
.childHandler(new ChannelInitializerImp());
ChannelFuture f = b.bind().sync();/*非同步繫結到伺服器,sync()會阻塞直到完成*/
System.out.println("伺服器啟動完成,等待客戶端的連線和資料.....");
f.channel().closeFuture().sync();/*阻塞直到伺服器的channel關閉*/
} finally {
group.shutdownGracefully().sync();/*優雅關閉執行緒組*/
}
}
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
//根據訊息長度,從中剝離出完整的實際資料
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,
0, 2, 0, 2));
//反序列化
ch.pipeline().addLast(new MsgPackDecoder());
//將反序列化後的實體類交給業務處理
ch.pipeline().addLast(new MsgPackServerHandler());
}
}
}
自定義反序列化處理器
/**
* 類說明:客戶端序列化處理器
*/
public class MsgPackEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
throws Exception {
MessagePack messagePack = new MessagePack();
byte[] raw = messagePack.write(msg);
out.writeBytes(raw);
}
}
服務端處理器
/**
* 類說明:自己的業務處理
*/
@ChannelHandler.Sharable
public class MsgPackServerHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger counter = new AtomicInteger(0);
/*** 服務端讀取到網路資料後的處理*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//將上一個handler生成的資料強制轉型
User user = (User)msg;
System.out.println("Server Accept["+user
+"] and the counter is:"+counter.incrementAndGet());
//伺服器的應答
String resp = "I process user :"+user.getUserName()
+ System.getProperty("line.separator");
ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
}
/*** 發生異常後的處理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}