netty自定義訊息實現心跳檢測與重連
阿新 • • 發佈:2018-12-20
其實客戶端心跳傳送用到的是IdleStateHandler,詳細看程式碼你就會明白為什麼。
//處理空閒狀態事件的處理器
pipeline.addLast(new IdleStateHandler(6,7,8, TimeUnit.SECONDS));
在IdleStateHandler中前三個引數分別是:
1.當在6秒內沒有收到(讀取到)任何訊息時,觸發IdleStateEvent事件(READER_IDLE)。
2.當在7秒內沒有傳送(寫出)任何訊息時,觸發IdleStateEvent事件(WRITER_IDLE)。
3.當在8秒內既沒有接收到資料、也沒有傳送資料時,觸發IdleStateEvent事件(ALL_IDLE)。
下面是實現的程式碼,程式碼量有點多....:
常量資料:
/**
 * Created by XiChuan on 2018-11-07.
 *
 * <p>Constants shared by the custom message protocol: the frame start marker and the
 * message type names. This class only holds constants and cannot be instantiated.
 */
public final class Constant {
    /** Start-of-frame marker written as the first int of every message. */
    public static final int HEAD = 0x76;
    /** Message type name used for heartbeat frames. */
    public static final String TYPE_PING = "PING";
    /** Message type name used for ordinary data frames. */
    public static final String TYPE_MESSAGE = "MESSAGE";

    private Constant() {
        // constants holder, not meant to be instantiated
    }
}
自定義協議:
/**
 * Created by XiChuan on 2018-11-07.
 */
import java.util.Arrays;

/**
 * One frame of the custom protocol.
 *
 * <pre>
 * Packet layout
 * +--------+------------+--------+---------------+---------+
 * |  head  | typeLength |  type  | contentLength | content |
 * +--------+------------+--------+---------------+---------+
 * 1. head          - start-of-frame marker, an int, 0x76 in hex
 * 2. typeLength    - length of the type name (byte[] length of the String), an int
 * 3. type          - the type name (String)
 * 4. contentLength - length of the payload, an int
 * 5. content       - the payload bytes
 * </pre>
 */
public class MessageProtocol {

    /** Start-of-frame marker; initialised from {@code Constant.HEAD}. */
    private int headData = Constant.HEAD;

    /** Length of the message type name. */
    private int typeLength;

    /** Message type name. */
    private String type;

    /** Length of the payload. */
    private int contentLength;

    /** Payload bytes. */
    private byte[] content;

    /**
     * Creates a message from its four variable parts; the head marker keeps its default.
     *
     * @param typeLength    length of the message type name
     * @param type          message type name
     * @param contentLength length of the payload
     * @param content       payload bytes (stored as given, not copied)
     */
    public MessageProtocol(int typeLength, String type, int contentLength, byte[] content) {
        this.typeLength = typeLength;
        this.type = type;
        this.contentLength = contentLength;
        this.content = content;
    }

    public int getHeadData() {
        return headData;
    }

    public void setHeadData(int headData) {
        this.headData = headData;
    }

    public int getTypeLength() {
        return typeLength;
    }

    public void setTypeLength(int typeLength) {
        this.typeLength = typeLength;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public int getContentLength() {
        return contentLength;
    }

    public void setContentLength(int contentLength) {
        this.contentLength = contentLength;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }

    /** Renders every field, with the payload printed as a byte list. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MessageProtocol ");
        text.append("[head_data=").append(headData);
        text.append(", typeLength=").append(typeLength);
        text.append(", type=").append(type);
        text.append(", contentLength=").append(contentLength);
        text.append(", content=").append(Arrays.toString(content));
        text.append("]");
        return text.toString();
    }
}
定義解碼器:
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * Created by XiChuan on 2018-11-06. */ public class MessageProtocolDecoder extends ByteToMessageDecoder { /** * <pre> * 協議開始的標準head_data,int型別,佔據4個位元組. * 表示資料的長度contentLength,int型別,佔據4個位元組. * </pre> */ public final int BASE_LENGTH = 4 + 4; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception { // 可讀長度必須大於基本長度 //System.out.println("buff的可讀長度是:"+buffer.readableBytes()); if (buffer.readableBytes() >= BASE_LENGTH) { // 防止socket位元組流攻擊 // 防止,客戶端傳來的資料過大 // 因為,太大的資料,是不合理的 if (buffer.readableBytes() > 2048) { buffer.skipBytes(buffer.readableBytes()); } // 記錄包頭開始的index int beginReader; while (true) { // 獲取包頭開始的index beginReader = buffer.readerIndex(); //System.out.println("記錄包頭開始的index:"+beginReader); // 標記包頭開始的index buffer.markReaderIndex(); // 讀到了協議的開始標誌,結束while迴圈 int head = buffer.readInt(); //System.out.println("讀取的int:"+head); if (head == Constant.HEAD) { break; } // 未讀到包頭,略過一個位元組 // 每次略過,一個位元組,去讀取,包頭資訊的開始標記 buffer.resetReaderIndex(); buffer.readByte(); // 當略過,一個位元組之後, // 資料包的長度,又變得不滿足 // 此時,應該結束。等待後面的資料到達 if (buffer.readableBytes() < BASE_LENGTH) { return; } } //型別長度 int typeLength = buffer.readInt(); //訊息型別 byte[] typeBytes = new byte[typeLength]; buffer.readBytes(typeBytes); String type = new String(typeBytes); // 內容長度 int dataLength = buffer.readInt(); // 讀取data資料 byte[] data = new byte[dataLength]; buffer.readBytes(data); MessageProtocol protocol = new MessageProtocol(typeLength,type,dataLength, data); out.add(protocol); } } }
定義編碼器:
import ch.qos.logback.core.encoder.ByteArrayUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.StandardCharsets;
/**
 * Created by XiChuan on 2018-11-07.
 *
 * <p>Serialises a {@link MessageProtocol} as
 * {@code head | typeLength | type | contentLength | content}.
 * Both length fields are derived from the bytes that are actually written, so the frame is
 * always self-consistent even if the message carries stale length values. The type name is
 * encoded as UTF-8.
 */
public class MessageProtocolEncoder extends MessageToByteEncoder<MessageProtocol> {

    private static final byte[] EMPTY = new byte[0];

    /**
     * Writes one frame into {@code byteBuf}.
     *
     * @param channelHandlerContext the handler context (unused)
     * @param msg                   the message to serialise; a null type or content is written as empty
     * @param byteBuf               the destination buffer
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol msg, ByteBuf byteBuf) throws Exception {
        byte[] typeBytes = msg.getType() == null
                ? EMPTY
                : msg.getType().getBytes(StandardCharsets.UTF_8);
        byte[] content = msg.getContent() == null ? EMPTY : msg.getContent();

        // 1. start-of-frame marker (int)
        byteBuf.writeInt(msg.getHeadData());
        // 2. length of the type name (int), taken from the encoded bytes
        byteBuf.writeInt(typeBytes.length);
        // 3. type name (byte[])
        byteBuf.writeBytes(typeBytes);
        // 4. payload length (int), taken from the payload itself
        byteBuf.writeInt(content.length);
        // 5. payload (byte[])
        byteBuf.writeBytes(content);
    }
}
定義服務端:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
 * Created by XiChuan on 2018-11-05.
 *
 * <p>Heartbeat-aware server: accepts connections on the given port and installs an idle-state
 * detector, the protocol codec and {@link ServerHandler} on every accepted channel.
 */
public class Server {

    private final int port;

    public Server(int port) {
        this.port = port;
    }

    /**
     * Binds the port and blocks until the server channel is closed. Both event loop groups
     * are shut down when this method exits, whether normally or with an exception.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();   // accepts incoming connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles accepted connections
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    // Heartbeat: arguments are reader-idle, writer-idle and all-idle
                                    // timeouts plus the time unit. The resulting idle events are
                                    // delivered to userEventTriggered of the handlers below.
                                    .addLast("idleStateHandler", new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS))
                                    .addLast("decoder", new MessageProtocolDecoder())
                                    // The encoder sits before the business handler so that writes
                                    // issued through that handler's context pass through it.
                                    .addLast("encoder", new MessageProtocolEncoder())
                                    .addLast("channelHandler", new ServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture cf = bootstrap.bind(port).sync(); // bind and start accepting
            cf.channel().closeFuture().sync();              // wait until the server socket closes
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new Server(8081).run();
    }
}
定義服務端處理handler:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by XiChuan on 2018-11-05.
*/
public class ServerHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger channelCount = new AtomicInteger(0); //通道數量
/**
* 讀資料
* @param ctx
* @param msg
* @throws Exception
*/
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("read channel=" + ctx.channel() + ", total channel=" + channelCount);
try {
MessageProtocol message = (MessageProtocol)msg;
System.out.println("receive client message:"+message.toString()+",ip:"+ctx.channel().remoteAddress());
} finally {
// 拋棄收到的資料
ReferenceCountUtil.release(msg);
}
}
/**
* 心跳檢測的超時時會觸發
* @param ctx
* @param evt
* @throws Exception
*/
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) { //讀取心跳超時後,會將此channel連線斷開
System.out.println("trigger channel =" + ctx.channel());
ctx.close(); //如果超時,關閉這個通道
}
} else if (evt instanceof SslHandshakeCompletionEvent) {
System.out.println("ssl handshake done");
//super.userEventTriggered(ctx,evt);
}
}
/**
* 當通道活動時
* @param ctx
* @throws Exception
*/
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channelCount.incrementAndGet();//當有新通道連線進來時,將通道數+1
System.out.println("active channel=" + ctx.channel() + ", total channel=" + channelCount + ", id=" + ctx.channel().id().asShortText());
}
/**
* 當通道不活動時
* @param ctx
* @throws Exception
*/
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//當通道關閉時,將通道數-1
ctx.close();
channelCount.decrementAndGet();
System.out.println("inactive channel,channel=" + ctx.channel() +", id=" + ctx.channel().id().asShortText());
}
/**
* 異常獲取
* @param ctx
* @param cause
* @throws Exception
*/
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exception channel=" + ctx.channel() + " cause=" + cause); //如果不活躍關閉此通道
ctx.close();
}
}
後面主要就是客戶端程式碼了,主要就在這裡:
定義客戶端的ChannelHandler集合:
import io.netty.channel.ChannelHandler;
/**
* Holder of the client's ChannelHandler set; implemented by subclasses.
*
* The benefit: every implementor offers a convenient way to obtain the handlers that
* belong in the ChannelPipeline, which makes it easy both to initialise the pipeline
* and to obtain all the handlers again when reconnecting.
*/
public interface ChannelHandlerHolder {
/** Returns the handlers to install in a channel's pipeline. */
ChannelHandler[] handlers();
}
定義客戶端重連檢測程式碼:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.concurrent.TimeUnit;
/**
 * Reconnect watchdog: when the current link is found to be closed, it schedules up to
 * {@value #MAX_ATTEMPTS} reconnect attempts with a growing delay between them.
 */
@Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask, ChannelHandlerHolder {

    /** Maximum number of consecutive reconnect attempts before giving up. */
    private static final int MAX_ATTEMPTS = 12;

    private final Bootstrap bootstrap;
    private final Timer timer;
    private final int port;
    private final String host;
    private volatile boolean reconnect = true;
    private int attempts;

    /**
     * @param bootstrap the bootstrap used for reconnecting
     * @param timer     the timer on which reconnect attempts are scheduled
     * @param port      remote port
     * @param host      remote host
     * @param reconnect whether reconnecting is enabled
     */
    public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port, String host, boolean reconnect) {
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.port = port;
        this.host = host;
        this.reconnect = reconnect;
    }

    /**
     * Resets the attempt counter to 0 every time a channel becomes active.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("當前鏈路已經激活了,重連嘗試次數重新置為:0");
        attempts = 0;
        ctx.fireChannelActive();
    }

    /**
     * Schedules a reconnect when the link goes down, as long as reconnecting is enabled and
     * the attempt limit has not been reached.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("連結關閉");
        if (reconnect && attempts < MAX_ATTEMPTS) {
            // only announce an attempt that is actually going to be scheduled
            System.out.println("連結關閉,將進行重連,第"+(attempts+1)+"嘗試");
            attempts++;
            // the delay between attempts grows with every retry
            int timeout = 2 << attempts;
            timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
        }
        ctx.fireChannelInactive();
    }

    /**
     * Timer callback: performs one reconnect attempt.
     */
    @Override
    public void run(Timeout timeout) throws Exception {
        ChannelFuture future;
        // the bootstrap is already configured; only the handlers need to be installed
        synchronized (bootstrap) {
            bootstrap.handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(handlers());
                }
            });
            future = bootstrap.connect(host, port);
        }
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                boolean succeed = f.isSuccess();
                // On failure, fire channelInactive by hand so that the next attempt gets
                // scheduled; this repeats until the attempt limit is reached.
                if (!succeed) {
                    System.out.println("重連失敗");
                    f.channel().pipeline().fireChannelInactive();
                } else {
                    System.out.println("重連成功");
                }
            }
        });
    }
}
定義客戶端:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.HashedWheelTimer;
import java.util.concurrent.TimeUnit;
/**
 * Created by XiChuan on 2018-11-05.
 *
 * <p>Heartbeat client: connects to the server and installs a {@link ConnectionWatchdog}
 * together with the idle-state detector, the protocol codec and {@link ClientHandler}.
 */
public class Client {

    protected final HashedWheelTimer timer = new HashedWheelTimer();
    private Bootstrap b;
    private String host;
    private int port;

    public Client(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the configured host and port and returns once the connection is
     * established. On success the event loop group is deliberately left running because it
     * is needed for I/O and for later reconnects.
     *
     * @throws Exception if the initial connection attempt fails
     */
    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class);

        final ConnectionWatchdog watchdog = new ConnectionWatchdog(b, timer, port, host, true) {
            @Override
            public ChannelHandler[] handlers() {
                return new ChannelHandler[] {
                        this,
                        new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
                        new MessageProtocolDecoder(),
                        new MessageProtocolEncoder(),
                        new ClientHandler()
                };
            }
        };

        ChannelFuture future;
        try {
            synchronized (b) {
                b.handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(watchdog.handlers());
                    }
                });
                future = b.connect(host, port);
            }
            // waiting outside the synchronized block is safe
            future.sync();
        } catch (Throwable t) {
            // Nothing will use this group after a failed first connect; release its threads
            // so they do not keep the JVM alive.
            group.shutdownGracefully();
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new Exception("connects to fails", t);
        }
    }

    public static void main(String[] args) throws Exception {
        new Client("127.0.0.1", 8081).run();
    }
}
定義客戶端handler:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import java.util.Date;
/**
 * Created by XiChuan on 2018-11-05.
 *
 * <p>Client-side business handler: logs channel lifecycle events and inbound messages, and
 * writes a PING heartbeat whenever a writer-idle event is reported.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    /** Text carried in the payload of every heartbeat frame. */
    private static final String HEARTBEAT_TEXT = "heart beat ...";

    /** Logs that the connection to the server has been established. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("active channel:" + ctx.channel()+",time:"+new Date().toLocaleString());
        ctx.fireChannelActive();
    }

    /** Logs that the channel has gone inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("inactive channel:" + ctx.channel()+",time:"+new Date().toLocaleString());
    }

    /**
     * Logs a message received from the server. The message is only read and not passed on,
     * so it is released here by hand.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("read channel:" + ctx.channel());
        try {
            MessageProtocol messageProtocol = (MessageProtocol) msg;
            System.out.println("receive server message:" + messageProtocol.toString());
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    /** Reports the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.out.println("exception channel=" + ctx.channel() + " cause=" + cause);
        ctx.close();
    }

    /**
     * Sends a heartbeat on a writer-idle event; other idle states are ignored and any other
     * kind of event is passed on to the next handler.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // write heartbeat to server; encode each string once and reuse the bytes
                byte[] typeBytes = Constant.TYPE_PING.getBytes();
                byte[] content = HEARTBEAT_TEXT.getBytes();
                MessageProtocol messageProtocol = new MessageProtocol(
                        typeBytes.length,
                        Constant.TYPE_PING,
                        content.length,
                        content);
                ctx.writeAndFlush(messageProtocol);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
上述是全部的程式碼,先執行server然後執行client,效果如下:
服務端:
客戶端:
當關閉服務端,客戶端日誌: