一起學Netty(七)之 TCP粘包拆包基本解決方案
上個小節我們淺析了在Netty的使用的時候TCP的粘包和拆包的現象,Netty對此問題提供了相對比較豐富的解決方案
Netty提供了幾個常用的解碼器,幫助我們解決這些問題,其實上述的粘包和拆包的問題,歸根結底的解決方案就是傳送端給遠端端一個標記,告訴遠端端,每個資訊的結束標誌是什麼,這樣,遠端端獲取到資料後,根據跟傳送端約束的標誌,將接收的資訊分切或者合併成我們需要的資訊,這樣我們就可以獲取到正確的資訊了
例如,我們剛才的例子中,我們可以在傳送的資訊中,加一個結束標誌,例如兩個遠端端規定以行來切分資料,那麼傳送端,就需要在每個資訊體的末尾加上行結束的標誌,部分程式碼如下:
修改BaseClientHandler的req的構造:
我們在我們巨長的req中末尾加了System.getProperty("line.separator"),這樣相當於給req打了一個標記public BaseClientHandler() { // req = ("BazingaLyncc is learner").getBytes(); req = ("In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. His book w" + "ill give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the process " + "of configuring and connecting all of Netty’s components to bring your learned about threading models in ge" + "neral and Netty’s threading model in particular, whose performance and consistency advantages we discuss" + "ed in detail In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. Hi" + "s book will give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the" + " process of configuring and connecting all of Netty’s components to bring your learned about threading " + "models in general and Netty’s threading model in particular, whose performance and consistency advantag" + "es we discussed in detailIn this chapter you general, we recommend Java Concurrency in Practice by Bri" + "an Goetz. His book will give We’ve reached an exciting point—in the next chapter;the counter is: 1 2222" + "sdsa ddasd asdsadas dsadasdas" + System.getProperty("line.separator")).getBytes(); }
打完標記,其實我們這個示例中的server中還不知道是以行為結尾的,所以我們需要修改server的handler鏈,在inbound鏈中加一個額外的處理鏈,判斷一下,獲取的資訊按照行來切分,我們很慶幸,這樣枯燥的程式碼Netty已經幫我們完美地完成了,Netty提供了一個LineBasedFrameDecoder這個類,顧名思義,這個類名字中有decoder,說明是一個解碼器,我們再看看它的詳細宣告:
它是繼承ByteToMessageDecoder的,是將byte型別轉化成Message的,所以我們應該將這個解碼器放在inbound處理器鏈的第一個,所以我們修改一下Server端的啟動程式碼:/** * A decoder that splits the received {@link ByteBuf}s on line endings. * <p> * Both {@code "\n"} and {@code "\r\n"} are handled. * For a more general delimiter-based decoder, see {@link DelimiterBasedFrameDecoder}. */ public class LineBasedFrameDecoder extends ByteToMessageDecoder { /** Maximum length of a frame we're willing to decode. */ private final int maxLength; /** Whether or not to throw an exception as soon as we exceed maxLength. */ private final boolean failFast; private final boolean stripDelimiter; /** True if we're discarding input because we're already over maxLength. */ private boolean discarding; private int discardedBytes;
package com.lyncc.netty.stickpackage.myself;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import java.net.InetSocketAddress;
public class BaseServer {
private int port;
public BaseServer(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new BaseServerHandler());
};
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 繫結埠,開始接收進來的連線
ChannelFuture future = sbs.bind(port).sync();
System.out.println("Server start listen at " + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new BaseServer(port).start();
}
}
這樣,我們只是在initChannel方法中增加了一個LineBasedFrameDecoder這個類,其中2048是規定一行資料最大的位元組數
我們再次執行,我們再看看效果:
可以看到客戶端傳送的兩次msg,被伺服器端成功地兩次接收了,我們要的效果達到了
我們將LineBasedFrameDecoder中的2048引數,縮小一半,變成1024,我們再看看效果:
出現了異常,這個異常時TooLongFrameException,這個異常在Netty in Action中介紹過,幀的大小太大,在我們這個場景中,就是我們傳送的一行資訊大小是1076,大於了我們規定的1024所以報錯了
我們再解決另一個粘包的問題,我們可以看到上節中介紹的那個粘包案例中,我們傳送了100次的資訊“BazingaLyncc is learner”,這個案例很特殊,這個資訊是一個特長的資料,位元組長度是23,所以我們可以使用Netty為我們提供的FixedLengthFrameDecoder這個解碼器,看到這個名字就明白了大半,定長資料幀的解碼器,所以我們修改一下程式碼:
BaseClientHandler:
package com.lyncc.netty.stickpackage.myself;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class BaseClientHandler extends ChannelInboundHandlerAdapter{
private byte[] req;
public BaseClientHandler() {
req = ("BazingaLyncc is learner").getBytes();
// req = ("In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. His book w"
// + "ill give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the process "
// + "of configuring and connecting all of Netty’s components to bring your learned about threading models in ge"
// + "neral and Netty’s threading model in particular, whose performance and consistency advantages we discuss"
// + "ed in detail In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. Hi"
// + "s book will give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the"
// + " process of configuring and connecting all of Netty’s components to bring your learned about threading "
// + "models in general and Netty’s threading model in particular, whose performance and consistency advantag"
// + "es we discussed in detailIn this chapter you general, we recommend Java Concurrency in Practice by Bri"
// + "an Goetz. His book will give We’ve reached an exciting point—in the next chapter;the counter is: 1 2222"
// + "sdsa ddasd asdsadas dsadasdas" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
// message = Unpooled.buffer(req.length);
// message.writeBytes(req);
// ctx.writeAndFlush(message);
// message = Unpooled.buffer(req.length);
// message.writeBytes(req);
// ctx.writeAndFlush(message);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
BaseServer:
package com.lyncc.netty.stickpackage.myself;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import java.net.InetSocketAddress;
public class BaseServer {
private int port;
public BaseServer(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(23));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new BaseServerHandler());
};
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 繫結埠,開始接收進來的連線
ChannelFuture future = sbs.bind(port).sync();
System.out.println("Server start listen at " + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new BaseServer(port).start();
}
}
我們就是在channelhandler鏈中,加入了FixedLengthFrameDecoder,且引數是23,告訴Netty,獲取的幀資料有23個位元組就切分一次
執行結果:
可以看見,我們獲取到了我們想要的效果
當然Netty還提供了一些其他的解碼器,有他們自己的使用場景,例如有按照某個固定字元切分的DelimiterBasedFrameDecoder的解碼器
我們再次修改程式碼:
BaseClientHandler.java
package com.lyncc.netty.stickpackage.myself;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class BaseClientHandler extends ChannelInboundHandlerAdapter{
private byte[] req;
public BaseClientHandler() {
// req = ("BazingaLyncc is learner").getBytes();
req = ("In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. $$__ His book w"
+ "ill give We’ve reached an exciting point—in the next chapter we’ll $$__ discuss bootstrapping, the process "
+ "of configuring and connecting all of Netty’s components to bring $$__ your learned about threading models in ge"
+ "neral and Netty’s threading model in particular, whose performance $$__ and consistency advantages we discuss"
+ "ed in detail In this chapter you general, we recommend Java $$__Concurrency in Practice by Brian Goetz. Hi"
+ "s book will give We’ve reached an exciting point—in the next $$__ chapter we’ll discuss bootstrapping, the"
+ " process of configuring and connecting all of Netty’s components $$__ to bring your learned about threading "
+ "models in general and Netty’s threading model in particular, $$__ whose performance and consistency advantag"
+ "es we discussed in detailIn this chapter you general, $$__ we recommend Java Concurrency in Practice by Bri"
+ "an Goetz. His book will give We’ve reached an exciting $$__ point—in the next chapter;the counter is: 1 2222"
+ "sdsa ddasd asdsadas dsadasdas" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
// for (int i = 0; i < 100; i++) {
// message = Unpooled.buffer(req.length);
// message.writeBytes(req);
// ctx.writeAndFlush(message);
// }
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
我們在req的字串中增加了“$$__”這樣的切割符,然後再Server中照例增加一個DelimiterBasedFrameDecoder,來切割字串:
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("$$__".getBytes())));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new BaseServerHandler());
};
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
我們在initChannel中第一個inbound中增加了DelimiterBasedFrameDecoder,且規定切割符就是“$$__”,這樣就能正常切割了,我們看看執行效果:
可以看到被分了20次讀取,我們可以這樣理解,客戶端傳送了2次req位元組,每個req中有10個“$$__”,這樣就是第11次切割的時候其實發送了粘包,第一個req中末尾部分和第二次的頭部粘在了一起,作為第11部分的內容
而最後一部分的內容因為沒有"$$__"切割,所以沒有列印在控制檯上~
其實這類的Handler還是相對比較簡單的,真實的生產環境這些decoder只是作為比較基本的切分類,但是這些decoder還是很好用的~
希望講的對您有所幫助~END~