netty自定義編碼器和解碼器(粘包處理)
阿新 • • 發佈:2018-12-09
這裡的實現方式是:將訊息分為兩部分,也就是訊息頭和訊息尾,訊息頭中寫入要傳送資料的總長度,通常是在訊息頭的第一個欄位使用int值來標識傳送資料的長度。 首先我們寫一個Encoder,我們繼承自MessageToByteEncoder ,把物件轉換成byte,繼承這個物件,會要求我們實現一個encode方法:
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
byte[] body = convertToBytes(msg); //將物件轉換為byte,虛擬碼,具體用什麼進行序列化,你們自行選擇。可以使用我上面說的一些
int dataLength = body.length; //讀取訊息的長度
out.writeInt(dataLength); //先將訊息長度寫入,也就是訊息頭
out.writeBytes(body); //訊息體中包含我們要傳送的資料
}
那麼當我們在Decode的時候,該怎麼處理髮送過來的資料呢?這裡我們繼承ByteToMessageDecoder方法,繼承這個物件,會要求我們實現一個decode方法
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < HEAD_LENGTH) { //這個HEAD_LENGTH是我們用於表示頭長度的位元組數。 由於上面我們傳的是一個int型別的值,所以這裡HEAD_LENGTH的值為4.
return;
}
in.markReaderIndex(); //我們標記一下當前的readIndex的位置
int dataLength = in.readInt(); // 讀取傳送過來的訊息的長度。ByteBuf 的readInt()方法會讓他的readIndex增加4
if (dataLength < 0 ) { // 我們讀到的訊息體長度為0,這是不應該出現的情況,這裡出現這情況,關閉連線。
ctx.close();
}
if (in.readableBytes() < dataLength) { //讀到的訊息體長度如果小於我們傳送過來的訊息長度,則resetReaderIndex. 這個配合markReaderIndex使用的。把readIndex重置到mark的地方
in.resetReaderIndex();
return;
}
byte[] body = new byte[dataLength]; // 嗯,這時候,我們讀到的長度,滿足我們的要求了,把傳送過來的資料,取出來吧~~
in.readBytes(body); //
Object o = convertToObject(body); //將byte資料轉化為我們需要的物件。虛擬碼,用什麼序列化,自行選擇
out.add(o);
}
下面來一個示例(例項只做了字串的處理,其他自定義物件的處理參考上面)。 服務端(接收端):
public class Server {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 新增自定義的解碼器
socketChannel.pipeline().addLast(new MyCustomMessageDecoder());
socketChannel.pipeline().addLast(new ServerMessageHandler());
}
});
try {
ChannelFuture future = bootstrap.bind(9999).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
自定義的訊息解碼器:
public class MyCustomMessageDecoder extends ByteToMessageDecoder {
// 訊息頭:傳送端寫的是一個int,佔用4位元組。
private final static int HEAD_LENGTH = 4;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//
if (in.readableBytes() < HEAD_LENGTH) {
return;
}
// 標記一下當前的readIndex的位置
in.markReaderIndex();
// 讀取資料長度
int dataLength = in.readInt();
// 我們讀到的訊息體長度為0,這是不應該出現的情況,這裡出現這情況,關閉連線。
if (dataLength < 0) {
ctx.close();
}
//讀到的訊息體長度如果小於我們傳送過來的訊息長度,則resetReaderIndex. 這個配合markReaderIndex使用的。把readIndex重置到mark的地方
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
// 將緩衝區的資料讀到位元組陣列
byte[] body = new byte[dataLength];
in.readBytes(body);
//將byte資料轉化為我們需要的物件。虛擬碼,用什麼序列化,自行選擇
Object msg = convertToObj(body);
out.add(msg);
}
private Object convertToObj(byte[] body) {
return new String(body,0,body.length);
}
}
Server端的訊息處理器:
public class ServerMessageHandler extends ChannelInboundHandlerAdapter {
private int messageCount = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String _msg = (String) msg;
System.out.println("["+(++messageCount)+"]接收到訊息:" + _msg);
// 注意:業務異常需要處理,不能不管,否則會呼叫exceptionCaught()
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客戶端程式碼:
public class Client
{
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_SNDBUF,10)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 增加自定義編碼器
socketChannel.pipeline().addLast(new MyCustomMessageEncoder());
socketChannel.pipeline().addLast(new ClientMessageHandler());
}
});
try {
ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
自定義的訊息編碼器:
public class MyCustomMessageEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
// 要傳送的資料
// 這裡如果是自定義的型別,msg即為自定義的型別,需要轉為byte[]
byte[] body = ((ByteBuf)msg).array();
// 資料長度
int dataLength = body.length;
// 緩衝區先寫入資料長度
out.writeInt(dataLength);
// 再寫入資料
out.writeBytes(body);
}
}
客戶端的訊息處理器:
public class ClientMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String msg = "hello,world";
byte[] data;
ByteBuf buf;
for (int i=0;i<100;i++) {
data = (msg+i).getBytes();
buf = Unpooled.copiedBuffer(data);
ctx.writeAndFlush(buf);
}
System.out.println("100條 訊息傳送完畢");
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
執行效果 客戶端:
服務端: