Netty心跳機制-長連線
前文需求回顧
完成對紅酒窖的室內溫度採集及監控功能。由本地應用程式+溫度感測器定時採集室內溫度上報至伺服器,如果溫度 >20 °C 則由伺服器下發重啟空調指令,如果本地應用長時間不上傳溫度給伺服器,則給戶主手機發送一條預警簡訊。
Netty入門篇-從雙向通訊開始「上文」
上篇算是完成簡單的雙向通訊了,我們接著看看 “如果本地應用長時間不上傳溫度給伺服器...”,很明顯客戶端有可能掛了嘛,所以怎麼實現客戶端與服務端的長連線就是本文要實現的了。
什麼是心跳機制
百度百科:心跳機制是定時傳送一個自定義的結構體(心跳包),讓對方知道自己還活著,以確保連線的有效性的機制。
簡單說,這個心跳機制是由客戶端主動發起的訊息,每隔一段時間就向服務端傳送訊息,告訴服務端自己還沒死,可不要給戶主傳送預警簡訊啊。
如何實現心跳機制
1、客戶端程式碼修改
我們需要改造一下上節中客戶端的程式碼,首先是在責任鏈中增加一個心跳邏輯處理類HeartbeatHandler
public class NettyClient {
private static String host = "127.0.0.1";
public static void main(String[] args) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
// 1.指定執行緒模型
.group(workerGroup)
// 2.指定 IO 型別為 NIO
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true )
.option(ChannelOption.TCP_NODELAY, true)
// 3.IO 處理邏輯
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(0, 10, 0))
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new HeartbeatHandler())
.addLast(new NettyClientHandler());
}
});
// 4.建立連線
bootstrap.connect(host, 8070).addListener(future -> {
if (future.isSuccess()) {
System.out.println("連線成功!");
} else {
System.err.println("連線失敗!");
}
});
}
}
沒什麼變化,主要是增加了HeartbeatHandler
,我們來看看這個類:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.nio.charset.Charset;
import java.time.LocalTime;
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
System.out.println("10秒了,需要傳送訊息給服務端了" + LocalTime.now());
//向服務端送心跳包
ByteBuf buffer = getByteBuf(ctx);
//傳送心跳訊息,並在傳送失敗時關閉該連線
ctx.writeAndFlush(buffer).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("捕獲的異常:" + cause.getMessage());
ctx.channel().close();
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 1. 獲取二進位制抽象 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
String time = "heartbeat:客戶端心跳資料:" + LocalTime.now();
// 2. 準備資料,指定字串的字符集為 utf-8
byte[] bytes = time.getBytes(Charset.forName("utf-8"));
// 3. 填充資料到 ByteBuf
buffer.writeBytes(bytes);
return buffer;
}
}
還是繼承自ChannelInboundHandlerAdapter
,不過這次重寫的是userEventTriggered()
方法,這個方法在客戶端的所有ChannelHandler
中,如果10s內沒有發生write事件時觸發,所以我們在該方法中給服務端傳送心跳訊息。
業務邏輯處理類NettyClientHandler
沒有改動,程式碼如下:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Random;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 客戶端寫出資料");
// 1. 獲取資料
ByteBuf buffer = getByteBuf(ctx);
// 2. 寫資料
ctx.channel().writeAndFlush(buffer);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 1. 獲取二進位制抽象 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
Random random = new Random();
double value = random.nextDouble() * 14 + 8;
String temp = "獲取室內溫度:" + value;
// 2. 準備資料,指定字串的字符集為 utf-8
byte[] bytes = temp.getBytes(Charset.forName("utf-8"));
// 3. 填充資料到 ByteBuf
buffer.writeBytes(bytes);
return buffer;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(new Date() + ": 客戶端讀到資料 -> " + msg.toString());
}
}
對如上程式碼不瞭解的可以回看上一節:Netty入門篇-從雙向通訊開始
2、服務端程式碼修改
服務端程式碼主要是開啟TCP底層心跳機制支援,.childOption(ChannelOption.SO_KEEPALIVE, true)
,其他的程式碼並沒有改動:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
// 指定Channel
.channel(NioServerSocketChannel.class)
//服務端可連線佇列數,對應TCP/IP協議listen函式中backlog引數
.option(ChannelOption.SO_BACKLOG, 1024)
//設定TCP長連線,一般如果兩個小時內沒有資料的通訊時,TCP會自動傳送一個活動探測資料報文
.childOption(ChannelOption.SO_KEEPALIVE, true)
//將小的資料包包裝成更大的幀進行傳送,提高網路的負載
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new NettyServerHandler());
}
});
serverBootstrap.bind(8070);
}
}
我們再來看看服務端的業務處理類 NettyServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
String message = byteBuf.toString(Charset.forName("utf-8"));
System.out.println(new Date() + ": 服務端讀到資料 -> " + message);
/** 心跳資料是不傳送資料 **/
if(!message.contains("heartbeat")){
ByteBuf out = getByteBuf(ctx);
ctx.channel().writeAndFlush(out);
}
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
byte[] bytes = "我是傳送給客戶端的資料:請重啟冰箱!".getBytes(Charset.forName("utf-8"));
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
return buffer;
}
}
對channelRead()
方法增加了一個 if 判斷,判斷如果包含heartbeat
字串就認為這是客戶端發過來的心跳,這種判斷是非常low的,因為到目前為止我們一直是用簡單字串來傳遞資料的,上邊傳遞的資料就直接操作字串;那麼問題來了,如果我們想傳遞物件怎麼搞呢?下節寫。我們先來看一下如上程式碼客戶端與服務端執行截圖:
服務端
客戶端
至此,整個心跳機制就完成了,這樣每隔10秒客戶端就會給服務端傳送一個心跳訊息,下節我們通過了解通協議以完善心跳機制的程式碼。
我建立了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小夥伴可以關注一下:小偉後端筆記