1. 程式人生 > >Netty學習(八)-Netty的心跳機制

Netty學習(八)-Netty的心跳機制

1. 如何實現心跳機制
一般實現心跳機制由兩種方式:

TCP協議自帶的心跳機制來實現;
在應用層來實現。
但是TCP協議自帶的心跳機制系統預設是設定的是2小時的心跳頻率。它檢查不到機器斷電、網線拔出、防火牆這些斷線。而且邏輯層處理斷線可能也不是那麼好處理。另外該心跳機制是與TCP協議繫結的,那如果我們要是使用UDP協議豈不是用不了?所以一般我們都不用。

而一般我們自己實現呢大致的策略是這樣的:

Client啟動一個定時器,不斷髮送心跳;
Server收到心跳後,做出迴應;
Server啟動一個定時器,判斷Client是否存在,這裡做判斷有兩種方法:時間差和簡單標識。
時間差:

收到一個心跳包之後記錄當前時間;
判斷定時器到達時間,計算多久沒收到心跳時間=當前時間-上次收到心跳時間。如果改時間大於設定值則認為超時。
簡單標識:

收到心跳後設置連線標識為true;
判斷定時器到達時間,如果未收到心跳則設定連線標識為false;
今天我們來看一下Netty的心跳機制的實現,在Netty中提供了IdleStateHandler類來進行心跳的處理,它可以對一個 Channel 的 讀/寫設定定時器, 當 Channel 在一定事件間隔內沒有資料互動時(即處於 idle 狀態), 就會觸發指定的事件。

該類可以對三種類型的超時做心跳機制檢測:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
1
2
3
readerIdleTimeSeconds:設定讀超時時間;
writerIdleTimeSeconds:設定寫超時時間;
allIdleTimeSeconds:同時為讀或寫設定超時時間;
下面我們還是通過一個例子來講解IdleStateHandler的使用。

服務端:

public class HeartBeatServer {
private int port;

public HeartBeatServer(int port) {
this.port = port;
}

public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HeartBeatServerChannelInitializer());

try {
ChannelFuture future = server.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
HeartBeatServer server = new HeartBeatServer(7788);
server.start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
服務端Initializer:

public class HeartBeatServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();

pipeline.addLast("handler",new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatServerHandler());
}
}
1
2
3
4
5
6
7
8
9
10
11
在這裡IdleStateHandler也是handler的一種,所以加入addLast。我們分別設定4個引數:讀超時時間為3s,寫超時和讀寫超時為0,然後加入時間控制單元。

服務端handler:

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
private int loss_connect_time = 0;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "Server :" + msg.toString());
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
//服務端對應著讀事件,當為READER_IDLE時觸發
IdleStateEvent event = (IdleStateEvent)evt;
if(event.state() == IdleState.READER_IDLE){
loss_connect_time++;
System.out.println("接收訊息超時");
if(loss_connect_time > 2){
System.out.println("關閉不活動的連結");
ctx.channel().close();
}
}else{
super.userEventTriggered(ctx,evt);
}
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
我們看到在handler中呼叫了userEventTriggered方法,IdleStateEvent的state()方法一個有三個值:
READER_IDLE,WRITER_IDLE,ALL_IDLE。正好對應讀事件寫事件和讀寫事件。

再來寫一下客戶端:

public class HeartBeatsClient {
private int port;
private String address;

public HeartBeatsClient(int port, String address) {
this.port = port;
this.address = address;
}

public void start(){
EventLoopGroup group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HeartBeatsClientChannelInitializer());

try {
ChannelFuture future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}

}

public static void main(String[] args) {
HeartBeatsClient client = new HeartBeatsClient(7788,"127.0.0.1");
client.start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
客戶端Initializer:

public class HeartBeatsClientChannelInitializer extends ChannelInitializer<SocketChannel> {

protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();

pipeline.addLast("handler", new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatClientHandler());
}
}
1
2
3
4
5
6
7
8
9
10
11
這裡我們設定了IdleStateHandler的寫超時為3秒,客戶端執行的動作為寫訊息到服務端,服務端執行讀動作。

客戶端handler:

public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF_8));

private static final int TRY_TIMES = 3;

private int currentTime = 0;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("啟用時間是:"+new Date());
System.out.println("連結已經啟用");
ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("停止時間是:"+new Date());
System.out.println("關閉連結");
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("當前輪詢時間:"+new Date());
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
if(currentTime <= TRY_TIMES){
System.out.println("currentTime:"+currentTime);
currentTime++;
ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
}
}
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(message);
if (message.equals("Heartbeat")) {
ctx.write("has read message from server");
ctx.flush();
}
ReferenceCountUtil.release(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
啟動服務端和客戶端我們看到輸出為:

我們再來屢一下思路:

首先客戶端啟用channel,因為客戶端中並沒有傳送訊息所以會觸發客戶端的IdleStateHandler,它設定的寫超時時間為3s;
然後觸發客戶端的事件機制進入userEventTriggered方法,在觸發器中計數並向客戶端傳送訊息;
服務端接收訊息;
客戶端觸發器繼續輪詢傳送訊息,直到計數器滿不再向服務端傳送訊息;
服務端在IdleStateHandler設定的讀訊息超時時間5s內未收到訊息,觸發了服務端中handler的userEventTriggered方法,於是關閉客戶端的連結。
大體我們的簡單心跳機制就是這樣的思路,通過事件觸發機制以及計數器的方式來實現,上面我們的案例中最後客戶端沒有傳送訊息的時候我們是強制斷開了客戶端的連結,那麼既然可以關閉,我們是不是也可是重新連結客戶端呢?因為萬一客戶端本身並不想關閉而是由於別的原因導致他無法與服務端通訊。下面我們來說一下重連機制。

當我們的服務端在未讀到客戶端訊息超時而關閉客戶端的時候我們一般在客戶端的finally塊中方的是關閉客戶端的程式碼,這時我們可以做一下修改的,finally是一定會被執行新的,所以我們可以在finally塊中重新呼叫一下啟動客戶端的程式碼,這樣就又重新啟動了客戶端了,上客戶端程式碼:

/**
* 本Client為測試netty重連機制
* Server端程式碼都一樣,所以不做修改
* 只用在client端中做一下判斷即可
*/
public class HeartBeatsClient2 {

private int port;
private String address;
ChannelFuture future;

public HeartBeatsClient2(int port, String address) {
this.port = port;
this.address = address;
}

public void start(){
EventLoopGroup group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HeartBeatsClientChannelInitializer());

try {
future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
//group.shutdownGracefully();
if (null != future) {
if (future.channel() != null && future.channel().isOpen()) {
future.channel().close();
}
}
System.out.println("準備重連");
start();
System.out.println("重連成功");
}

}

public static void main(String[] args) {
HeartBeatsClient2 client = new HeartBeatsClient2(7788,"127.0.0.1");
client.start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
其餘部分的程式碼與上面的例項並無異同,只需改造客戶端即可,我們再執行服務端和客戶端會看到客戶端雖然被關閉了,但是立馬又被重啟:

當然生產級別的程式碼應該不是這樣實現的吧,哈哈,下一節我們再好好探討。
---------------------
作者:rickiyang
來源:CSDN
原文:https://blog.csdn.net/a953713428/article/details/69378412
版權宣告:本文為博主原創文章,轉載請附上博文連結!

相關推薦

Netty學習()-Netty心跳機制

1. 如何實現心跳機制一般實現心跳機制由兩種方式: TCP協議自帶的心跳機制來實現;在應用層來實現。但是TCP協議自帶的心跳機制系統預設是設定的是2小時的心跳頻率。它檢查不到機器斷電、網線拔出、防火牆這些斷線。而且邏輯層處理斷線可能也不是那麼好處理。另外該心跳機制是與TCP協議繫結的,那如果我們要是使用

Java Netty 學習() - Netty的Channel

在普通IO中,通過把機器傳輸抽象成java.net.Socket流,也就是Socket和ServerSocket 到NIO時,則變為了java.nio.channels.Channel,也是作為傳輸的管道,具體可看Java Netty 學習(四) - NIO基礎知識Channel和Pip

Java Netty 學習()

在普通IO中,通過把機器傳輸抽象成java.net.Socket流,也就是Socket和ServerSocket 到NIO時,則變為了java.nio.channels.Channel,也是作為傳輸的管道,具體可看Java Netty 學習(四) - NIO基礎

Netty學習(九)-Netty編解碼技術之Marshalling

前面我們講過protobuf的使用,主流的編解碼框架其實還有很多種: ①JBoss的Marshalling包 ②google的Protobuf ③基於Protobuf的Kyro ④Apache的Thrift JBoss Marshalling是一個J

Netty學習——用Netty實現一個簡單的Http伺服器

package study.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.C

NettyNetty生產級的心跳和重連機制

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

Netty 實現心跳機制.md

ann 構造 red 通過 case 程序 evel exc 基本使用 netty 心跳機制示例,使用netty4,IdleStateHandler 實現。 本文假設你已經了解了Netty的使用,或者至少寫過netty的helloworld,知道了netty的基本使用。我們

netty 實現長連線,心跳機制,以及重連

實現的功能 心跳機制 and 長連線 and 重連機制 and 應用訊息傳輸: 這裡最關鍵的就是通過netty中的 IdleStateHandler 超時機制來實現心跳和重連 然後通過org.msgpack編碼器來實現跨平臺資料傳輸, 在這實現的功能就是通過Scanner來輸

Netty學習之路()-Google Protobuf編碼

Protobuf是一個靈活,高效,結構化的資料序列化框架,相比於XML等傳統的序列化工具,它更小,更快,更簡單。Protobuf支援資料結構化一次可以到處使用,甚至可以跨語言使用,通過程式碼生成工具可以自動生成不同語言版本的原始碼,甚至可以在使用不同版本的資料結構程序間進行資料傳遞,實現資料

NETTY 心跳機制

end value trap parseint acceptor blog channel 出發 每次 最近工作比較忙,但閑暇之余還是看了阿裏的馮家春(fengjiachun)的github上的開源代碼Jupiter,寫的RPC框架讓我感嘆人外有人,廢話不多說,下面的代碼全

Netty(一) SpringBoot 整合長連線心跳機制

前言 Netty 是一個高效能的 NIO 網路框架,本文基於 SpringBoot 以常見的心跳機制來認識 Netty。 最終能達到的效果: 客戶端每隔 N 秒檢測是否需要傳送心跳。 服務端也每隔 N 秒檢測是否需要傳送心跳。 服務端可以主動 push 訊息到客戶端。 基於 SpringBo

一起學Netty(十netty原始碼學習netty server端原始碼初讀(上)

server端是使用了Reactor模式對nio進行了一些封裝,Reactor模式網上有很多資料,不贅述,瞭解了這個模式開始看原始碼 netty的版本是4.0.21.Final <dependency> <groupId>io.netty<

Netty學習之旅------原始碼分析Netty執行緒本地分配機制與PooledByteBuf執行緒級物件池原理分析

final PoolArena<byte[]> heapArena; //使用輪叫輪詢機制,每個執行緒從heapArena[]中獲取一個,用於記憶體分配。 final PoolArena<ByteBuffer> directArena;

一起學Netty(十四)之 Netty生產級的心跳和重連機制

sigh,寫這篇部落格的時候老臉還是紅了一下,心裡還是有些唏噓的,應該算是剽竊吧,每個人的程式碼功力的確是有差距的,好在文章的標題是“一起學”,而不是開濤大神的“跟我學”系列的文章,我們還是多花點時間學習吧,感嘆無用~ 最近工作比較忙,但閒暇之餘還是看了阿里的馮家春(fe

netty學習筆記(一)—結合reactor模式探索netty對網路io的處理機制

Reactor與Proactor簡介 reactor、proactor常見的翻譯是反應器(堆)、前攝器,這名字聽著總讓人一頭霧水的,抓不著本質。後來看看對應形容詞的英文釋義,再結合技術角度的描述,總算有了基礎的認識: reactive: reacting to event

netty自定義心跳機制

   在我的上一篇文章中已經介紹過了rts遊戲的基本架構,下面來看一下心跳協議的使用    在netty中重寫ChannelInboundHandlerAdapter的userEventTriggered方法可以實現心跳協議的檢測,寫起來也比較簡單,網上的demo很多,但是

淺析 Netty心跳機制

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { this((long)readerIdleTimeSeconds, (long)writerI

聊聊心跳機制netty心跳實現

  我們在使用netty的時候會使用一個引數,ChannelOption.SO_KEEPALIVE為true, 設定好了之後再Linux系統才會對keepalive生效,但是linux裡邊需要配置幾個引數,tcp_keepalive_time, tcp_keepalive_invl, tcp_keepaliv

netty學習(一)--linux下的網絡io模型簡單介紹

網絡協議 過程 content 結構體 了解 簡單 文件路徑 技術 連接 linux的內核將全部的外部設備都看作一個文件來操作,對一個文件的讀寫操作會調用內核提供的系統命令 ,返回一個file descriptor(fd。文件描寫敘述符)。而

netty學習之Reactor線程模型以及在netty中的應用

rec 直接 滿足 red 轉載 chan tail io處理 理論 轉載:http://blog.csdn.net/u010853261/article/details/55805216 說道netty的線程模型,我們第一反應就是經典的Reactor線程模型,下面我們就