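Handling TCP Sticky Packets and Split Packets in Netty
What are sticky packets and split packets?
TCP is a "stream" protocol: a stream is simply a sequence of bytes with no message boundaries.
TCP knows nothing about the meaning of the application data it carries. It divides the data according to the state of its own buffers, so one complete application message may be split across several TCP segments, and several small messages may be coalesced into one larger segment. These are the split-packet (half-packet) and sticky-packet problems.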
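Example: suppose the client sends two packets, D1 and D2, to the server. Because the number of bytes the server gets from a single read is not deterministic, four cases are possible:
1. Neither sticking nor splitting: the server reads D1 and D2 as two separate packets in two reads.
2. Sticking: a single read returns both D1 and D2.
3. Splitting: the first read returns all of D1 plus part of D2, and the second read returns the rest of D2.
4. Splitting: the first read returns only part of D1, and the second read returns the rest of D1 plus all of D2.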
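The four cases can be reproduced without any networking. The sketch below is only an illustration: StreamReadSketch and its reads method are hypothetical names, not part of Netty, and the payloads "AAAA" (D1) and "BBBBBB" (D2) are made up. It treats the connection as one boundary-free string and cuts it at whatever sizes the "reads" happen to return.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Netty class): replays a byte stream as a series of reads. */
final class StreamReadSketch {

    /** Cuts the stream into reads of the given sizes; anything left over forms a final read. */
    static List<String> reads(String stream, int... readSizes) {
        List<String> result = new ArrayList<>();
        int pos = 0;
        for (int size : readSizes) {
            int end = Math.min(pos + size, stream.length());
            if (end > pos) {
                result.add(stream.substring(pos, end));
            }
            pos = end;
        }
        if (pos < stream.length()) {
            result.add(stream.substring(pos));
        }
        return result;
    }

    public static void main(String[] args) {
        String d1 = "AAAA";
        String d2 = "BBBBBB";
        String stream = d1 + d2; // what TCP carries: 10 bytes with no boundary between D1 and D2

        System.out.println(reads(stream, 4, 6)); // case 1: [AAAA, BBBBBB]
        System.out.println(reads(stream, 10));   // case 2: [AAAABBBBBB]
        System.out.println(reads(stream, 7, 3)); // case 3: [AAAABBB, BBB]
        System.out.println(reads(stream, 2, 8)); // case 4: [AA, AABBBBBB]
    }
}
```

In every case the receiver sees the same ten bytes in the same order; only the grouping differs, which is why the boundaries have to be recovered by the application.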
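Why does this happen?
1. The application writes more bytes than the socket send buffer can hold (this causes splitting).
2. TCP segments the data according to the MSS.
3. An IP packet larger than the link MTU is fragmented at the IP layer.
In general, because the IP and TCP headers without options take 40 bytes over IPv4 (20 + 20) and 60 bytes over IPv6 (40 + 20):
MSS <= MTU - 40 (IPv4), MSS <= MTU - 60 (IPv6)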
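As a worked example of these bounds, the sketch below plugs in an MTU of 1500 bytes, the usual Ethernet value. That figure, the 4096-byte write, and the class name MssSketch are assumptions chosen for illustration, and the header sizes assume no IP or TCP options.

```java
/** Hypothetical helper: worked arithmetic for the MSS bounds, assuming headers without options. */
final class MssSketch {
    static final int TCP_HEADER = 20;  // TCP header without options
    static final int IPV4_HEADER = 20; // IPv4 header without options
    static final int IPV6_HEADER = 40; // fixed IPv6 header, no extension headers

    static int maxMssIpv4(int mtu) {
        return mtu - IPV4_HEADER - TCP_HEADER; // MTU - 40
    }

    static int maxMssIpv6(int mtu) {
        return mtu - IPV6_HEADER - TCP_HEADER; // MTU - 60
    }

    /** Number of TCP segments needed when each segment carries at most mss payload bytes. */
    static int segmentsNeeded(int payloadBytes, int mss) {
        return (payloadBytes + mss - 1) / mss; // ceiling division
    }

    public static void main(String[] args) {
        int mtu = 1500; // assumed Ethernet MTU
        System.out.println(maxMssIpv4(mtu));                       // 1460
        System.out.println(maxMssIpv6(mtu));                       // 1440
        System.out.println(segmentsNeeded(4096, maxMssIpv4(mtu))); // 3
    }
}
```

Under these assumptions a single 4096-byte write cannot fit in one segment, so the receiver may see it arrive in several pieces.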
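Solutions
Each of the strategies below can be implemented with a handler in Netty:
1. Fixed-length messages.
2. Append a carriage return and line feed to the end of each message as a delimiter, as the FTP protocol does.
3. Split each message into a header and a body, where the header contains a field giving the total message length.
4. A more complex application-level protocol.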
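Strategy 3 is the one this article does not demonstrate later, so here is a minimal, library-free sketch of it. LengthPrefixSketch is a hypothetical class, and the layout it uses (a 4-byte big-endian header holding the body length rather than the total length) is an assumption made for brevity. In Netty itself this strategy is covered by LengthFieldBasedFrameDecoder on the inbound side and LengthFieldPrepender on the outbound side.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical length-prefixed framer: a 4-byte big-endian body length, then the body. */
final class LengthPrefixSketch {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Encodes one message as header (body length) followed by the UTF-8 body. */
    static byte[] encode(String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + bytes.length).putInt(bytes.length).put(bytes).array();
    }

    /** Feeds the bytes of one read and returns every message that is now complete. */
    List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
        List<String> messages = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative length: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset(); // half packet: rewind to the header and wait for more bytes
                break;
            }
            byte[] body = new byte[length];
            buf.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        // Keep only the unconsumed tail for the next read
        pending.reset();
        pending.write(buf.array(), buf.position(), buf.remaining());
        return messages;
    }
}
```

Because the receiver always knows how many bytes the next message needs, it does not matter where the reads cut the stream, not even when the cut falls inside the header.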
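Problems caused by sticky packets
We can build something similar to an echo server: if the client sends one hundred requests, it should get exactly one hundred responses back. The code is as follows: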
Server code
package time2;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Created by lz on 2016/8/9.
 */
public class TimeServer {
    public void bind(int port) throws Exception {
        // Configure the server's NIO thread groups: one accepts connections, the other handles them
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // Bind the port and wait synchronously for the bind to complete
            ChannelFuture f = b.bind(port).sync();
            // Block until the server's listening channel is closed
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the thread resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new TimeServerHandler());
        }
    }
    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeServer().bind(port);
    }
}
Server handler
package time2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.Date;
/**
 * Created by lz on 2016/8/10.
 */
// Overriding channelRead on ChannelHandlerAdapter matches the Netty 5.0 alpha API;
// on Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class TimeServerHandler extends ChannelHandlerAdapter {
    private int counter;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        buf.release(); // the bytes have been copied out, so release the buffer to avoid a leak
        // Strip the trailing line separator; this assumes exactly one request per read,
        // which is precisely the assumption that sticky packets break
        String text = new String(req, "UTF-8");
        String body = text.substring(0, text.length() - System.getProperty("line.separator").length());
        System.out.println("Time Server received: " + body + "; counter is: " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date().toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
    // Quick check of how the platform line separator prints
    public static void main(String[] args) {
        System.out.println("1" + System.getProperty("line.separator") + "1");
    }
}
Client code
package time2;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Created by lz on 2016/8/9.
 */
public class TimeClient {
    public void connect(int port, String host) throws Exception {
        // Configure the client's NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            // Start the asynchronous connect and wait for it to complete
            ChannelFuture f = b.connect(host, port).sync();
            // Block until the client channel is closed
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the NIO thread group
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeClient().connect(port, "127.0.0.1");
    }
}
Client handler code
package time2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Created by lz on 2016/8/10.
 */
public class TimeClientHandler extends ChannelHandlerAdapter {
    private int counter;
    private byte[] req;
    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send 100 requests back to back once the connection is established
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] resp = new byte[buf.readableBytes()];
        buf.readBytes(resp);
        buf.release(); // the bytes have been copied out, so release the buffer to avoid a leak
        String body = new String(resp, "UTF-8");
        System.out.println("Now is:" + body + ";the counter is :" + ++counter);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
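To test, start the server and then the client. Sticking shows up immediately: the client's counter is incremented only once even though it sent one hundred requests. While learning you can get away with ignoring this, but under load, or when sending large messages, sticky and split packets will occur.
What we need is a half-packet decoder, such as the following:
LineBasedFrameDecoder
This decoder uses the line break as the frame boundary. It scans the readable bytes for "\n" or "\r\n"; when it finds one, it treats that position as the end of a frame, so the bytes before it form one complete message. A maximum frame length can be set: if no line break is found within that many bytes, the decoder throws an exception and discards the oversized data read so far. The code is as follows: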
Server code:
package time3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Created by lz on 2016/8/10.
 */
public class TimeServer {
    public void bind(int port) throws Exception {
        // Configure the server's NIO thread groups: one accepts connections, the other handles them
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            // Bind the port and wait synchronously for the bind to complete
            ChannelFuture f = b.bind(port).sync();
            // Block until the server's listening channel is closed
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the thread resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            System.out.println("Bound");
            // Split the inbound stream into lines of at most 1024 bytes, then decode each line to a String
            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
            socketChannel.pipeline().addLast(new StringDecoder());
            socketChannel.pipeline().addLast(new TimeServerHandler());
        }
    }
    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeServer().bind(port);
    }
}
Server handler code:
package time3;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.Date;
/**
 * Created by lz on 2016/8/10.
 */
public class TimeServerHandler extends ChannelHandlerAdapter {
    private int counter;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The decoders upstream already deliver one complete line, as a String, per call
        String body = (String) msg;
        System.out.println("Time Server received: " + body + "; counter is: " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date().toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getMessage());
        ctx.close();
    }
    // Quick check of how the platform line separator prints
    public static void main(String[] args) {
        System.out.println("1" + System.getProperty("line.separator") + "1");
    }
}
Client code:
package time3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Created by lz on 2016/8/10.
 */
public class TimeClient {
    public void connect(int port, String host) throws Exception {
        // Configure the client's NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Same decoders as on the server: frame by line, then decode to String
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            // Start the asynchronous connect and wait for it to complete
            ChannelFuture f = b.connect(host, port).sync();
            // Block until the client channel is closed
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the NIO thread group
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeClient().connect(port, "127.0.0.1");
    }
}
Client handler code
package time3;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Created by lz on 2016/8/10.
 */
public class TimeClientHandler extends ChannelHandlerAdapter {
    private int counter;
    private byte[] req;
    public TimeClientHandler() {
        System.out.println("Initialized");
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Sending data");
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // One call per response line, already decoded to a String
        String body = (String) msg;
        System.out.println("Now is:" + body + ";the counter is :" + ++counter);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getMessage());
        ctx.close();
    }
}
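With the decoders in the pipeline, the handler code becomes much simpler. The key parts are commented, so they are not explained further here.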
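To make explicit what the line-based decoder takes off our hands, here is a library-free sketch of the same idea. LineFramerSketch is a hypothetical class written for this article, not Netty's implementation: it buffers bytes across reads, emits a line whenever it sees "\n" (dropping a preceding "\r"), and gives up when a line exceeds the configured maximum. Netty's real decoder handles the oversized case more carefully than this.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical line framer: buffers bytes across reads and emits complete lines. */
final class LineFramerSketch {
    private final int maxLength;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    LineFramerSketch(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Feeds the bytes of one read and returns every line completed by them. */
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                String line = new String(pending.toByteArray(), StandardCharsets.UTF_8);
                pending.reset();
                if (line.endsWith("\r")) {
                    line = line.substring(0, line.length() - 1); // accept "\r\n" as well
                }
                lines.add(line);
            } else {
                if (pending.size() >= maxLength) {
                    pending.reset(); // drop the oversized partial line
                    throw new IllegalStateException("line longer than " + maxLength + " bytes");
                }
                pending.write(b);
            }
        }
        return lines;
    }
}
```

Feeding it "QUERY TIME ORDER\nQUERY TI" and then "ME ORDER\r\n" yields one complete request per call, regardless of where the reads cut the stream.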
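DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder lets us choose a custom delimiter to split frames on. It is used in much the same way as the decoder above:
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); // use $_ as the delimiter
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
The snippet above uses $_ as the delimiter and caps a frame at 1024 bytes. Here we test it with an echo server: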
EchoServer code
package echo1;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
 * Created by lz on 2016/8/10.
 */
public class EchoServer {
    public void bind(int port) throws Exception {
        // Configure the server's NIO thread groups: one accepts connections, the other handles them
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // Maximum length of the accept queue, i.e. of connections that have completed the three-way handshake
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChildChannelHandler());
            // Bind the port and wait synchronously for the bind to complete
            ChannelFuture f = b.bind(port).sync();
            // Block until the server's listening channel is closed
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the thread resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            System.out.println("Bound");
            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); // use $_ as the delimiter
            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
            socketChannel.pipeline().addLast(new StringDecoder());
            socketChannel.pipeline().addLast(new EchoServerHandler());
        }
    }
    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoServer().bind(port);
    }
}
EchoServer handler code
package echo1;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Created by lz on 2016/8/10.
 */
public class EchoServerHandler extends ChannelHandlerAdapter {
    int counter = 0;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Receive #" + ++counter + ": " + body);
        // The decoder strips the delimiter by default, so append it again before echoing
        body += "$_";
        ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
        ctx.writeAndFlush(echo);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close(); // an exception occurred, so close the connection
    }
}
EchoClient code
package echo1;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Created by lz on 2016/8/10.
 */
public class EchoClient {
    public void connect(int port, String host) throws Exception {
        // Configure the client's NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); // use $_ as the delimiter
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            // Start the asynchronous connect and wait for it to complete
            ChannelFuture f = b.connect(host, port).sync();
            // Block until the client channel is closed
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the NIO thread group
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoClient().connect(port, "127.0.0.1");
    }
}
EchoClient handler code
package echo1;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Created by lz on 2016/8/10.
 */
public class EchoClientHandler extends ChannelHandlerAdapter {
    private int counter;
    static final String ECHO_REQ = "Hi,lizhao$_";
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send ten delimiter-terminated requests back to back
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
        }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Receive #" + ++counter + ": " + msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
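The splitting that DelimiterBasedFrameDecoder performs in this section can be pictured with a small library-free sketch. DelimiterFramerSketch is a hypothetical class, and for brevity it works on text rather than raw bytes and enforces no maximum frame length, both simplifications compared with the real decoder. Like the decoder's default behaviour, it strips the delimiter from each frame, which is why the echo server handler appends "$_" again before replying.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical delimiter framer working on text; strips the delimiter from each frame. */
final class DelimiterFramerSketch {
    private final String delimiter;
    private final StringBuilder pending = new StringBuilder();

    DelimiterFramerSketch(String delimiter) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        this.delimiter = delimiter;
    }

    /** Feeds the text of one read and returns every frame completed by it. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> frames = new ArrayList<>();
        int at;
        while ((at = pending.indexOf(delimiter)) >= 0) {
            frames.add(pending.substring(0, at));       // frame without the delimiter
            pending.delete(0, at + delimiter.length()); // consume frame and delimiter
        }
        return frames;
    }
}
```

Because unfinished input stays in the buffer, a delimiter that is itself split across two reads ("$" in one, "_" in the next) is still recognised.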
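Running the EchoServer and EchoClient above produces the correct result. Netty offers more than these two decoders: there is also FixedLengthFrameDecoder for fixed-length frames, among others. See the official documentation for more.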
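Fixed-length framing is the simplest of these strategies. The sketch below is a library-free illustration using a hypothetical FixedLengthFramerSketch class, with ASCII payloads assumed so that one character equals one byte. In Netty the equivalent is adding a FixedLengthFrameDecoder, constructed with the frame length, to the pipeline.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical fixed-length framer: every frame is exactly frameLength bytes. */
final class FixedLengthFramerSketch {
    private final int frameLength;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    FixedLengthFramerSketch(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    /** Feeds the bytes of one read and returns every frame that is now complete. */
    List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] all = pending.toByteArray();
        List<String> frames = new ArrayList<>();
        int pos = 0;
        while (all.length - pos >= frameLength) {
            frames.add(new String(all, pos, frameLength, StandardCharsets.US_ASCII));
            pos += frameLength;
        }
        // Keep the incomplete tail until the next read completes it
        pending.reset();
        pending.write(all, pos, all.length - pos);
        return frames;
    }
}
```

With a frame length of 4, feeding "ABCDEF" returns "ABCD" and holds back "EF"; a following "GHIJ" then returns "EFGH" and holds back "IJ".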