一起學Netty(十二)之 Netty心跳簡單Demo
阿新 • • 發佈:2019-01-03
前面簡單地瞭解了一下IdleStateHandler,我們現在寫一個簡單的心跳demo:
1)伺服器端每隔5秒檢測伺服器端的讀超時,如果5秒沒有接受到客戶端的寫請求,也就說伺服器端5秒沒有收到讀事件,則視為一次超時
2)如果超時二次則說明連線處於不活躍的狀態,關閉ServerChannel
3)客戶端每隔4秒傳送一些寫請求,這個請求相當於一次心跳包,告之伺服器端:客戶端仍舊活著
我們開始先開始寫伺服器端的handler,繼承ChannelInboundHandlerAdapter,我們先重寫userEventTriggered方法,這個方法我們前面講過,如果超時則會觸發相應的超時事件
HeartBeatServerHandler.java
package com.lyncc.netty.heartbeats; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter { private int loss_connect_time = 0; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { loss_connect_time++; System.out.println("5 秒沒有接收到客戶端的資訊了"); if (loss_connect_time > 2) { System.out.println("關閉這個不活躍的channel"); ctx.channel().close(); } } } else { super.userEventTriggered(ctx, evt); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server channelRead.."); System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
再寫一下伺服器端,我們要注意的是,我們要在channelPipeline中加入IdleStateHandler,我們在handler中提示的是5秒讀,所以我們配置的是:
這樣就可以每隔5秒檢測一下服務端的讀超時。完整程式碼清單如下:
HeartBeatClientHandler.java方法也重寫userEventTriggered方法,因為客戶端沒有任何寫的情況,所以我們可以每次都能進行寫超時:package com.lyncc.netty.heartbeats; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; public class HeartBeatServer { private int port; public HeartBeatServer(int port) { this.port = port; } public void start(){ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); ch.pipeline().addLast("decoder", new StringDecoder()); ch.pipeline().addLast("encoder", new StringEncoder()); ch.pipeline().addLast(new HeartBeatServerHandler()); }; }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); // 繫結埠,開始接收進來的連線 ChannelFuture future = sbs.bind(port).sync(); System.out.println("Server start listen at " + port ); future.channel().closeFuture().sync(); } catch (Exception e) { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new HeartBeatServer(port).start(); } }
也就說這個方法每隔4秒都能觸發:
紅色邊框程式碼在客戶端沒有寫事件的時候,一超時就會觸發寫請求:
完整程式碼如下:
HeartBeatClientHandler.java
package com.lyncc.netty.heartbeats;
import java.util.Date;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF_8));
private static final int TRY_TIMES = 3;
private int currentTime = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("啟用時間是:"+new Date());
System.out.println("HeartBeatClientHandler channelActive");
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("停止時間是:"+new Date());
System.out.println("HeartBeatClientHandler channelInactive");
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("迴圈觸發時間:"+new Date());
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
if(currentTime <= TRY_TIMES){
System.out.println("currentTime:"+currentTime);
currentTime++;
ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
}
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(message);
if (message.equals("Heartbeat")) {
ctx.write("has read message from server");
ctx.flush();
}
ReferenceCountUtil.release(msg);
}
}
HeartBeatsClient.java
客戶端程式碼也要加入IdleStateHandler這個handler,注意的是,我們要注意的是寫超時,所以要設定寫超時的時間,因為伺服器端是5秒檢測讀超時,所以客戶端必須在5秒內傳送一次心跳,告之服務端,所以我們設定4秒:
完整程式碼如下:
package com.lyncc.netty.heartbeats;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
public class HeartBeatsClient {
public void connect(int port, String host) throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("ping", new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
p.addLast("decoder", new StringDecoder());
p.addLast("encoder", new StringEncoder());
p.addLast(new HeartBeatClientHandler());
}
});
ChannelFuture future = b.connect(host, port).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 採用預設值
}
}
new HeartBeatsClient().connect(port, "127.0.0.1");
}
}
我們先啟動伺服器端:
再啟動客戶端:
此時客戶端還存活著,我們看看伺服器端的輸出:
我們再看看客戶端的輸出:
inactive的事件觸發了,且客戶端自動停止了~
簡單的心跳Demo就是這樣,如有不對,還希望大家多多拍磚~