1. 程式人生 > 實用技巧 >Netty心跳機制-長連線

Netty心跳機制-長連線

前文需求回顧

完成對紅酒窖的室內溫度採集及監控功能。由本地應用程式+溫度感測器定時採集室內溫度上報至伺服器,如果溫度 >20 °C 則由伺服器下發重啟空調指令,如果本地應用長時間不上傳溫度給伺服器,則給戶主手機發送一條預警簡訊。


Netty入門篇-從雙向通訊開始「上文」

上篇算是完成簡單的雙向通訊了,我們接著看看 “如果本地應用長時間不上傳溫度給伺服器...”,很明顯客戶端有可能掛了嘛,所以怎麼實現客戶端與服務端的長連線就是本文要實現的了。

什麼是心跳機制

百度百科:心跳機制是定時傳送一個自定義的結構體(心跳包),讓對方知道自己還活著,以確保連線的有效性的機制。

簡單說,這個心跳機制是由客戶端主動發起的訊息,每隔一段時間就向服務端傳送訊息,告訴服務端自己還沒死,可不要給戶主傳送預警簡訊啊。

如何實現心跳機制

1、客戶端程式碼修改

我們需要改造一下上節中客戶端的程式碼,首先是在責任鏈中增加一個心跳邏輯處理類HeartbeatHandler

public class NettyClient {

    private static String host = "127.0.0.1";

    public static void main(String[] args) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                // 1.指定執行緒模型
                .group(workerGroup)
                // 2.指定 IO 型別為 NIO
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true
) .option(ChannelOption.TCP_NODELAY, true) // 3.IO 處理邏輯 .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new IdleStateHandler(0, 10, 0)) .addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(new HeartbeatHandler()) .addLast(new NettyClientHandler()); } }); // 4.建立連線 bootstrap.connect(host, 8070).addListener(future -> { if
(future.isSuccess()) { System.out.println("連線成功!"); } else { System.err.println("連線失敗!"); } }); } }

沒什麼變化,主要是增加了HeartbeatHandler,我們來看看這個類:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.nio.charset.Charset;
import java.time.LocalTime;

public class HeartbeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                System.out.println("10秒了,需要傳送訊息給服務端了" + LocalTime.now());
                //向服務端送心跳包
                ByteBuf buffer = getByteBuf(ctx);
                //傳送心跳訊息,並在傳送失敗時關閉該連線
                ctx.writeAndFlush(buffer).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("捕獲的異常:" + cause.getMessage());
        ctx.channel().close();
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        // 1. 獲取二進位制抽象 ByteBuf
        ByteBuf buffer = ctx.alloc().buffer();
        String time = "heartbeat:客戶端心跳資料:" + LocalTime.now();
        // 2. 準備資料,指定字串的字符集為 utf-8
        byte[] bytes = time.getBytes(Charset.forName("utf-8"));
        // 3. 填充資料到 ByteBuf
        buffer.writeBytes(bytes);
        return buffer;
    }

}

還是繼承自ChannelInboundHandlerAdapter,不過這次重寫的是userEventTriggered()方法,這個方法在客戶端的所有ChannelHandler中,如果10s內沒有發生write事件時觸發,所以我們在該方法中給服務端傳送心跳訊息。

業務邏輯處理類NettyClientHandler沒有改動,程式碼如下:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Random;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println(new Date() + ": 客戶端寫出資料");

        // 1. 獲取資料
        ByteBuf buffer = getByteBuf(ctx);

        // 2. 寫資料
        ctx.channel().writeAndFlush(buffer);
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        // 1. 獲取二進位制抽象 ByteBuf
        ByteBuf buffer = ctx.alloc().buffer();
        Random random = new Random();
        double value = random.nextDouble() * 14 + 8;
        String temp = "獲取室內溫度:" + value;

        // 2. 準備資料,指定字串的字符集為 utf-8
        byte[] bytes = temp.getBytes(Charset.forName("utf-8"));

        // 3. 填充資料到 ByteBuf
        buffer.writeBytes(bytes);

        return buffer;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(new Date() + ": 客戶端讀到資料 -> " + msg.toString());
    }

}

對如上程式碼不瞭解的可以回看上一節:Netty入門篇-從雙向通訊開始

2、服務端程式碼修改

服務端程式碼主要是開啟TCP底層心跳機制支援,.childOption(ChannelOption.SO_KEEPALIVE, true) ,其他的程式碼並沒有改動:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyServer {

    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(bossGroup, workerGroup)

                // 指定Channel
                .channel(NioServerSocketChannel.class)

                //服務端可連線佇列數,對應TCP/IP協議listen函式中backlog引數
                .option(ChannelOption.SO_BACKLOG, 1024)

                //設定TCP長連線,一般如果兩個小時內沒有資料的通訊時,TCP會自動傳送一個活動探測資料報文
                .childOption(ChannelOption.SO_KEEPALIVE, true)

                //將小的資料包包裝成更大的幀進行傳送,提高網路的負載
                .childOption(ChannelOption.TCP_NODELAY, true)

                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                });

        serverBootstrap.bind(8070);
    }

}

我們再來看看服務端的業務處理類 NettyServerHandler

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        String message = byteBuf.toString(Charset.forName("utf-8"));
        System.out.println(new Date() + ": 服務端讀到資料 -> " + message);
        /** 心跳資料是不傳送資料 **/
        if(!message.contains("heartbeat")){
            ByteBuf out = getByteBuf(ctx);
            ctx.channel().writeAndFlush(out);
        }
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        byte[] bytes = "我是傳送給客戶端的資料:請重啟冰箱!".getBytes(Charset.forName("utf-8"));
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeBytes(bytes);
        return buffer;
    }

}

channelRead() 方法增加了一個 if 判斷,判斷如果包含heartbeat字串就認為這是客戶端發過來的心跳,這種判斷是非常low的,因為到目前為止我們一直是用簡單字串來傳遞資料的,上邊傳遞的資料就直接操作字串;那麼問題來了,如果我們想傳遞物件怎麼搞呢?下節寫。我們先來看一下如上程式碼客戶端與服務端執行截圖:

服務端

客戶端

至此,整個心跳機制就完成了,這樣每隔10秒客戶端就會給服務端傳送一個心跳訊息,下節我們通過了解通協議以完善心跳機制的程式碼。

我建立了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小夥伴可以關注一下:小偉後端筆記