1. 程式人生 > 實用技巧 >netty心跳檢測機制

netty心跳檢測機制

既然是網路通訊那麼心跳檢測肯定是離不開的,netty心跳檢測分為讀、寫、全域性

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // 註冊一個心跳檢測機制, 3秒未發生讀事件進行觸發,4秒為發生寫事件進行觸發,7秒未發生讀寫事件進行觸發
        ch.pipeline().addLast(new IdleStateHandler(3, 4, 7, TimeUnit.SECONDS));
        
// 發生心跳檢測隨即交個下一個handler.userEventTriggered(ctx, event)來處理 ch.pipeline().addLast(new HBServerChannelHandler()); } }); /********************************IdleStateHandler********************************/ public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
public void channelActive(ChannelHandlerContext ctx) throws Exception { // channel處於active狀態隨即進行初始化 initialize(ctx); super.channelActive(ctx); } private void initialize(ChannelHandlerContext ctx) { switch (state) { case 1: case 2: return; } state = 1; initOutputChanged(ctx);
// 根據設定的相關時間 建立不同的定時任務並提交到EventLoop lastReadTime = lastWriteTime = ticksInNanos(); if (readerIdleTimeNanos > 0) { readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } } /********************************ReaderIdleTimeoutTask********************************/ private final class ReaderIdleTimeoutTask extends AbstractIdleTask { ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { super(ctx); } @Override protected void run(ChannelHandlerContext ctx) { // 計算出下次心跳檢測剩餘的時間,因為定時任務可能被提前執行 // 在之前分析EventLoop.run()的runAllTasks(),會把定時任務佇列中未超過執行期限的定時任務新增到當前普通任務中來 long nextDelay = readerIdleTimeNanos; if (!reading) { // ticksInNanos() - lastReadTime = 當前離上次讀事件發生的間隔 nextDelay -= ticksInNanos() - lastReadTime; } if (nextDelay <= 0) { // 小於0 表示已經超過設定的心跳檢測時長 // 心跳檢測是不斷重複檢測,重新發起一個心跳檢測任務 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstReaderIdleEvent; firstReaderIdleEvent = false; try { // 將當前心跳檢測事件向後傳遞,隨即交給下一個handler執行,handler.fireUserEventTriggered(context, event) IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // 還沒有超出心跳檢測的時間範圍,則將剩餘的時間當成下一次定時任務執行的時間 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } } }

可以看出netty的心跳檢測就是通過IdleStateHandler完成的,解釋下IdleStateHandler構造方法的幾個引數

  • readerIdleTime: 讀事件的心跳檢測時長

  • writerIdleTime:寫事件的心跳檢測時長

  • allIdleTime: 既包含讀也包含寫的心跳檢測時長

發生心跳檢測後,需要對對應的事件處理, 需要我們自己定義一個handler然後實現userEventTriggered(context, event)方法用來接收事件