1. 程式人生 > >netty5客戶端與伺服器端訊息傳遞

netty5客戶端與伺服器端訊息傳遞

1.普通字串訊息傳遞

伺服器端:

/**
 * Starts a TCP server on the given port that exchanges plain strings with its clients.
 *
 * <p>Each accepted connection gets a pipeline of {@code StringDecoder} (inbound bytes to
 * {@code String}), {@code StringEncoder} (outbound {@code String} to bytes) and a
 * {@code ServerHandler}. The call blocks until the listening channel is closed.
 *
 * @param port the local port to listen on
 * @throws InterruptedException if the calling thread is interrupted while waiting for the bind
 *         to complete or for the server channel to close
 */
public void startServer(int port) throws InterruptedException{
    EventLoopGroup work=new NioEventLoopGroup();
    EventLoopGroup boss=new NioEventLoopGroup();
    try {
        ServerBootstrap bootStrap=new ServerBootstrap();
        bootStrap.group(boss, work);
        bootStrap.channel(NioServerSocketChannel.class);
        bootStrap.option(ChannelOption.SO_BACKLOG,1024);
        bootStrap.childHandler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline=ch.pipeline();
                pipeline.addLast(new StringDecoder());
                pipeline.addLast(new StringEncoder());
                pipeline.addLast(new ServerHandler());
            }
        });
        // Bind the port and wait synchronously until the bind has succeeded.
        ChannelFuture future=bootStrap.bind(port).sync();
        // Block until the server's listening channel is closed.
        future.channel().closeFuture().sync();
    } finally {
        // Always release the event-loop threads, even when bind() fails or the wait is
        // interrupted; otherwise they would be leaked.
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }
}

/**
 * Server-side handler: prints every message it receives and answers with a fixed greeting.
 */
public class ServerHandler extends ChannelHandlerAdapter {

    /**
     * Logs the received message to standard output and sends the reply {@code "hi......"}
     * through the channel pipeline.
     *
     * <p>The tutorial's alternative, for a pipeline without a string decoder, is to decode by
     * hand: cast {@code msg} to {@code ByteBuf}, copy {@code readableBytes()} bytes into a
     * {@code byte[]} and build a UTF-8 {@code String} from it.
     *
     * @param ctx the context of this handler in the pipeline
     * @param msg the received message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("msg:" + msg);

        final String reply = "hi......";
        ctx.pipeline().writeAndFlush(reply);
    }
}

客戶端:

/**
 * Connects to a string-based server, sends {@code "hello"} and then blocks until the
 * connection is closed.
 *
 * <p>The client pipeline consists of {@code StringDecoder}, {@code StringEncoder} and a
 * {@code ClientHandler}.
 *
 * @param host the server host name or address
 * @param port the server port
 * @throws InterruptedException if the calling thread is interrupted while waiting for the
 *         connection to be established or closed
 */
public void startClient(String host,int port) throws InterruptedException{
    EventLoopGroup group=new NioEventLoopGroup();
    try {
        Bootstrap bootStrap=new Bootstrap();
        bootStrap.group(group);
        bootStrap.channel(NioSocketChannel.class);
        bootStrap.option(ChannelOption.SO_KEEPALIVE,true);
        bootStrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                ChannelPipeline pipeline=sc.pipeline();
                pipeline.addLast(new StringDecoder());
                pipeline.addLast(new StringEncoder());
                pipeline.addLast(new ClientHandler());
            }
        });
        // Connect to the server and wait synchronously until the connection is established.
        ChannelFuture future=bootStrap.connect(host,port).sync();
        future.channel().writeAndFlush("hello");
        // Block until this client channel is closed.
        future.channel().closeFuture().sync();
    } finally {
        // Always release the event-loop threads, even when connect() fails or the wait is
        // interrupted; otherwise they would be leaked.
        group.shutdownGracefully();
    }
}

/**
 * Client-side handler that prints every received message to standard output.
 */
public class ClientHandler extends ChannelHandlerAdapter {

    /**
     * Prints the received message prefixed with {@code "msg:"}.
     *
     * @param ctx the context of this handler in the pipeline (not used)
     * @param msg the received message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String line = "msg:" + msg;
        System.out.println(line);
    }
}

2.物件型別訊息傳遞(傳遞物件直接實現Serializable)

伺服器端(與普通字串版本的差異僅在 pipeline 中的編解碼器:以 ObjectEncoder / ObjectDecoder 取代 StringEncoder / StringDecoder):

/**
 * Starts a TCP server on the given port that exchanges serializable Java objects with its
 * clients.
 *
 * <p>Each accepted connection gets a pipeline of {@code ObjectEncoder}, {@code ObjectDecoder}
 * and a {@code ServerHandler}. The call blocks until the listening channel is closed.
 *
 * @param port the local port to listen on
 * @throws InterruptedException if the calling thread is interrupted while waiting for the bind
 *         to complete or for the server channel to close
 */
public void startServer(int port) throws InterruptedException{
    EventLoopGroup work=new NioEventLoopGroup();
    EventLoopGroup boss=new NioEventLoopGroup();
    try {
        ServerBootstrap bootStrap=new ServerBootstrap();
        bootStrap.group(boss, work);
        bootStrap.channel(NioServerSocketChannel.class);
        bootStrap.option(ChannelOption.SO_BACKLOG,1024);
        bootStrap.childHandler(new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline=ch.pipeline();
                // Outbound: serializes the written object into a ByteBuf.
                pipeline.addLast(new ObjectEncoder());
                // Inbound: turns a received ByteBuf back into an object, resolving classes
                // through this initializer's class loader.
                // NOTE(review): this relies on Java serialization of data read from the
                // network; only use it with trusted peers.
                pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));

                pipeline.addLast(new ServerHandler());
            }
        });
        // Bind the port and wait synchronously until the bind has succeeded.
        ChannelFuture future=bootStrap.bind(port).sync();
        // Block until the server's listening channel is closed.
        future.channel().closeFuture().sync();
    } finally {
        // Always release the event-loop threads, even when bind() fails or the wait is
        // interrupted; otherwise they would be leaked.
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }
}

ServerHandler處理器程式碼一樣。

客戶端:

/**
 * Connects to an object-based server, sends one {@code Student} instance and then blocks
 * until the connection is closed.
 *
 * <p>The client pipeline consists of {@code ObjectEncoder}, {@code ObjectDecoder} and a
 * {@code ClientHandler}.
 *
 * @param host the server host name or address
 * @param port the server port
 * @throws InterruptedException if the calling thread is interrupted while waiting for the
 *         connection to be established or closed
 */
public void startClient(String host,int port) throws InterruptedException{
    EventLoopGroup group=new NioEventLoopGroup();
    try {
        Bootstrap bootStrap=new Bootstrap();
        bootStrap.group(group);
        bootStrap.channel(NioSocketChannel.class);
        bootStrap.option(ChannelOption.SO_KEEPALIVE,true);
        bootStrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                ChannelPipeline pipeline=sc.pipeline();
                // Outbound: serializes the written object into a ByteBuf.
                pipeline.addLast(new ObjectEncoder());
                // Inbound: turns a received ByteBuf back into an object, resolving classes
                // through this initializer's class loader.
                // NOTE(review): this relies on Java serialization of data read from the
                // network; only use it with trusted peers.
                pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));

                pipeline.addLast(new ClientHandler());
            }
        });
        // Connect to the server and wait synchronously until the connection is established.
        ChannelFuture future=bootStrap.connect(host,port).sync();
        Student student=new Student("user","psw");
        future.channel().writeAndFlush(student);

        // Block until this client channel is closed.
        future.channel().closeFuture().sync();
    } finally {
        // Always release the event-loop threads, even when connect() fails or the wait is
        // interrupted; otherwise they would be leaked.
        group.shutdownGracefully();
    }
}

ClientHandler處理器程式碼一樣。

物件Student(必須實現Serializable介面):

/**
 * Simple serializable value object holding a name and a password.
 *
 * <p>NOTE(review): {@link #toString()} includes the password in clear text, so avoid writing
 * instances to logs that may be read by others.
 */
public class Student implements Serializable{
    /**
     * Explicit stream version so that recompiling the class (for example with a different
     * compiler) does not change the computed id and break deserialization on the peer.
     */
    private static final long serialVersionUID = 1L;

    private String name;
    private String psw;

    /**
     * Creates a student.
     *
     * @param name the student's name
     * @param psw the student's password
     */
    public Student(String name,String psw){
        this.name=name;
        this.psw=psw;
    }

    /** @return the student's name */
    public String getName() {
        return name;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the student's password */
    public String getPsw() {
        return psw;
    }

    /** @param psw the new password */
    public void setPsw(String psw) {
        this.psw = psw;
    }

    /** @return a string of the form {@code [name:<name>;password:<psw>]} */
    @Override
    public String toString() {
        return "[name:"+name+";password:"+psw+"]";
    }

}

3.物件型別訊息傳遞(protostuff實現序列化)

服務端:

bootStrap.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline=ch.pipeline();
pipeline.addLast(new MyDecoder(Student.class));
pipeline.addLast(new MyEncoder(Student.class));

pipeline.addLast(new ServerHandler());

}

客戶端:

bootStrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline=sc.pipeline();
pipeline.addLast(new MyDecoder(Student.class));
pipeline.addLast(new MyEncoder(Student.class));

pipeline.addLast(new ClientHandler());

傳遞物件:

public class Student{
private String name;

private String psw;

解碼器:

public class MyDecoder extends ByteToMessageDecoder{
    private Class<?> genericClass;
    public MyDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }
    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        int dataLength = buf.readInt();//這邊讀取的第一個是長度欄位
        byte[] data = new byte[dataLength];
        buf.readBytes(data);//將in中的資料寫入data
        Object obj = ProtoStuffUtil.deserialize(data, genericClass);
        out.add(obj);
    }

}

編碼器:

public class MyEncoder extends MessageToByteEncoder{
    private Class<?> genericClass;
    public MyEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }
    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            byte[] data = ProtoStuffUtil.serialize(in);
            out.writeInt(data.length);//這邊設定的第一個是長度欄位
            out.writeBytes(data);
        }
    }

}

序列化工具類:

public class ProtoStuffUtil {
    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();
    //物件轉位元組陣列序列化
    public static <T> byte[] serialize(T obj) {
        Class<T> cls = (Class<T>) obj.getClass();//獲取物件的類
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);//使用LinkedBuffer分配一塊預設大小的buffer空間
        try {
            Schema<T> schema = getSchema(cls);//通過物件的類構建物件的schema
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);//使用給定的schema將物件序列化為一個byte陣列,並返回
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }
    //位元組陣列轉物件反序列化
    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            T message = cls.newInstance();
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);//使用給定的schema將byte陣列和物件合併
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
    private static <T> Schema<T> getSchema(Class<T> cls) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(cls);
            if (schema != null) {
                cachedSchema.put(cls, schema);
            }
        }
        return schema;
    }
}