netty4.0 心跳檢測與斷線重連操作
因為最近專案最近要用netty,服務端放在雲端,客戶端發在內網。那如何實現netty長連線和斷線重連呢(網路故障或者其他原因,客戶端要無限取重連服務端)。接下來我們看一下如何實現這個兩個功能呢。
服務端程式碼如下:
package com.example.nettydemo.demo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; /** * @ClassName: NettyServer * @Author: huangzf * @Date: 2018/9/25 15:40 * @Description: */ @Slf4j public class NettyServer { private NettyServerChannelInitializer serverChannelInitializer = null; private int port = 8000; public void bind() throws Exception { //配置服務端的NIO執行緒組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { serverChannelInitializer = new NettyServerChannelInitializer(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //保持長連線 .childOption(ChannelOption.SO_KEEPALIVE,true) .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(serverChannelInitializer); //繫結埠,同步等待成功 ChannelFuture f = b.bind(port).sync(); //等待伺服器監聽埠關閉 f.channel().closeFuture().sync(); } finally { //釋放執行緒池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new NettyServer().bind(); } }
服務端中NettyServerChannelInitializer的實現如下:
package com.example.nettydemo.demo; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; /** * @ClassName: NettyServerChannelInitializer * @Author: huangzf * @Date: 2018/9/25 15:43 * @Description: */ public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> { private NettyServerHandler handler ; @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //解決TCP粘包拆包的問題,以特定的字元結尾($_) pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.copiedBuffer("$_".getBytes()))); //字串解碼和編碼 pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new IdleStateHandler(40,0,0,TimeUnit.SECONDS)); //伺服器的邏輯 handler = new NettyServerHandler(); pipeline.addLast("handler", handler); } }
因為TCP在傳送過程用可能會發生粘包拆包問題,netty中給了我們很好的解決方法,就是每次傳送訊息是已特殊的符號(可自定義)$_ 結尾,只收收到的訊息以$_ 符號結尾是該訊息才算接收完畢。
ipeline.addLast(new IdleStateHandler(40,0,0,TimeUnit.SECONDS)); 該程式碼實現了心跳檢測,每隔40s檢測一次是否要讀事件,如果超過40s你沒有讀事件的發生,則執行相應的操作(在handler中實現)
服務端 NettyServerHandler 實現如下:
package com.example.nettydemo.demo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; /** * @ClassName: NettyServerHandler * @Author: huangzf * @Date: 2018/9/25 15:44 * @Description: */ @Slf4j public class NettyServerHandler extends SimpleChannelInboundHandler { /** * 心跳丟失次數 */ private int counter = 0; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Client say : " + msg.toString()); //重置心跳丟失次數 counter = 0; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("RemoteAddress : " + ctx.channel().remoteAddress().toString()+ " active !"); super.channelActive(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.READER_IDLE)){ // 空閒40s之後觸發 (心跳包丟失) if (counter >= 3) { // 連續丟失3個心跳包 (斷開連線) ctx.channel().close().sync(); log.error("已與"+ctx.channel().remoteAddress()+"斷開連線"); System.out.println("已與"+ctx.channel().remoteAddress()+"斷開連線"); } else { counter++; log.debug(ctx.channel().remoteAddress() + "丟失了第 " + counter + " 個心跳包"); System.out.println("丟失了第 " + counter + " 個心跳包"); } } } } }
userEventTriggered 該方法中定義瞭如果服務端40s內沒有接收到客戶端發來的訊息,就將丟失次數嘉興累加,如果累加超過3次也就是120s內都沒有接收到客戶端傳來的訊息,服務端將斷開此客戶端的連線。
接下來是客戶端程式碼的實現:
package com.example.nettydemo.demo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
/**
* @ClassName: NettyClinet
* @Author: huangzf
* @Date: 2018/9/25 15:26
* @Description:
*/
@Slf4j
public class NettyClinet {
@Value("${printer.server.host}")
private String host;
@Value("${printer.server.port}")
private int port;
private static Channel channel;
public NettyClinet(){
}
public NettyClinet(String host, int port) {
this.host = host;
this.port = port;
}
public void start() {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.option(ChannelOption.SO_KEEPALIVE,true)
.channel(NioSocketChannel.class)
.handler(new ClientChannelInitializer(host,port));
ChannelFuture f = b.connect(host,port);
//斷線重連
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (!channelFuture.isSuccess()) {
final EventLoop loop = channelFuture.channel().eventLoop();
loop.schedule(new Runnable() {
@Override
public void run() {
log.error("服務端連結不上,開始重連操作...");
System.err.println("服務端連結不上,開始重連操作...");
start();
}
}, 1L, TimeUnit.SECONDS);
} else {
channel = channelFuture.channel();
log.info("服務端連結成功...");
System.err.println("服務端連結成功...");
}
}
});
}catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) {
new NettyClinet ("127.0.0.1",8000).start();
}
}
斷線重連實現也很簡單,就是給通道加上一個斷線重連的監聽器ChannelFutureListene,該監聽器如果監聽到與服務端的連線斷開了就會每隔1s觸發一次重連操作,擔憂一個問題需要注意的是 ChannelFuture f = b.connect(host,port);不能加sync()也就是不能寫成 ChannelFuture f = b.connect(host,port).sync();不然重連操作無法觸發,我也不知道為啥。。。。還有就是不能有任何關閉通道的程式碼,也就是group.shutdownGracefully(); 不然斷線重連無效,因為你已經把該通道關閉了。
ClientChannelInitializer 該類程式碼實現如下:
package com.example.nettydemo.demo;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* @ClassName: ClientChannelInitializer
* @Author: huangzf
* @Date: 2018/9/25 15:29
* @Description:
*/
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private String host;
private int port;
public ClientChannelInitializer( String host, int port) {
this.host = host;
this.port = port;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//解決TCP粘包拆包的問題,以特定的字元結尾($_)
pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.copiedBuffer("$_".getBytes())));
//字串解碼和編碼
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//心跳檢測
pipeline.addLast(new IdleStateHandler(0,30,0,TimeUnit.SECONDS));
//客戶端的邏輯
pipeline.addLast("handler", new NettyClientHandler(host,port));
}
}
跟服務端一樣的邏輯出操作。。。
客戶端 NettyClientHandler 實現如下:
package com.example.nettydemo.demo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
/**
* @ClassName: NettyClientHandler
* @Author: huangzf
* @Date: 2018/9/25 15:33
* @Description:
*/
@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler {
private String host;
private int port;
private NettyClinet nettyClinet;
private String tenantId;
public NettyClientHandler(String host, int port) {
this.host = host;
this.port = port;
nettyClinet = new NettyClinet(host,port);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o)
throws Exception {
System.out.println("Server say : " + o.toString());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("通道已連線!!");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("斷線了。。。。。。");
//使用過程中斷線重連
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(new Runnable() {
@Override
public void run() {
nettyClinet.start();
}
}, 1, TimeUnit.SECONDS);
ctx.fireChannelInactive();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)) {
System.out.println("READER_IDLE");
} else if (event.state().equals(IdleState.WRITER_IDLE)) {
/**傳送心跳,保持長連線*/
String s = "ping$_";
ctx.channel().writeAndFlush(s);
log.debug("心跳傳送成功!");
System.out.println("心跳傳送成功!");
} else if (event.state().equals(IdleState.ALL_IDLE)) {
System.out.println("ALL_IDLE");
}
}
super.userEventTriggered(ctx, evt);
}
}
channelInactive 該方法中也實現了斷線重連的功能,以防止在執行過程中突然斷線。
userEventTriggered:該方法中實現瞭如果30s內客戶端沒有向服務端寫入任何訊息,該方法就會觸發向服務端傳送心跳資訊,從而保持客戶端與服務端的長連線。