使用netty結合Protostuff傳輸物件例子
阿新 • • 發佈:2019-01-05
依賴
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.14.Final</version> </dependency> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-api</artifactId> <version>1.0.10</version> </dependency> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>1.0.10</version> </dependency> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>1.0.10</version> </dependency>
使用protostuff來序列化傳輸物件
客戶端 MyClient.java
package top.yuyufeng.object;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Date;

/**
 * Created by yuyufeng on 2017/8/28.
 *
 * <p>Demo client: connects to {@link #HOST}:{@link #PORT}, writes a single
 * {@code MyRequest} and then blocks until the channel has been closed.
 */
public class MyClient {
    /** Target host, taken from the {@code host} system property. */
    static final String HOST = System.getProperty("host", "127.0.0.1");
    /** Target port, taken from the {@code port} system property. */
    static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
    /** Read from the {@code size} system property; not used by this class. */
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    /**
     * Connects, sends one request and waits for the channel to close.
     *
     * @param args ignored
     * @throws Exception if connecting or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // Handlers are registered in this order: encoder, decoder, client handler.
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new MyEncoder(MyRequest.class));
                    pipeline.addLast(new MyDecoder(MyRequest.class));
                    pipeline.addLast(new MyClientHandler());
                }
            });

            Channel channel = bootstrap.connect(HOST, PORT).sync().channel();

            MyRequest request = new MyRequest();
            request.setRequestId(12345L);
            request.setRequestMethod("doMethod123");
            request.setRequestTime(new Date());
            channel.writeAndFlush(request);

            // Block the main thread until the channel is closed.
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
MyClientHandler.java
package top.yuyufeng.object;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created by yuyufeng on 2017/8/28.
 *
 * <p>Inbound handler for the demo client: logs channel activation and every
 * message it receives to standard output.
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    /** Prints a marker line when the channel becomes active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        final String marker = "MyClientHandler.channelActive";
        System.out.println(marker);
    }

    /** Prints the received message prefixed with {@code read Message:}. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final String line = "read Message:" + msg;
        System.out.println(line);
    }

    /** Dumps the stack trace of the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
轉碼器
MyDecoder.java
package top.yuyufeng.object;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
 * Created by yuyufeng on 2017/8/28.
 *
 * <p>Decodes length-prefixed frames: a 4-byte int length followed by that many
 * payload bytes. The payload is passed to {@code ProtostuffUtil.deserializer}
 * together with the class given to the constructor.
 */
public class MyDecoder extends ByteToMessageDecoder {
    /** Class handed to the deserializer for every decoded frame. */
    private final Class<?> genericClass;

    /**
     * @param genericClass class to instantiate for each decoded frame
     */
    public MyDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Tries to decode one frame from {@code in}.
     *
     * @param ctx channel context; the channel is closed when a negative length is read
     * @param in  accumulated inbound bytes
     * @param out receives the deserialized object once a complete frame is available
     * @throws Exception if deserialization fails
     */
    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Not enough bytes for the length prefix yet.
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // Corrupt frame: drop whatever is buffered, close, and stop here so we
            // never reach the array allocation with a negative size.
            in.skipBytes(in.readableBytes());
            ctx.close();
            return;
        }
        if (in.readableBytes() < dataLength) {
            // Payload incomplete: rewind to before the length prefix and wait for more bytes.
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        Object obj = ProtostuffUtil.deserializer(data, genericClass);
        out.add(obj);
    }
}
MyEncoder.java
package top.yuyufeng.object;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Created by yuyufeng on 2017/8/28.
 *
 * <p>Encodes instances of the configured class as length-prefixed frames: a 4-byte
 * int length followed by the bytes returned by {@code ProtostuffUtil.serializer}.
 */
public class MyEncoder extends MessageToByteEncoder<Object> {
    /** Only instances of this class are encoded. */
    private final Class<?> genericClass;

    /**
     * @param genericClass type of outbound message this encoder handles
     */
    public MyEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Accepts only instances of the configured class, so that other outbound
     * messages are passed on unchanged instead of being replaced by an empty buffer.
     */
    @Override
    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return genericClass.isInstance(msg);
    }

    /**
     * Writes the serialized form of {@code in} to {@code out}, prefixed with its length.
     *
     * @param ctx channel context (unused)
     * @param in  message to encode; ignored unless it is an instance of the configured class
     * @param out buffer receiving the length prefix and payload
     * @throws Exception if serialization fails
     */
    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        // Guard kept for callers that invoke encode directly.
        if (!genericClass.isInstance(in)) {
            return;
        }
        byte[] data = ProtostuffUtil.serializer(in);
        out.writeInt(data.length);
        out.writeBytes(data);
    }
}
傳輸物件MyRequest.java
package top.yuyufeng.object;
import java.util.Date;
/**
 * Created by yuyufeng on 2017/8/28.
 *
 * <p>Mutable bean sent between client and server: an id, a method name and a timestamp.
 */
public class MyRequest {
    private Long requestId;
    private String requestMethod;
    private Date requestTime;

    /** @return the request id, or {@code null} if not set */
    public Long getRequestId() {
        return this.requestId;
    }

    /** @param requestId the request id to store */
    public void setRequestId(Long requestId) {
        this.requestId = requestId;
    }

    /** @return the method name, or {@code null} if not set */
    public String getRequestMethod() {
        return this.requestMethod;
    }

    /** @param requestMethod the method name to store */
    public void setRequestMethod(String requestMethod) {
        this.requestMethod = requestMethod;
    }

    /** @return the stored timestamp (same instance that was set), or {@code null} */
    public Date getRequestTime() {
        return this.requestTime;
    }

    /** @param requestTime the timestamp to store (kept by reference) */
    public void setRequestTime(Date requestTime) {
        this.requestTime = requestTime;
    }

    /** Renders all three fields in {@code MyRequest{...}} form. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MyRequest{");
        text.append("requestId=").append(requestId);
        text.append(", requestMethod='").append(requestMethod).append('\'');
        text.append(", requestTime=").append(requestTime);
        text.append('}');
        return text.toString();
    }
}
服務端 MyServer.java
package top.yuyufeng.object;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
/**
* Created by yuyufeng on 2017/8/28.
*/
public class MyServer {
private int port;
public MyServer(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
// ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
ch.pipeline().addLast(new MyDecoder(MyRequest.class));
ch.pipeline().addLast(new MyEncoder(MyRequest.class));
ch.pipeline().addLast(new MyServerHandler());
};
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 繫結埠,開始接收進來的連線
ChannelFuture future = sbs.bind(port).sync();
System.out.println("Server start listen at " + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new MyServer(port).start();
}
}
MyServerHandler.java
package top.yuyufeng.object;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
/**
 * Created by yuyufeng on 2017/8/28.
 *
 * <p>Server-side inbound handler: prints each received {@code MyRequest}, replaces
 * its timestamp with the current time, writes it back and then closes the channel.
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Echoes the request back with a fresh timestamp.
     *
     * @param ctx channel context used to write the reply
     * @param msg inbound message; cast to {@code MyRequest}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final MyRequest request = (MyRequest) msg;
        System.out.println(request);

        final Date now = new Date();
        request.setRequestTime(now);

        // Close the channel once the reply write has completed.
        ctx.writeAndFlush(request)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /** Dumps the stack trace of the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
Protostuff序列化工具ProtostuffUtil.java
package top.yuyufeng.object;
/**
* Created by yuyufeng on 2017/8/24.
*/
import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Static helpers that serialize objects to byte arrays and back using Protostuff
 * runtime schemas, with a per-class schema cache.
 */
public class ProtostuffUtil {
    /** Schemas already looked up, keyed by class. */
    private static final Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();

    /**
     * Returns the schema for {@code clazz}, looking it up via {@code RuntimeSchema}
     * on a cache miss. Two threads missing at the same time may both perform the
     * lookup; the later {@code put} simply overwrites the earlier one.
     */
    private static <T> Schema<T> getSchema(Class<T> clazz) {
        @SuppressWarnings("unchecked")
        Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);
        if (schema == null) {
            schema = RuntimeSchema.getSchema(clazz);
            if (schema != null) {
                cachedSchema.put(clazz, schema);
            }
        }
        return schema;
    }

    /**
     * Serializes an object to a byte array.
     *
     * @param obj object to serialize; must not be {@code null}
     * @return the serialized bytes
     * @throws IllegalStateException if serialization fails (the cause is preserved)
     */
    public static <T> byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class<T> clazz = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(clazz);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * Deserializes a byte array into a new instance of {@code clazz}.
     *
     * @param data  serialized bytes
     * @param clazz target class; instantiated through its no-argument constructor
     * @return the populated instance
     * @throws IllegalStateException if instantiation or deserialization fails
     *         (the cause is preserved)
     */
    public static <T> T deserializer(byte[] data, Class<T> clazz) {
        try {
            // Class.newInstance() is deprecated and rethrows constructor checked
            // exceptions unchecked; go through the Constructor instead.
            T obj = clazz.getDeclaredConstructor().newInstance();
            Schema<T> schema = getSchema(clazz);
            ProtostuffIOUtil.mergeFrom(data, obj, schema);
            return obj;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}
執行除錯
server:
Server start listen at 8080
MyRequest{requestId=12345, requestMethod='doMethod123', requestTime=Mon Aug 28 15:02:04 CST 2017}
client:
MyClientHandler.channelActive
read Message:MyRequest{requestId=12345, requestMethod='doMethod123', requestTime=Mon Aug 28 15:22:41 CST 2017}