Java網路程式設計之Netty拆包和黏包-yellowcong
阿新 • • 發佈:2019-01-10
Netty中,解決拆包和黏包中,解決方式有三種
1、在每個包尾部,定義分隔符,通過回車符號,或者其他符號來解決
2、通過定義每個包的大小,如果包不夠就空格填充
3、自定義協議的方式,將訊息分為訊息頭和訊息體,在訊息頭中表示出訊息的總長度,然後進行邏輯處理。
案例
這個案例是通過第一種方式,通過回車符號的方式來解決拆包和黏包,通過在childHandler 中新增 指定的分隔符進行拆包黏包,通過DelimiterBasedFrameDecoder (定義分隔符)和StringDecoder(將傳送的訊息定義為String型別,而不是預設的ByteBuf型別了)這兩個類完成處理,在客戶端和服務端,都要 設定相同的Handler。
服務戶端
伺服器端,重點是childHandler ,自定義了包的分隔符
package yellowcong.socket.netty2;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* 建立日期:2017年10月8日 <br/>
* 建立使用者:yellowcong <br/>
* 功能描述:
*/
public class Server {
public static final Integer PORT = 8080;
public static void main(String[] args) throws Exception {
//1.第一個執行緒組 是用於接收Client端連線的
EventLoopGroup bootGroup = new NioEventLoopGroup(); //客戶端
//2.第二個執行緒組 實際業務操作請求
EventLoopGroup workGroup = new NioEventLoopGroup(); //網路讀寫
//3.伺服器啟動類,配置伺服器
ServerBootstrap bootstrap = new ServerBootstrap();
//加入客戶端執行緒和網路讀寫
bootstrap.group(bootGroup, workGroup)
//我要指定使用NioServerSocketChannel這種型別的通道 ,當我們是Http的時候,需要更換這個Channel的型別
.channel(NioServerSocketChannel.class)
// 指定處理SockerChannel 的處理器
.childHandler(new ChannelInitializer<SocketChannel>(){ //一掉要注意這個 SocketChannel 是Netty封裝的,不是NIO
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//將我們的伺服器處理類傳遞進去
//設定回車就是一條訊息
ByteBuf spilt = Unpooled.copiedBuffer("\r\n".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, spilt));
//字串形式的解碼
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ServerHandler());
}
})
//設定大小
.option(ChannelOption.SO_BACKLOG, 128) //生產環境中,最好配額制100多
//保持連線
.childOption(ChannelOption.SO_KEEPALIVE, true)
;
System.out.println("伺服器啟動。。。。。。");
//繫結指定的埠 進行監聽
ChannelFuture future = bootstrap.bind(PORT).sync();
//如果不休眠 ,直接就結束了
// Thread.sleep(1000000);
//關閉Channel
//這個是相當於程式是睡眠模式,執行緒阻塞在這個地方
future.channel().closeFuture().sync();
//關閉執行緒組
bootGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
服務端訊息處理
訊息處理端,複寫了channelReadComplete 方法,用來斷開連線操作,其次在channelRead中,訊息是String型別的,不能轉化為ByteBuf型別處理。訊息傳送的時候,加上了分隔符(\r\n)
package yellowcong.socket.netty2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
* 建立日期:2017年10月8日 <br/>
* 建立使用者:yellowcong <br/>
* 功能描述:伺服器請求
*/
//這個Handler需要繼承 ChannelHandlerAdapter,這個是Netty實現的
public class ServerHandler extends ChannelHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//由於設定的解碼器是String 型別的,這個地方就Msg物件是String型別的
//讀取Message
String result = (String) msg;
System.out.println(result);
//回饋客戶端
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,Server get Data\r\n".getBytes()));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//設定響應後就關閉程式
ctx.close().addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//用來處理Netty執行過程中的異常
cause.printStackTrace(); //列印錯誤
ctx.close(); //關閉容器
}
}
客戶端
客戶端,也是在handler 的處理類中,增加了兩個處理器,這樣保證伺服器和客戶端的處理訊息的方式一致,傳送訊息的時候加上了分隔符(\r\n)
package yellowcong.socket.netty2;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
* 建立日期:2017年10月8日 <br/>
* 建立使用者:yellowcong <br/>
* 功能描述:
*/
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup workGroup = new NioEventLoopGroup();
//客戶端啟動
Bootstrap boot = new Bootstrap();
boot.group(workGroup)
.channel(NioSocketChannel.class) //客戶端的Socker
.handler(new ChannelInitializer<SocketChannel>(){ //一掉要注意這個 SocketChannel 是Netty封裝的,不是NIO
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//將我們的伺服器處理類傳遞進去
ByteBuf spilt = Unpooled.copiedBuffer("\r\n".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, spilt));
//字串形式的解碼
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ClientHandler());
}
});
//繫結指定的埠 進行監聽
ChannelFuture channel = boot.connect("127.0.0.1", 8080).sync();
//傳送資料
channel.channel().writeAndFlush(Unpooled.copiedBuffer("hello Client1\r\n".getBytes()));
channel.channel().writeAndFlush(Unpooled.copiedBuffer("hello Client2\r\n".getBytes()));
channel.channel().writeAndFlush(Unpooled.copiedBuffer("hello Client3\r\n".getBytes()));
//這個相當於沒有關閉Channel,註釋
channel.channel().closeFuture().sync();
//關閉執行緒組
workGroup.shutdownGracefully();
}
}
客戶端訊息處理類
訊息處理類,沒做任何改變,只是讀取訊息
package yellowcong.socket.netty2;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
/**
* 建立日期:2017年10月8日 <br/>
* 建立使用者:yellowcong <br/>
* 功能描述:客戶端請求的處理
*/
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//列印錯誤
cause.printStackTrace();
//關閉容器
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
//獲取message
String data = (String) msg;
//列印資料
System.out.println("Client:\t"+data);
} catch (Exception e) {
e.printStackTrace();
}finally{
//釋放訊息
ReferenceCountUtil.release(msg);
}
}
}