Netty 之 Netty生產級的心跳和重連機制
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow
也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!
sigh,寫這篇部落格的時候老臉還是紅了一下,心裡還是有些唏噓的,應該算是剽竊吧,每個人的程式碼功力的確是有差距的,好在文章的標題是“一起學”,而不是開濤大神的“跟我學”系列的文章,我們還是多花點時間學習吧,感嘆無用~
最近工作比較忙,但閒暇之餘還是看了阿里的馮家春(fengjiachun)的github上的開原始碼Jupiter,寫的RPC框架讓我感嘆人外有人,廢話不多說,下面的程式碼全部擷取自Jupiter,寫了一個比較完整的例子,供大家一起學習分享,再次對@Luca抱拳,Jupiter的Github地址:
https://github.com/fengjiachun/Jupiter
今天研究的是,心跳和重連,雖然這次是大神寫的程式碼,但是萬變不離其宗,我們先回顧一下Netty應用心跳和重連的整個過程:
1)客戶端連線服務端
2)在客戶端的的ChannelPipeline中加入一個比較特殊的IdleStateHandler,設定一下客戶端的寫空閒時間,例如5s
3)當客戶端的所有ChannelHandler中4s內沒有write事件,則會觸發userEventTriggered方法(上文介紹過)
4)我們在客戶端的userEventTriggered中對應的觸發事件下發送一個心跳包給服務端,檢測服務端是否還存活,防止服務端已經宕機,客戶端還不知道
5)同樣,服務端要對心跳包做出響應,其實給客戶端最好的回覆就是“不回覆”,這樣可以服務端的壓力,假如有10w個空閒Idle的連線,那麼服務端光傳送心跳回復,則也是費事的事情,那麼怎麼才能告訴客戶端它還活著呢,其實很簡單,因為5s服務端都會收到來自客戶端的心跳資訊,那麼如果10秒內收不到,服務端可以認為客戶端掛了,可以close鏈路
6)加入服務端因為什麼因素導致宕機的話,就會關閉所有的鏈路連結,所以作為客戶端要做的事情就是短線重連
以上描述的就是整個心跳和重連的整個過程,雖然很簡單,上一篇blog也寫了一個Demo,簡單地做了一下上述功能
要寫工業級的Netty心跳重連的程式碼,需要解決一下幾個問題:
1)ChannelPipeline中的ChannelHandlers的維護,首次連線和重連都需要對ChannelHandlers進行管理
2)重連物件的管理,也就是bootstrap物件的管理
3)重連機制編寫
完整的程式碼:https://github.com/BazingaLyn/netty-study/tree/master/src/main/java/com/lyncc/netty/idle
下面我們就看大神是如何解決這些問題的,首先先定義一個介面ChannelHandlerHolder,用來保管ChannelPipeline中的Handlers的
[java] view plain copy- package com.lyncc.netty.idle;
- import io.netty.channel.ChannelHandler;
- /**
- *
- * 客戶端的ChannelHandler集合,由子類實現,這樣做的好處:
- * 繼承這個介面的所有子類可以很方便地獲取ChannelPipeline中的Handlers
- * 獲取到handlers之後方便ChannelPipeline中的handler的初始化和在重連的時候也能很方便
- * 地獲取所有的handlers
- */
- public interface ChannelHandlerHolder {
- ChannelHandler[] handlers();
- }
HeartBeatServer.java
[java] view plain copy- package com.lyncc.netty.idle;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- import io.netty.handler.timeout.IdleStateHandler;
- import java.net.InetSocketAddress;
- import java.util.concurrent.TimeUnit;
- public class HeartBeatServer {
- private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();
- private int port;
- public HeartBeatServer(int port) {
- this.port = port;
- }
- public void start() {
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
- .localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
- ch.pipeline().addLast(idleStateTrigger);
- ch.pipeline().addLast("decoder", new StringDecoder());
- ch.pipeline().addLast("encoder", new StringEncoder());
- ch.pipeline().addLast(new HeartBeatServerHandler());
- };
- }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
- // 繫結埠,開始接收進來的連線
- ChannelFuture future = sbs.bind(port).sync();
- System.out.println("Server start listen at " + port);
- future.channel().closeFuture().sync();
- } catch (Exception e) {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
- public static void main(String[] args) throws Exception {
- int port;
- if (args.length > 0) {
- port = Integer.parseInt(args[0]);
- } else {
- port = 8080;
- }
- new HeartBeatServer(port).start();
- }
- }
- package com.lyncc.netty.idle;
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.handler.timeout.IdleState;
- import io.netty.handler.timeout.IdleStateEvent;
- @ChannelHandler.Sharable
- public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleState state = ((IdleStateEvent) evt).state();
- if (state == IdleState.READER_IDLE) {
- throw new Exception("idle exception");