淺析 Netty心跳機制
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
例項化一個 IdleStateHandler 需要提供三個引數:
-
readerIdleTimeSeconds, 讀超時. 即當在指定的時間間隔內沒有從 Channel 讀取到資料時, 會觸發一個 READER_IDLE 的 IdleStateEvent 事件.
-
writerIdleTimeSeconds, 寫超時. 即當在指定的時間間隔內沒有資料寫入到 Channel 時, 會觸發一個 WRITER_IDLE 的 IdleStateEvent 事件.
-
allIdleTimeSeconds, 讀/寫超時. 即當在指定的時間間隔內沒有讀或寫操作時, 會觸發一個 ALL_IDLE 的 IdleStateEvent 事件.
為了展示具體的 IdleStateHandler 實現的心跳機制, 下面我們來構造一個具體的EchoServer 的例子, 這個例子的行為如下:
-
在這個例子中, 客戶端和伺服器通過 TCP 長連線進行通訊.
-
TCP 通訊的報文格式是:
+--------+-----+---------------+
| Length |Type | Content |
| 17 | 1 |"HELLO, WORLD" |
+--------+-----+---------------+
-
客戶端每隔一個隨機的時間後, 向伺服器傳送訊息, 伺服器收到訊息後, 立即將收到的訊息原封不動地回覆給客戶端.
-
若客戶端在指定的時間間隔內沒有讀/寫操作, 則客戶端會自動向伺服器傳送一個 PING 心跳, 伺服器收到 PING 心跳訊息時, 需要回復一個 PONG 訊息.
通用部分
根據上面定義的行為, 我們接下來實現心跳的通用部分 CustomHeartbeatHandler
public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {
public static final byte PING_MSG = 1;
public static final byte PONG_MSG = 2;
public static final byte CUSTOM_MSG = 3;
protected String name;
private int heartbeatCount = 0;
public CustomHeartbeatHandler(String name) {
this.name = name;
}
@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
if (byteBuf.getByte(4) == PING_MSG) {
sendPongMsg(context);
} else if (byteBuf.getByte(4) == PONG_MSG){
System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
} else {
handleData(context, byteBuf);
}
}
protected void sendPingMsg(ChannelHandlerContext context) {
ByteBuf buf = context.alloc().buffer(5);
buf.writeInt(5);
buf.writeByte(PING_MSG);
context.writeAndFlush(buf);
heartbeatCount++;
System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
}
private void sendPongMsg(ChannelHandlerContext context) {
ByteBuf buf = context.alloc().buffer(5);
buf.writeInt(5);
buf.writeByte(PONG_MSG);
context.channel().writeAndFlush(buf);
heartbeatCount++;
System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
}
protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// IdleStateHandler 所產生的 IdleStateEvent 的處理邏輯.
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case READER_IDLE:
handleReaderIdle(ctx);
break;
case WRITER_IDLE:
handleWriterIdle(ctx);
break;
case ALL_IDLE:
handleAllIdle(ctx);
break;
default:
break;
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
}
protected void handleReaderIdle(ChannelHandlerContext ctx) {
System.err.println("---READER_IDLE---");
}
protected void handleWriterIdle(ChannelHandlerContext ctx) {
System.err.println("---WRITER_IDLE---");
}
protected void handleAllIdle(ChannelHandlerContext ctx) {
System.err.println("---ALL_IDLE---");
}
}
類 CustomHeartbeatHandler 負責心跳的傳送和接收, 我們接下來詳細地分析一下它的作用. 我們在前面提到, IdleStateHandler 是實現心跳的關鍵, 它會根據不同的 IO idle 型別來產生不同的 IdleStateEvent 事件, 而這個事件的捕獲, 其實就是在 userEventTriggered 方法中實現的.
我們來看看 CustomHeartbeatHandler.userEventTriggered 的具體實現:
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case READER_IDLE:
handleReaderIdle(ctx);
break;
case WRITER_IDLE:
handleWriterIdle(ctx);
break;
case ALL_IDLE:
handleAllIdle(ctx);
break;
default:
break;
}
}
}
在 userEventTriggered 中, 根據 IdleStateEvent 的 state() 的不同, 而進行不同的處理. 例如如果是讀取資料 idle, 則 e.state() == READER_IDLE, 因此
就呼叫 handleReaderIdle 來處理它. CustomHeartbeatHandler 提供了三個 idle 處理方法: handleReaderIdle, handleWriterIdle, handleAllIdle,
這三個方法目前只有預設的實現, 它需要在子類中進行重寫, 現在我們暫時略過它們, 在具體的客戶端和伺服器的實現部分時再來看它們.
@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
if (byteBuf.getByte(4) == PING_MSG) {
sendPongMsg(context);
} else if (byteBuf.getByte(4) == PONG_MSG){
System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
} else {
handleData(context, byteBuf);
}
}
在 CustomHeartbeatHandler.channelRead0 中, 我們首先根據報文協議:+--------+-----+---------------+
| Length |Type | Content |
| 17 | 1 |"HELLO, WORLD" |
+--------+-----+---------------+
來判斷當前的報文型別, 如果是 PING_MSG 則表示是伺服器收到客戶端的 PING 訊息, 此時伺服器需要回復一個 PONG 訊息, 其訊息型別是 PONG_MSG.
扔報文型別是 PONG_MSG, 則表示是客戶端收到伺服器傳送的 PONG 訊息, 此時列印一個 log 即可.
客戶端部分
客戶端初始化:public class Client {
public static void main(String[] args) {
NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
Random random = new Random(System.currentTimeMillis());
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(0, 0, 5));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ClientHandler());
}
});
Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel();
for (int i = 0; i < 10; i++) {
String content = "client msg " + i;
ByteBuf buf = ch.alloc().buffer();
buf.writeInt(5 + content.getBytes().length);
buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
buf.writeBytes(content.getBytes());
ch.writeAndFlush(buf);
Thread.sleep(random.nextInt(20000));
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
workGroup.shutdownGracefully();
}
}
}
上面的程式碼是 Netty 的客戶端端的初始化程式碼, 使用過 Netty 的朋友對這個程式碼應該不會陌生. 別的部分我們就不再贅述, 我們來看看
ChannelInitializer.initChannel 部分即可:
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(0, 0, 5));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ClientHandler());
}
});
我們給 pipeline 添加了三個 Handler, IdleStateHandler 這個 handler 是心跳機制的核心, 我們為客戶端端設定了讀寫 idle 超時, 時間間隔是5s, 即如果客
戶端在間隔 5s 後都沒有收到伺服器的訊息或向伺服器傳送訊息, 則產生 ALL_IDLE 事件.
接下來我們添加了 LengthFieldBasedFrameDecoder,
它是負責解析我們的 TCP 報文, 因為和本文的目的無關, 因此這裡不詳細展開.
最後一個 Handler 是 ClientHandler, 它繼承於 CustomHeartbeatHandler, 是我們處理業務邏輯部分.
客戶端
public class ClientHandler extends CustomHeartbeatHandler {
public ClientHandler() {
super("client");
}
@Override
protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
byte[] data = new byte[byteBuf.readableBytes() - 5];
byteBuf.skipBytes(5);
byteBuf.readBytes(data);
String content = new String(data);
System.out.println(name + " get content: " + content);
}
@Override
protected void handleAllIdle(ChannelHandlerContext ctx) {
super.handleAllIdle(ctx);
sendPingMsg(ctx);
}
}
ClientHandler 繼承於 CustomHeartbeatHandler, 它重寫了兩個方法, 一個是 handleData, 在這裡面實現 僅僅列印收到的訊息.第二個重寫的方法是 handleAllIdle. 我們在前面提到, 客戶端負責傳送心跳的 PING 訊息, 當客戶端產生一個 ALL_IDLE 事件後, 會導致父類的 CustomHeartbeatHandler.userEventTriggered 呼叫, 而 userEventTriggered 中會根據 e.state() 來呼叫不同的方法, 因此最後呼叫的是 ClientHandler.handleAllIdle, 在這個方法中, 客戶端呼叫 sendPingMsg 向伺服器傳送一個 PING 訊息.