Netty 實現心跳機制.md
netty 心跳機制示例,使用netty4,IdleStateHandler 實現。
本文假設你已經了解了Netty的使用,或者至少寫過netty的helloworld,知道了netty的基本使用。我們知道使用netty的時候,大多數的東西都與Handler有關,我們的業務邏輯基本都是在Handler中實現的。Netty中自帶了一個IdleStateHandler 可以用來實現心跳檢測。
心跳檢測的邏輯
本文中我們將要實現的心跳檢測邏輯是這樣的:服務端啟動後,等待客戶端連接,客戶端連接之後,向服務端發送消息。如果客戶端在“幹活”那麽服務端必定會收到數據,如果客戶端“閑下來了”那麽服務端就接收不到這個服務端的消息,既然客戶端閑下來了,不幹事,那麽何必浪費連接資源呢?所以服務端檢測到一定時間內客戶端不活躍的時候,將客戶端連接關閉。本文要實現的邏輯步驟為:
- 啟動服務端,啟動客戶端
- 客戶端向服務端發送"I am alive",並sleep隨機時間,用來模擬空閑。
- 服務端接收客戶端消息,並返回"copy that",客戶端空閑時 計數+1.
- 服務端客戶端繼續通信
- 服務端檢測客戶端空閑太多,關閉連接。客戶端發現連接關閉了,就退出了。
有了這個思路,我們先來編寫服務端。
心跳檢測服務端代碼
public class HeartBeatServer { int port ; public HeartBeatServer(int port){ this.port = port; } public void start(){ ServerBootstrap bootstrap = new ServerBootstrap(); EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try{ bootstrap.group(boss,worker) .handler(new LoggingHandler(LogLevel.INFO)) .channel(NioServerSocketChannel.class) .childHandler(new HeartBeatInitializer()); ChannelFuture future = bootstrap.bind(port).sync(); future.channel().closeFuture().sync(); }catch(Exception e){ e.printStackTrace(); }finally { worker.shutdownGracefully(); boss.shutdownGracefully(); } } public static void main(String[] args) throws Exception { HeartBeatServer server = new HeartBeatServer(8090); server.start(); } }
熟悉netty的同誌,對於上面的模板一樣的代碼一定是在熟悉不過了。啥都不用看,只需要看childHandler(new HeartBeatInitializer())
這一句。HeartBeatInitializer
就是一個ChannelInitializer
顧名思義,他就是在初始化channel的時做一些事情。我們所需要開發的業務邏輯Handler就是在這裏添加的。其代碼如下:
public class HeartBeatInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new IdleStateHandler(2,2,2, TimeUnit.SECONDS)); pipeline.addLast(new HeartBeatHandler()); } }
代碼很簡單,我們先添加了StringDecoder
,和StringEncoder
。這兩個其實就是編解碼用的,下面的IdleStateHandler
才是本次心跳的核心組件。我們可以看到IdleStateHandler
的構造函數中接收了4個參數,起定義如下:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit);
三個空閑時間參數,以及時間參數的格式。我們的例子中設置的是2,2,2,意思就是客戶端2秒沒有讀/寫,這個超時時間就會被觸發。超時事件觸發就需要我們來處理了,這就是上的HeartBeatInitializer
中最後一行的HeartBeatHandler
所做的事情。代碼如下:
public class HeartBeatHandler extends SimpleChannelInboundHandler<String> {
int readIdleTimes = 0;
@Override
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(" ====== > [server] message received : " + s);
if("I am alive".equals(s)){
ctx.channel().writeAndFlush("copy that");
}else {
System.out.println(" 其他信息處理 ... ");
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent)evt;
String eventType = null;
switch (event.state()){
case READER_IDLE:
eventType = "讀空閑";
readIdleTimes ++; // 讀空閑的計數加1
break;
case WRITER_IDLE:
eventType = "寫空閑";
// 不處理
break;
case ALL_IDLE:
eventType ="讀寫空閑";
// 不處理
break;
}
System.out.println(ctx.channel().remoteAddress() + "超時事件:" +eventType);
if(readIdleTimes > 3){
System.out.println(" [server]讀空閑超過3次,關閉連接");
ctx.channel().writeAndFlush("you are out");
ctx.channel().close();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
}
}
至此,我們的服務端寫好了。
心跳檢測客戶端代碼
netty的api設計使得編碼的模式非常具有通用性,所以客戶端代碼和服務端的代碼幾乎一樣:啟動client端的代碼幾乎一樣,也需要一個ChannelInitializer
,也需要Handler
。改動的地方很少,因此本文不對客戶端代碼進行詳細解釋。下面給出client端的完整代碼:
public class HeartBeatClient {
int port;
Channel channel;
Random random ;
public HeartBeatClient(int port){
this.port = port;
random = new Random();
}
public static void main(String[] args) throws Exception{
HeartBeatClient client = new HeartBeatClient(8090);
client.start();
}
public void start() {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new HeartBeatClientInitializer());
connect(bootstrap,port);
String text = "I am alive";
while (channel.isActive()){
sendMsg(text);
}
}catch(Exception e){
// do something
}finally {
eventLoopGroup.shutdownGracefully();
}
}
public void connect(Bootstrap bootstrap,int port) throws Exception{
channel = bootstrap.connect("localhost",8090).sync().channel();
}
public void sendMsg(String text) throws Exception{
int num = random.nextInt(10);
Thread.sleep(num * 1000);
channel.writeAndFlush(text);
}
static class HeartBeatClientInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatClientHandler());
}
}
static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(" client received :" +msg);
if(msg!= null && msg.equals("you are out")) {
System.out.println(" server closed connection , so client will close too");
ctx.channel().closeFuture();
}
}
}
}
運行代碼
在上面的代碼寫好之後,我們先啟動服務端,然後在啟動客戶端。運行日誌如下:
server端:
=== /127.0.0.1:57700 is active ===
====== > [server] message received : I am alive
====== > [server] message received : I am alive
/127.0.0.1:57700超時事件:寫空閑
/127.0.0.1:57700超時事件:讀空閑
/127.0.0.1:57700超時事件:讀寫空閑
/127.0.0.1:57700超時事件:寫空閑
/127.0.0.1:57700超時事件:讀空閑
/127.0.0.1:57700超時事件:讀寫空閑
/127.0.0.1:57700超時事件:寫空閑
====== > [server] message received : I am alive
/127.0.0.1:57700超時事件:寫空閑
/127.0.0.1:57700超時事件:讀寫空閑
/127.0.0.1:57700超時事件:讀空閑
/127.0.0.1:57700超時事件:寫空閑
/127.0.0.1:57700超時事件:讀寫空閑
/127.0.0.1:57700超時事件:讀空閑
[server]讀空閑超過3次,關閉連接
client端:
client sent msg and sleep 2
client received :copy that
client received :copy that
client sent msg and sleep 6
client sent msg and sleep 6
client received :copy that
client received :you are out
server closed connection , so client will close too
Process finished with exit code 0
通過上面的運行日誌,我們可以看到:
1.客戶端在與服務器成功建立之後,發送了3次‘I am alive‘,服務端也回應了3次:‘copy that‘
2.由於客戶端消極怠工,超時了多次,服務端關閉了鏈接。
3.客戶端知道服務端拋棄自己之後,也關閉了連接,程序退出。
以上簡單了演示了一下,netty的心跳機制,其實主要就是使用了IdleStateHandler
。源碼下載:https://gitee.com/dimixu/netty_learn.git
Netty 實現心跳機制.md