聊聊心跳機制及netty心跳實現
我們在使用netty的時候會使用一個引數,ChannelOption.SO_KEEPALIVE為true, 設定好了之後再Linux系統才會對keepalive生效,但是linux裡邊需要配置幾個引數,tcp_keepalive_time, tcp_keepalive_invl, tcp_keepalive_probes,如果不配置的時候都會是預設值。
tcp_keepalive_time 即給一個TCP連線傳送心跳包最後的時間間隔某一段時間後繼續傳送心跳包,允許空閒的時間,然後再次傳送心跳包,預設時間為7200秒,即2個小時發一次心跳包。
tcp_keepalive_invl,傳送存活探測時候未收到對方回執的時候,需要間隔一段時間繼續傳送。預設為75秒。
tcp_keepalive_probes,如果發了存活探測的時候沒有收到對方的回執,那麼需要繼續傳送探測的次數,此時預設值為9次,也就是未收到回執的時候需要傳送9次。
再理一次,間隔tcp_keepalive_time之後傳送心跳探測,如果未收到對方回執的時候,需要間隔tcp_keepalive_invl設定的時間繼續傳送,一共需要傳送tcp_keepalive_probes的次數。
這個是Linux系統的配置,如果要使用Linux的此功能需要設定SO_KEEPALIVE為true,同時設定其他幾個引數。系統預設的SO_KEEPALIVE為false。因為這些情況的差異,所以netty提供了自己實現心跳的機制。
netty有心跳的實現方法 IdleStateHandler,其中有讀空閒時間,寫空閒時間,讀寫空閒時間,只要有一個滿足條件會觸發userEventTriggered方法。
public IdleStateHandler( int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds)
定義個訊息內容吧,長度為Type的長度1 + 實際內容的長度5 = 6。Length為2個位元組,Type為1個型別。
+----------+----------+----------------+ | Length |Type(byte)| Actual Content | | 0x06 | 1 | "HELLO" | +----------+----------+----------------+
定義公共的inbound方法,用於進行channelRead, sendPing, sendPong, userEventTriggered 方法。
package com.hqs.heartbeat.common; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleStateEvent; import java.util.concurrent.atomic.AtomicInteger; /** * @author huangqingshi * @Date 2019-05-11 */ public abstract class CustomeHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> { public static final byte PING = 1; public static final byte PONG = 2; public static final byte CUSTOM_MSG = 3; protected String name; private AtomicInteger heartbeatCount = new AtomicInteger(0); public CustomeHeartbeatHandler(String name) { this.name = name; } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { if(byteBuf.getByte(2) == PING) { sendPong(channelHandlerContext); } else if(byteBuf.getByte(2) == PONG) { System.out.println("get pong msg from " + channelHandlerContext .channel().remoteAddress()); } else { handleData(channelHandlerContext, byteBuf); } } protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channel read : " + msg); ByteBuf byteBuf = (ByteBuf) msg; System.out.println(byteBuf.getByte(2)); super.channelRead(ctx, msg); } protected void sendPong(ChannelHandlerContext channelHandlerContext) { ByteBuf buf = channelHandlerContext.alloc().buffer(3); buf.writeShort(3); buf.writeByte(PONG); channelHandlerContext.writeAndFlush(buf); heartbeatCount.incrementAndGet(); System.out.println("send pong message to " + channelHandlerContext.channel().remoteAddress()); } protected void sendPing(ChannelHandlerContext channelHandlerContext) { ByteBuf buf = channelHandlerContext.alloc().buffer(3); buf.writeShort(3); buf.writeByte(PING); channelHandlerContext.writeAndFlush(buf); heartbeatCount.incrementAndGet(); System.out.println("send ping message to " + channelHandlerContext.channel().remoteAddress()); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case ALL_IDLE: handlALLIdle(ctx); break; case READER_IDLE: handlReadIdle(ctx); break; case WRITER_IDLE: handlWriteIdle(ctx); break; default: break; } } } protected void handlReadIdle(ChannelHandlerContext channelHandlerContext) { System.out.println("READ_IDLE---"); } protected void handlWriteIdle(ChannelHandlerContext channelHandlerContext) { System.out.println("WRITE_IDLE---"); } protected void handlALLIdle(ChannelHandlerContext channelHandlerContext) { System.out.println("ALL_IDLE---"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel:" + ctx.channel().remoteAddress() + " is active"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel:" + ctx.channel().remoteAddress() + " is inactive"); } }
定義Server的方法,設定讀超時為10秒,採用固定長度方法進行內容分割:LengthFieldBasedFrameDecoder(1024, 0, 2, -2, 0),長度為1K 。一個主執行緒接收請求,四個執行緒處理請求。埠號設定為9999。
package com.hqs.heartbeat.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.timeout.IdleStateHandler; /** * @author huangqingshi * @Date 2019-05-11 */ public class Server { public static void main(String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup(1); NioEventLoopGroup worker = new NioEventLoopGroup(4); try { ServerBootstrap bootstrapServer = new ServerBootstrap(); bootstrapServer.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline channelPipeline = ch.pipeline(); channelPipeline.addLast(new IdleStateHandler(10, 0, 0)); channelPipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0,2, -2, 0)); channelPipeline.addLast(new ServerHandler()); } }); Channel channel = bootstrapServer.bind(9999).sync().channel(); channel.closeFuture().sync(); } catch (Exception e) { throw new RuntimeException(e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
Server的handler的處理方法:
package com.hqs.heartbeat.server; import com.hqs.heartbeat.common.CustomeHeartbeatHandler; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; /** * @author huangqingshi * @Date 2019-05-11 */ public class ServerHandler extends CustomeHeartbeatHandler { public ServerHandler() { super("server"); } @Override protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) { byte[] data = new byte[byteBuf.readableBytes() - 3]; ByteBuf responseBuf = Unpooled.copiedBuffer(byteBuf); byteBuf.skipBytes(3); byteBuf.readBytes(data); String content = new String(data); System.out.println(name + " get content : " + content); channelHandlerContext.writeAndFlush(responseBuf); } @Override protected void handlReadIdle(ChannelHandlerContext channelHandlerContext) { super.handlReadIdle(channelHandlerContext); System.out.println(" client " + channelHandlerContext.channel().remoteAddress() + " reader timeout close it --"); channelHandlerContext.close(); } }
定義Client類,所有超時時間為5秒,如果5秒沒有讀寫的話則傳送ping,如果失去連線之後inactive了就會重新連線,採用10秒出發一次。
package com.hqs.heartbeat.client; import com.hqs.heartbeat.common.CustomeHeartbeatHandler; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.timeout.IdleStateHandler; import java.util.Random; import java.util.concurrent.TimeUnit; /** * @author huangqingshi * @Date 2019-05-11 */ public class Client { private NioEventLoopGroup workGroup = new NioEventLoopGroup(4); private Channel channel; private Bootstrap bootstrap; public static void main(String[] args) throws InterruptedException { Client client = new Client(); client.start(); client.sendData(); } public void start() { try { bootstrap = new Bootstrap(); bootstrap.group(workGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline channelPipeline = ch.pipeline() .addLast(new IdleStateHandler(0,0,5)) .addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, -2, 0)) .addLast(new ClientHandler(Client.this)); } }); doConnect(); } catch (Exception e) { throw new RuntimeException(e); } } public void sendData() throws InterruptedException { Random random = new Random(System.currentTimeMillis()); for(int i = 0; i < 10000; i++) { if(channel != null && channel.isActive()) { String content = "client msg " + i; ByteBuf byteBuf = channel.alloc().buffer(3 + content.getBytes().length); byteBuf.writeShort(3 + content.getBytes().length); byteBuf.writeByte(CustomeHeartbeatHandler.CUSTOM_MSG); byteBuf.writeBytes(content.getBytes()); channel.writeAndFlush(byteBuf); } Thread.sleep(random.nextInt(20000)); } } public void doConnect() { if(channel != null && channel.isActive()) { return; } ChannelFuture future = bootstrap .connect("127.0.0.1", 9999); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if(future.isSuccess()) { channel = future.channel(); System.out.println("connect to server successfully"); } else { System.out.println("Failed to connect to server, try after 10s"); future.channel().eventLoop().schedule(new Runnable() { @Override public void run() { doConnect(); } }, 10, TimeUnit.SECONDS); } } }); } }
定義clientHandler方法,讀取時跳過長度+型別 2+1 三個位元組,然後獲取訊息。連線斷開之後則進行重連。
package com.hqs.heartbeat.client; import com.hqs.heartbeat.common.CustomeHeartbeatHandler; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; /** * @author huangqingshi * @Date 2019-05-11 */ public class ClientHandler extends CustomeHeartbeatHandler { private Client client; public ClientHandler(Client client) { super("client"); this.client = client; } @Override protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) { byte[] data = new byte[byteBuf.readableBytes() - 3]; byteBuf.skipBytes(3); byteBuf.readBytes(data); String content = new String(data); System.out.println(name + " get content:" + content); } @Override protected void handlALLIdle(ChannelHandlerContext channelHandlerContext) { super.handlALLIdle(channelHandlerContext); sendPing(channelHandlerContext); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); client.doConnect(); } }
好了,總體的netty心跳實現機制就這麼多,希望能幫助到大家。
github地址:https://github.com/stonehqs/heartbeat