網際網路技術21——netty拆包粘包
阿新 • • 發佈:2018-11-23
Netty拆包粘包
在基於流的傳輸裡比如TCP/IP,接收到的資料會先被儲存到一個socket接收緩衝裡。不幸的是,基於流的傳輸並不是一個數據包佇列,而是一個位元組佇列。即使你傳送了2個獨立的資料包,作業系統也不會作為2個訊息處理而僅僅是作為一連串的位元組而言。因此這是不能保證你遠端寫入的資料就會準確地讀取。
參考資料:http://ifeve.com/netty5-user-guide
常用的拆包粘包主要有3種方式:
- 1、訊息定長,例如每個報文的大小固定為200個位元組,如果不夠,空位補空格。
- 2、在包尾部增加特殊字串進行分割,例如加回車等
- 3、 將訊息分為訊息頭和訊息體,在訊息頭中包含表示訊息總長度的欄位,然後進行業務邏輯的處理
(1) 在包尾部增加特殊字串進行分割在上一年部落格中同string型別傳輸一起做了程式碼演示
https://blog.csdn.net/qq_28240551/article/details/82393565
關鍵點為:
bootstrap.group(boss, worker) //指定channel型別 .channel(NioServerSocketChannel.class) //handler會在初始化時就執行,而childHandler會在客戶端成功connect後才執行 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ByteBuf byteBuf = Unpooled.copiedBuffer("$_".getBytes()); socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new StringEncoder()); socketChannel.pipeline().addLast(new ServerHandler()); } }) //設定tcp緩衝區大小 .option(ChannelOption.SO_BACKLOG, 128) //設定傳送緩衝區大小 .option(ChannelOption.SO_SNDBUF, 1024 * 32) //設定接收緩衝區大小 .option(ChannelOption.SO_RCVBUF, 1024 * 32) //設定是否儲存長連線 .childOption(ChannelOption.SO_KEEPALIVE, true);
(2)關於訊息頭和訊息體的方法,這裡不做演示了,工作中用到可以搜一下使用例子
(3)這裡再演示一下報文定長的方式進行資料傳輸,(總體看使用頻率,分割>定長>報文)
關鍵點為
Bootstrap bootstrap = new Bootstrap(); bootstrap.group(worker) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new StringEncoder()); socketChannel.pipeline().addLast(new ClientHandler()); } }) .option(ChannelOption.SO_KEEPALIVE,true);
server
package com.nettyFixed;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* Created by BaiTianShi on 2018/9/5.
*/
public class Server {
private int port;
public Server(int port) {
this.port = port;
}
public void run() {
//用來接收連線事件組
EventLoopGroup boss = new NioEventLoopGroup();
//用來處理接收到的連線事件處理組
EventLoopGroup worker = new NioEventLoopGroup();
//server配置輔助類
ServerBootstrap bootstrap = new ServerBootstrap();
try {
//將連線接收組與事件處理組連線,當server的boss接收到連線收就會交給worker處理
bootstrap.group(boss, worker)
//指定channel型別
.channel(NioServerSocketChannel.class)
//handler會在初始化時就執行,而childHandler會在客戶端成功connect後才執行
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new StringEncoder());
socketChannel.pipeline().addLast(new ServerHandler());
}
})
//設定tcp緩衝區大小
.option(ChannelOption.SO_BACKLOG, 128)
//設定傳送緩衝區大小
.option(ChannelOption.SO_SNDBUF, 1024 * 32)
//設定接收緩衝區大小
.option(ChannelOption.SO_RCVBUF, 1024 * 32)
//設定是否儲存長連線
.childOption(ChannelOption.SO_KEEPALIVE, true);
//注意。此處option()是提供給NioServerSocketChannel用來接收進來的連線,也就是boss執行緒
//childOption是提供給有福管道serverChannel接收到的連線,也就是worker執行緒,在這個例子中也就是NioServerSocketChannel
//非同步繫結埠,可以繫結多個埠
ChannelFuture fu1 = bootstrap.bind(port).sync();
ChannelFuture fu2 = bootstrap.bind(8766).sync();
//非同步檢查是否關閉
fu1.channel().closeFuture().sync();
fu2.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
public static void main(String[] args) {
Server server = new Server(8765);
server.run();
}
}
client
package com.nettyFixed;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* Created by BaiTianShi on 2018/9/5.
*/
public class Client {
private String ip;
private int port;
public Client(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void run(){
//客戶端用來連線服務端的連線組
EventLoopGroup worker= new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new StringEncoder());
socketChannel.pipeline().addLast(new ClientHandler());
}
})
.option(ChannelOption.SO_KEEPALIVE,true);
try {
//可以進多個埠同時連線
ChannelFuture fu1 = bootstrap.connect(ip,port).sync();
ChannelFuture fu2 = bootstrap.connect(ip,8766).sync();
fu1.channel().writeAndFlush("aaaaaaa");
fu2.channel().writeAndFlush("1111222");
fu1.channel().closeFuture().sync();
fu2.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
Client cl = new Client("127.0.0.1",8765);
cl.run();
}
}
server端控制檯
client控制檯
‘’