JBoss Marshalling 序列化技術 編解碼器使用 netty 例項
阿新 • • 發佈:2018-12-09
JBoss Marshalling 是一個 Java 物件序列化包,對 JDK 預設的序列化框架進行了優化,但又保持與 Serializable 介面的相容,同時增加了一些可呼叫的引數和附加的屬性,這些引數可通過工廠類進行配置。
環境準備
maven 依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.10.Final</version>
</dependency >
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>1.4.11.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId >
<version>1.4.11.Final</version>
</dependency>
專案結構
編碼與介紹
功能介紹 & 實現效果
// 1. 客戶端傳送5個Boy例項物件
五個藍孩子組隊去表白:
[name=同學 1, height=170, faceScore=91, isRich=false]
[name=同學 2, height=180, faceScore=89, isRich=true]
[name=同學 3, height=162, faceScore=93, isRich=false]
[name=同學 4 , height=169, faceScore=96, isRich=true]
[name=同學 5, height=172, faceScore=84, isRich=false]
// 2. 服務端收到依次收到每個Boy例項物件
收到表白:[name=同學 1, height=170, faceScore=91, isRich=false]
收到表白:[name=同學 2, height=180, faceScore=89, isRich=true]
收到表白:[name=同學 3, height=162, faceScore=93, isRich=false]
收到表白:[name=同學 4, height=169, faceScore=96, isRich=true]
收到表白:[name=同學 5, height=172, faceScore=84, isRich=false]
// 3. 服務端(女孩)判斷此boy有沒有高富帥屬性,給予回覆
收到女生回覆 :to 同學 1 : 中意你哦!
收到女生回覆 :to 同學 2 : 中意你哦!
收到女生回覆 :to 同學 3 : 中意你哦!
收到女生回覆 :to 同學 4 : 中意你哦!
收到女生回覆 :to 同學 5 : 騷年回家繼續努力吧!
服務端開發
// Server 服務端主程式
public class Server
{
public void bind(int port) throws InterruptedException
{
EventLoopGroup boss=new NioEventLoopGroup();
EventLoopGroup worker=new NioEventLoopGroup();
try
{
ServerBootstrap b=new ServerBootstrap();
b.group(boss,worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1280)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(
MarshallingCodeCFactory.buildMarshallingDecoder()
);
ch.pipeline().addLast(
MarshallingCodeCFactory.buildMarshallingEncoder()
);
ch.pipeline().addLast(
new ServerHandler()
);
}
});
ChannelFuture f=b.bind(port).sync();
f.channel().closeFuture().sync();
} finally
{
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException
{
int port = 8005;
new Server().bind(port);
}
}
// ServerHandler 服務端Handler
@ChannelHandler.Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter
{
@Override
public void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception
{
Boy boy= (Boy) msg;
System.out.println("收到表白:"+boy);
ctx.writeAndFlush(reply(boy));
}
private static GirlResponse reply(Boy boy){
GirlResponse gr=new GirlResponse();
if(boy.getFaceScore()>=90||boy.getHeight()>=180||boy.isRich()){
gr.setMsg("to "+boy.getName()+" : 中意你哦!");
}else{
gr.setMsg("to "+boy.getName()+" : 騷年回家繼續努力吧!");
}
return gr;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx , Throwable cause) throws Exception
{
cause.printStackTrace();
ctx.close();
}
}
客戶端開發
// 客戶端主程式
public class Client
{
public void connect(int port, String host) throws InterruptedException
{
EventLoopGroup group = new NioEventLoopGroup();
try
{
Bootstrap b=new Bootstrap();
b.group(group)
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(
MarshallingCodeCFactory.buildMarshallingDecoder()
);
ch.pipeline().addLast(
MarshallingCodeCFactory.buildMarshallingEncoder()
);
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture f= b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally
{
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException
{
int port = 8005;
String host="127.0.0.1";
new Client().connect(port, host);
}
}
// 客戶端Handler
public class ClientHandler extends ChannelInboundHandlerAdapter
{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
System.out.println("五個藍孩子組隊去表白:");
Random r = new Random();
for (int i = 1; i <= 5; i++)
{
Boy boy = new Boy();
boy.setName("同學 " + i);
boy.setFaceScore(r.nextInt(20) + 80);
boy.setHeight(r.nextInt(25) + 160);
boy.setRich(i % 2 == 0);
ctx.write(boy);
System.out.println(boy);
}
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception
{
GirlResponse gr= (GirlResponse) msg;
System.out.println("收到女生回覆 :"+gr.getMsg());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx , Throwable cause) throws Exception
{
cause.printStackTrace();
ctx.close();
}
}
Marshalling 編解碼器工廠類 & pojo
// Marshalling 編解碼器工廠
public final class MarshallingCodeCFactory
{
/**
* 建立Jboss Marshalling解碼器MarshallingDecoder
*
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder()
{
// 首先通過Marshalling工具類的精通方法獲取Marshalling例項物件 引數serial標識建立的是java序列化工廠物件。
final MarshallerFactory marshallerFactory = Marshalling
.getProvidedMarshallerFactory("serial");
// 建立了MarshallingConfiguration物件,配置了版本號為5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
// 根據marshallerFactory和configuration建立provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(
marshallerFactory , configuration);
// 構建Netty的MarshallingDecoder物件,倆個引數分別為provider和單個訊息序列化後的最大長度
return new MarshallingDecoder(provider ,
1024 * 1024 * 1);
}
/**
* 建立Jboss Marshalling編碼器MarshallingEncoder
*
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder()
{
final MarshallerFactory marshallerFactory = Marshalling
.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(
marshallerFactory , configuration);
// 構建Netty的MarshallingEncoder物件,MarshallingEncoder用於實現序列化介面的POJO物件序列化為二進位制陣列
return new MarshallingEncoder(provider);
}
}
要進行序列化的類一定要實現 Seriaizable介面,並且新增serialVersionUID
// Boy.java
public class Boy implements Serializable
{
private static final long serialVersionUID = 3333140133888151024L;
/**
* 姓名
*/
private String name;
/**
* 身高
*/
private int height;
/**
* 顏值
*/
private int faceScore;
/**
* 是否富有
*/
private boolean isRich;
@Override
public String toString()
{
return "[name="+name+", height="+height+", faceScore="+faceScore+", isRich="+isRich+"]";
}
// GirlResponse.java
public class GirlResponse implements Serializable
{
private static final long serialVersionUID = -2619994978640439932L;
private int code;
private String msg;
粘包拆包思考
測試了下粘包拆包的場景,發現程式執行依舊正常,說明Marshalling 編解碼器本身是支援粘包拆包的處理。
究其原因,來看下原始碼:
// MarshallingEncoder 部分原始碼
@Sharable
public class MarshallingEncoder extends MessageToByteEncoder<Object> {
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
。。。。
。。。。
。。。。
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
Marshaller marshaller = provider.getMarshaller(ctx);
int lengthPos = out.writerIndex();
out.writeBytes(LENGTH_PLACEHOLDER);
可以看到Marshalling編碼器在編碼時會在報文頭部加上4個位元組(LENGTH_PLACEHOLDER 用來記錄報文長度)。
// MarshallingDecoder 部分原始碼
public class MarshallingDecoder extends LengthFieldBasedFrameDecoder {
。。。
。。。
。。。
public MarshallingDecoder(UnmarshallerProvider provider, int maxObjectSize) {
// 預設頭部長度為4位元組
super(maxObjectSize, 0, 4, 0, 4);
this.provider = provider;
}
從上面原始碼可以發現Marshalling 繼承自LengthFieldBaseFrameDecoder, 這是netty 提供的一個用於解決粘包拆包問題的解碼器,主要原理就是在報文頭部記錄報文長度。
遇到的坑
Marshalling 的jar 版本小於2
// 在版本大於2的情況下,以下獲取到的MarshallerFactory 為null
final MarshallerFactory marshallerFactory = Marshalling
.getProvidedMarshallerFactory("serial");