1. 程式人生 > >淺析 Netty心跳機制

淺析 Netty心跳機制

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

例項化一個 IdleStateHandler 需要提供三個引數:

  • readerIdleTimeSeconds, 讀超時. 即當在指定的時間間隔內沒有從 Channel 讀取到資料時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件.

  • writerIdleTimeSeconds, 寫超時. 即當在指定的時間間隔內沒有資料寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件.

  • allIdleTimeSeconds, 讀/寫超時. 即當在指定的時間間隔內沒有讀或寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.

為了展示具體的 IdleStateHandler 實現的心跳機制, 下面我們來構造一個具體的EchoServer 的例子, 這個例子的行為如下:

  1. 在這個例子中, 客戶端和伺服器通過 TCP 長連線進行通訊.

  2. TCP 通訊的報文格式是:

+--------+-----+---------------+ 
| Length |Type |   Content     |
|   17   |  1  |"HELLO, WORLD" |
+--------+-----+---------------+
  1. 客戶端每隔一個隨機的時間後, 向伺服器傳送訊息, 伺服器收到訊息後, 立即將收到的訊息原封不動地回覆給客戶端.

  2. 若客戶端在指定的時間間隔內沒有讀/寫操作, 則客戶端會自動向伺服器傳送一個 PING 心跳, 伺服器收到 PING 心跳訊息時, 需要回復一個 PONG 訊息.

通用部分

根據上面定義的行為, 我們接下來實現心跳的通用部分 CustomHeartbeatHandler

:

public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {
    public static final byte PING_MSG = 1;
    public static final byte PONG_MSG = 2;
    public static final byte CUSTOM_MSG = 3;
    protected String name;
    private int heartbeatCount = 0;

    public CustomHeartbeatHandler(String name) {
        this.name = name;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
        if (byteBuf.getByte(4) == PING_MSG) {
            sendPongMsg(context);
        } else if (byteBuf.getByte(4) == PONG_MSG){
            System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
        } else {
            handleData(context, byteBuf);
        }
    }

    protected void sendPingMsg(ChannelHandlerContext context) {
        ByteBuf buf = context.alloc().buffer(5);
        buf.writeInt(5);
        buf.writeByte(PING_MSG);
        context.writeAndFlush(buf);
        heartbeatCount++;
        System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
    }

    private void sendPongMsg(ChannelHandlerContext context) {
        ByteBuf buf = context.alloc().buffer(5);
        buf.writeInt(5);
        buf.writeByte(PONG_MSG);
        context.channel().writeAndFlush(buf);
        heartbeatCount++;
        System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
    }

    protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // IdleStateHandler 所產生的 IdleStateEvent 的處理邏輯.
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case READER_IDLE:
                    handleReaderIdle(ctx);
                    break;
                case WRITER_IDLE:
                    handleWriterIdle(ctx);
                    break;
                case ALL_IDLE:
                    handleAllIdle(ctx);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
    }

    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        System.err.println("---READER_IDLE---");
    }

    protected void handleWriterIdle(ChannelHandlerContext ctx) {
        System.err.println("---WRITER_IDLE---");
    }

    protected void handleAllIdle(ChannelHandlerContext ctx) {
        System.err.println("---ALL_IDLE---");
    }
}

類 CustomHeartbeatHandler 負責心跳的傳送和接收, 我們接下來詳細地分析一下它的作用. 我們在前面提到, IdleStateHandler 是實現心跳的關鍵, 它會根據不同的 IO idle 型別來產生不同的 IdleStateEvent 事件, 而這個事件的捕獲, 其實就是在 userEventTriggered 方法中實現的.
我們來看看 CustomHeartbeatHandler.userEventTriggered 的具體實現:
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        switch (e.state()) {
            case READER_IDLE:
                handleReaderIdle(ctx);
                break;
            case WRITER_IDLE:
                handleWriterIdle(ctx);
                break;
            case ALL_IDLE:
                handleAllIdle(ctx);
                break;
            default:
                break;
        }
    }
}

在 userEventTriggered 中, 根據 IdleStateEvent 的 state() 的不同, 而進行不同的處理. 例如如果是讀取資料 idle, 則 e.state() == READER_IDLE, 因此

就呼叫 handleReaderIdle 來處理它. CustomHeartbeatHandler 提供了三個 idle 處理方法: handleReaderIdle, handleWriterIdle, handleAllIdle,

這三個方法目前只有預設的實現, 它需要在子類中進行重寫, 現在我們暫時略過它們, 在具體的客戶端和伺服器的實現部分時再來看它們.

知道了這一點後, 我們接下來看看資料處理部分:
@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
    if (byteBuf.getByte(4) == PING_MSG) {
        sendPongMsg(context);
    } else if (byteBuf.getByte(4) == PONG_MSG){
        System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
    } else {
        handleData(context, byteBuf);
    }
}
在 CustomHeartbeatHandler.channelRead0 中, 我們首先根據報文協議:
+--------+-----+---------------+ 
| Length |Type |   Content     |
|   17   |  1  |"HELLO, WORLD" |
+--------+-----+---------------+

來判斷當前的報文型別, 如果是 PING_MSG 則表示是伺服器收到客戶端的 PING 訊息, 此時伺服器需要回復一個 PONG 訊息, 其訊息型別是 PONG_MSG.
扔報文型別是 PONG_MSG, 則表示是客戶端收到伺服器傳送的 PONG 訊息, 此時列印一個 log 即可.

客戶端部分

客戶端初始化:
public class Client {
    public static void main(String[] args) {
        NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        Random random = new Random(System.currentTimeMillis());
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new IdleStateHandler(0, 0, 5));
                            p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
                            p.addLast(new ClientHandler());
                        }
                    });

            Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel();
            for (int i = 0; i < 10; i++) {
                String content = "client msg " + i;
                ByteBuf buf = ch.alloc().buffer();
                buf.writeInt(5 + content.getBytes().length);
                buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
                buf.writeBytes(content.getBytes());
                ch.writeAndFlush(buf);

                Thread.sleep(random.nextInt(20000));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}
上面的程式碼是 Netty 的客戶端端的初始化程式碼, 使用過 Netty 的朋友對這個程式碼應該不會陌生. 別的部分我們就不再贅述, 我們來看看

ChannelInitializer.initChannel 部分即可:

.handler(new ChannelInitializer<SocketChannel>() {
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();
        p.addLast(new IdleStateHandler(0, 0, 5));
        p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
        p.addLast(new ClientHandler());
    }
});

我們給 pipeline 添加了三個 Handler, IdleStateHandler 這個 handler 是心跳機制的核心, 我們為客戶端端設定了讀寫 idle 超時, 時間間隔是5s, 即如果客

戶端在間隔 5s 後都沒有收到伺服器的訊息或向伺服器傳送訊息, 則產生 ALL_IDLE 事件.

接下來我們添加了 LengthFieldBasedFrameDecoder, 它是負責解析我們的 TCP 報文, 因為和本文的目的無關, 因此這裡不詳細展開.
最後一個 Handler 是 ClientHandler, 它繼承於 CustomHeartbeatHandler, 是我們處理業務邏輯部分.

客戶端

public class ClientHandler extends CustomHeartbeatHandler {
    public ClientHandler() {
        super("client");
    }

    @Override
    protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        byte[] data = new byte[byteBuf.readableBytes() - 5];
        byteBuf.skipBytes(5);
        byteBuf.readBytes(data);
        String content = new String(data);
        System.out.println(name + " get content: " + content);
    }

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        super.handleAllIdle(ctx);
        sendPingMsg(ctx);
    }
}
ClientHandler 繼承於 CustomHeartbeatHandler, 它重寫了兩個方法, 一個是 handleData, 在這裡面實現 僅僅列印收到的訊息.
第二個重寫的方法是 handleAllIdle. 我們在前面提到, 客戶端負責傳送心跳的 PING 訊息, 當客戶端產生一個 ALL_IDLE 事件後, 會導致父類的
CustomHeartbeatHandler.userEventTriggered 呼叫, 而 userEventTriggered 中會根據 e.state() 來呼叫不同的方法, 因此最後呼叫的是  ClientHandler.handleAllIdle, 在這個方法中, 客戶端呼叫 sendPingMsg 向伺服器傳送一個 PING 訊息.

伺服器部分

初始化伺服器