1. 程式人生 > >netty自定義訊息實現心跳檢測與重連

netty自定義訊息實現心跳檢測與重連

其實客戶端心跳傳送用到的是IdleStateHandler,詳細看程式碼你就會明白為什麼。

 //處理空閒狀態事件的處理器
pipeline.addLast(new IdleStateHandler(6,7,8, TimeUnit.SECONDS));

在IdleStateHandler中前三個引數分別是:

1.當在6秒內收到訊息,觸發IdleStateEvent事件。

2.當在7秒內沒有傳送訊息,觸發IdleStateEvent事件。

3.當在8內沒有接收到資料且在8秒內沒有傳送資料,觸發IdleStateEvent事件。

下面是實現的程式碼,程式碼量有點多....:

常量資料:

/**
 * Created by XiChuan on 2018-11-07.
 */
public class Constant {

    public static final int HEAD = 0x76;

    public static final String TYPE_PING = "PING";

    public static final String TYPE_MESSAGE = "MESSAGE";
}

自定義協議:

/**
 * Created by XiChuan on 2018-11-07.
 */

import java.util.Arrays;

/**
 * <pre>
 * 自己定義的協議
 *  資料包格式
 * +——----——+——-----——+——----——+
 * |協議開始標誌|  訊息型別長度  |  訊息型別 |    資料長度   |   資料    |
 * +——----——+——-----——+——----——+
 * 1.協議開始標誌head_data,為int型別的資料,16進製表示為0X76
 * 2.要傳的協議型別長度(String的byte[]長度)
 * 3.要傳的協議型別(String)
 * 4.傳輸資料的長度contentLength,int型別
 * 5.要傳輸的資料
 * </pre>
 */
public class MessageProtocol {
    /**
     * 訊息的開頭的資訊標誌
     */
    private int headData = Constant.HEAD;

    /**
     * 訊息型別長度
     */
    private int typeLength;

    /**
     * 訊息型別
     */
    private String type;

    /**
     * 訊息的長度
     */
    private int contentLength;
    /**
     * 訊息的內容
     */
    private byte[] content;

    /**
     * 用於初始化,SmartCarProtocol
     *
     * @param contentLength
     *            協議裡面,訊息資料的長度
     * @param content
     *            協議裡面,訊息的資料
     */
    public MessageProtocol(int typeLength,String type,int contentLength, byte[] content) {
        this.typeLength = typeLength;
        this.type = type;
        this.contentLength = contentLength;
        this.content = content;
    }

    public int getHeadData() {
        return headData;
    }

    public void setHeadData(int headData) {
        this.headData = headData;
    }

    public int getTypeLength() {
        return typeLength;
    }

    public void setTypeLength(int typeLength) {
        this.typeLength = typeLength;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public int getContentLength() {
        return contentLength;
    }

    public void setContentLength(int contentLength) {
        this.contentLength = contentLength;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "MessageProtocol " +
                "[head_data=" + headData
                +", typeLength="+typeLength
                +", type="+type
                + ", contentLength=" + contentLength
                + ", content=" + Arrays.toString(content) + "]";
    }
}

定義解碼器:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * Created by XiChuan on 2018-11-06.
 */


public class MessageProtocolDecoder extends ByteToMessageDecoder {

    /**
     * <pre>
     * 協議開始的標準head_data,int型別,佔據4個位元組.
     * 表示資料的長度contentLength,int型別,佔據4個位元組.
     * </pre>
     */
    public final int BASE_LENGTH = 4 + 4;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,
                          List<Object> out) throws Exception {
        // 可讀長度必須大於基本長度
        //System.out.println("buff的可讀長度是:"+buffer.readableBytes());
        if (buffer.readableBytes() >= BASE_LENGTH) {
            // 防止socket位元組流攻擊
            // 防止,客戶端傳來的資料過大
            // 因為,太大的資料,是不合理的
            if (buffer.readableBytes() > 2048) {
                buffer.skipBytes(buffer.readableBytes());
            }

            // 記錄包頭開始的index
            int beginReader;

            while (true) {
                // 獲取包頭開始的index
                beginReader = buffer.readerIndex();
                //System.out.println("記錄包頭開始的index:"+beginReader);
                // 標記包頭開始的index
                buffer.markReaderIndex();
                // 讀到了協議的開始標誌,結束while迴圈
                int head = buffer.readInt();
                //System.out.println("讀取的int:"+head);
                if (head == Constant.HEAD) {
                    break;
                }

                // 未讀到包頭,略過一個位元組
                // 每次略過,一個位元組,去讀取,包頭資訊的開始標記
                buffer.resetReaderIndex();
                buffer.readByte();

                // 當略過,一個位元組之後,
                // 資料包的長度,又變得不滿足
                // 此時,應該結束。等待後面的資料到達
                if (buffer.readableBytes() < BASE_LENGTH) {
                    return;
                }
            }

            //型別長度
            int typeLength = buffer.readInt();
            //訊息型別
            byte[] typeBytes = new byte[typeLength];
            buffer.readBytes(typeBytes);
            String type = new String(typeBytes);

            // 內容長度
            int dataLength = buffer.readInt();
            // 讀取data資料
            byte[] data = new byte[dataLength];
            buffer.readBytes(data);

            MessageProtocol protocol = new MessageProtocol(typeLength,type,dataLength, data);
            out.add(protocol);
        }
    }
}

定義編碼器:

import ch.qos.logback.core.encoder.ByteArrayUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * Created by XiChuan on 2018-11-07.
 */

public class MessageProtocolEncoder extends MessageToByteEncoder<MessageProtocol> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MessageProtocol msg, ByteBuf byteBuf) throws Exception {

        // 寫入訊息SmartCar的具體內容
        // 1.寫入訊息的開頭的資訊標誌(int型別)
        byteBuf.writeInt(msg.getHeadData());
        //2寫入訊息型別長度資訊(int)
        byteBuf.writeInt(msg.getTypeLength());
        //3寫入訊息型別(byte[])
        byteBuf.writeBytes(msg.getType().getBytes());
        // 4.寫入訊息的長度(int 型別)
        byteBuf.writeInt(msg.getContentLength());
        // 5.寫入訊息的內容(byte[]型別)
        byteBuf.writeBytes(msg.getContent());
    }
}

定義服務端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * Created by XiChuan on 2018-11-05.
 */
public class Server {

    private int port;

    public Server(int port){this.port = port;}

    public void run()throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup(); //用來接收進來的連線
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //用來處理已經被接收的連線

        try {
            ServerBootstrap bootstrap = new ServerBootstrap(); //啟動NIO服務的輔助啟動類
            bootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class) //服務端
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {

                            //心跳機制 引數:1.讀空閒超時時間 2.寫空閒超時時間 3.所有型別的空閒超時時間(讀、寫) 4.時間單位
                            //在Handler需要實現userEventTriggered方法,在出現超時事件時會被觸發
                            socketChannel.pipeline().addLast("idleStateHandler", new IdleStateHandler(10, 0, 0,TimeUnit.SECONDS));
                            //設定解碼器
                            socketChannel.pipeline().addLast("decoder", new MessageProtocolDecoder());//new ByteArrayDecoder());//new FixedLengthFrameDecoder(4));
                            //設定自定義ChannelHandler
                            socketChannel.pipeline().addLast("channelHandler", new ServerHandler());
                            //設定編碼器
                            socketChannel.pipeline().addLast("encoder",new MessageProtocolEncoder());//new ByteArrayEncoder());

                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture cf = bootstrap.bind(port).sync(); //繫結埠,開始接收進來的連線
            cf.channel().closeFuture().sync(); //等待伺服器socket關閉

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args)throws Exception{
        new Server(8081).run();
    }
}

定義服務段處理handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by XiChuan on 2018-11-05.
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    private AtomicInteger channelCount = new AtomicInteger(0); //通道數量

    /**
     * 讀資料
     * @param ctx
     * @param msg
     * @throws Exception
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("read channel=" + ctx.channel() + ", total channel=" + channelCount);
        try {

            MessageProtocol message = (MessageProtocol)msg;
            System.out.println("receive client message:"+message.toString()+",ip:"+ctx.channel().remoteAddress());

        } finally {
            // 拋棄收到的資料
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * 心跳檢測的超時時會觸發
     * @param ctx
     * @param evt
     * @throws Exception
     */
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {  //讀取心跳超時後,會將此channel連線斷開
                System.out.println("trigger channel =" + ctx.channel());
                ctx.close();  //如果超時,關閉這個通道
            }
        } else if (evt instanceof SslHandshakeCompletionEvent) {
            System.out.println("ssl handshake done");
            //super.userEventTriggered(ctx,evt);
        }
    }

    /**
     * 當通道活動時
     * @param ctx
     * @throws Exception
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        channelCount.incrementAndGet();//當有新通道連線進來時,將通道數+1
        System.out.println("active channel=" + ctx.channel() + ", total channel=" + channelCount + ", id=" + ctx.channel().id().asShortText());

    }

    /**
     * 當通道不活動時
     * @param ctx
     * @throws Exception
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       //當通道關閉時,將通道數-1
        ctx.close();
        channelCount.decrementAndGet();
        System.out.println("inactive channel,channel=" + ctx.channel() +", id=" + ctx.channel().id().asShortText());
    }

    /**
     * 異常獲取
     * @param ctx
     * @param cause
     * @throws Exception
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exception channel=" + ctx.channel() + " cause=" + cause); //如果不活躍關閉此通道
        ctx.close();
    }

}

後面主要就是客戶端程式碼了,主要就在這裡:

定義客戶端的ChannelHandler集合:

import io.netty.channel.ChannelHandler;

/**
 *
 * 客戶端的ChannelHandler集合,由子類實現,這樣做的好處:
 * 繼承這個介面的所有子類可以很方便地獲取ChannelPipeline中的Handlers
 * 獲取到handlers之後方便ChannelPipeline中的handler的初始化和在重連的時候也能很方便
 * 地獲取所有的handlers
 */
public interface ChannelHandlerHolder {

    ChannelHandler[] handlers();
}

定義客戶端重連檢測程式碼:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

/**
 *
 * 重連檢測狗,當發現當前的鏈路不穩定關閉之後,進行12次重連
 */
@Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask ,ChannelHandlerHolder{



    private final Bootstrap bootstrap;
    private final Timer timer;
    private final int port;

    private final String host;

    private volatile boolean reconnect = true;
    private int attempts;


    public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port,String host, boolean reconnect) {
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.port = port;
        this.host = host;
        this.reconnect = reconnect;
    }

    /**
     * channel鏈路每次active的時候,將其連線的次數重新☞ 0
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("當前鏈路已經激活了,重連嘗試次數重新置為:0");

        attempts = 0;
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("連結關閉");
        if(reconnect){
            System.out.println("連結關閉,將進行重連,第"+(attempts+1)+"嘗試");
            if (attempts < 12) {
                attempts++;
                //重連的間隔時間會越來越長
                int timeout = 2 << attempts;
                timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
            }
        }
        ctx.fireChannelInactive();
    }

    public void run(Timeout timeout) throws Exception {

        ChannelFuture future;
        //bootstrap已經初始化好了,只需要將handler填入就可以了
        synchronized (bootstrap) {
            bootstrap.handler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {
                    //handlers()方法在client住方法上已經初始化了
                    ch.pipeline().addLast(handlers());
                }
            });
            future = bootstrap.connect(host,port);
        }

        //future物件
        future.addListener(new ChannelFutureListener() {

            public void operationComplete(ChannelFuture f) throws Exception {
                boolean succeed = f.isSuccess();

                //如果重連失敗,則呼叫ChannelInactive方法,再次出發重連事件,一直嘗試12次,如果失敗則不再重連
                if (!succeed) {
                    System.out.println("重連失敗");
                    f.channel().pipeline().fireChannelInactive();
                }else{
                    System.out.println("重連成功");
                }
            }
        });

    }

}

定義客戶端:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.HashedWheelTimer;

import java.util.concurrent.TimeUnit;

/**
 * Created by XiChuan on 2018-11-05.
 */
public class Client {

    protected final HashedWheelTimer timer = new HashedWheelTimer();

    private Bootstrap b;

    private String host;

    private int port;

    public Client(String host,int port){
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();

        b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO));

        final ConnectionWatchdog watchdog = new ConnectionWatchdog(b, timer, port,host, true) {

            public ChannelHandler[] handlers() {
                return new ChannelHandler[] {
                        this,
                        new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
                        //idleStateTrigger,
                        new MessageProtocolDecoder(),
                        new MessageProtocolEncoder(),
                        new ClientHandler()
                };
            }
        };

        ChannelFuture future;
        //進行連線
        try {
            synchronized (b) {
                b.handler(new ChannelInitializer<Channel>() {

                    //初始化channel
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(watchdog.handlers());
                    }
                });

                future = b.connect(host,port);
            }

            // 以下程式碼在synchronized同步塊外面是安全的
            future.sync();
            //future.channel().closeFuture().sync();
        } catch (Throwable t) {
            throw new Exception("connects to  fails", t);
        }finally {
            //group.shutdownGracefully();
            //System.out.println("client release resource...");
        }
    }

    public static void main(String[] args) throws Exception {
        new Client("127.0.0.1",8081).run();

    }

}

定義客戶端handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;

import java.util.Date;

/**
 * Created by XiChuan on 2018-11-05.
 */
public class ClientHandler  extends ChannelInboundHandlerAdapter {

    // 客戶端與服務端,連線成功的的處理
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("active channel:" + ctx.channel()+",time:"+new Date().toLocaleString());
        ctx.fireChannelActive();

        /*// 傳送自定義Message協議的訊息
        // 要傳送的資訊
        String type = Constant.TYPE_PING;
        int typeLength = type.getBytes().length;

        String str = "I am client ...";
        // 獲得要傳送資訊的位元組陣列
        byte[] content = str.getBytes();
        // 要傳送資訊的長度
        int contentLength = content.length;
        MessageProtocol protocol = new MessageProtocol(typeLength,type,contentLength, content);
        System.out.println("send message:"+protocol.toString());

        Channel channel = ctx.channel();
        channel.writeAndFlush(protocol);*/
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("inactive channel:" + ctx.channel()+",time:"+new Date().toLocaleString());

    }

    // 只是讀資料,沒有寫資料的話
    // 需要自己手動的釋放的訊息
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("read channel:" + ctx.channel());
        try {
            MessageProtocol messageProtocol = (MessageProtocol) msg;
            System.out.println("receive server message:" + messageProtocol.toString());
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // write heartbeat to server
                String message = "heart beat ...";
                MessageProtocol messageProtocol = new MessageProtocol(
                        Constant.TYPE_PING.getBytes().length,
                        Constant.TYPE_PING,
                        message.getBytes().length,
                        message.getBytes());
                ctx.writeAndFlush(messageProtocol);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

}

上述是全部的程式碼,先執行server然後執行client,效果如下:

服務端:

客戶端:

當關閉服務端,客戶端日誌: