1. 程式人生 > 實用技巧 >netty實戰(二)

netty實戰(二)

  和上一篇相比,此專案的場景有所不同:需要採集機房的電錶、溫溼度、水浸和煙感的資料,首先通過通訊管理機先將資料直採,然後通過伺服器採集程式採集通訊管理機上儲存的資料並解析入庫。

  先說說這2個有什麼不同,專案1中戶外裝置主動連線伺服器,並定時傳送報文,而此專案需要伺服器去主動連線裝置傳送報文采集資料,僅僅需要開發netty的client端。

  專案框架:springboot+netty+mybatis+lombok+logback

  開發環境:idea2018+jdk1.8+mysql5.6.35+maven3.5.3

  專案搭建:

  1.快速搭建springboot專案,配置pom.xml檔案依賴包

  

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2
.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.rtst</groupId> <artifactId>dhjclistener</artifactId> <version>0.0.1-SNAPSHOT</version> <name>dhjclistener</name> <packaging>jar</packaging> <description>Demo project for
Spring Boot</description> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.2</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!-- netty依賴 springboot2.0自動匯入版本--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency> <!--郵件依賴包--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!--編寫更少量的程式碼:使用apache commons工具類庫: https://www.cnblogs.com/ITtangtang/p/3966955.html--> <!--apache.commons.lang3--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <!--apache.codec:編碼方法的工具類包 https://blog.csdn.net/u012881904/article/details/52767853--> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>

2.自定義netty的client端BootNettyClient類

package com.rtst.dhjclistener.nettyclient;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;


import java.util.List;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class BootNettyClient {
    @Autowired
    BootNioChannelInitializer bootNioChannelInitializer;
    @Value("${netty.port}")
    private Integer  port;
    @Value("#{'${netty.host}'.split(',')}")
    private List<String> hosts;

    private List<Channel> channels=null;

    private Bootstrap bootstrap;
        //定義執行緒組,處理讀寫和連線事件,沒有了accept事件
         private EventLoopGroup workGroup = new NioEventLoopGroup();
        public void start() throws Exception {
                bootstrap = new Bootstrap();
                bootstrap.group(workGroup);
                //繫結客戶端通道
                bootstrap.channel(NioSocketChannel.class);
                //給NioSocketChannel初始化handler,處理讀寫事件
                bootstrap.handler(bootNioChannelInitializer);
                System.out.println("開始啟動----");
                for(int i =0;i<hosts.size();i++){
                    if(StringUtils.isEmpty(BootNettyClientHandler.ctxMap.get(hosts.get(i)))){
                        doConnect(hosts.get(i),port);
                    }else{
                        continue;
                    }
                }
        }
        //發起連線
        protected void doConnect(String host,int port) {
                ChannelFuture future = bootstrap.connect(host, port);
                future.addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture futureListener) throws Exception {
                        if (futureListener.isSuccess()) {
                            channels.add(futureListener.channel());
                            log.info(host+"Connect to server successfully!");
                        } else {
                            log.info(host+"Failed to connect to server, try connect after 10s");
                            futureListener.channel().eventLoop().schedule(new Runnable() {
                                @Override
                                public void run() {
                                    log.info(host+"重新連線----");
                                    doConnect(host,port);//遞迴doConnect方法,進行斷線重連
                                }
                            }, 10, TimeUnit.SECONDS);
                        }
                    }
                });
            }
}

3.自定義初始化類BootNioChannelInitializer類

package com.rtst.dhjclistener.nettyclient;

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
public class BootNioChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {
    @Autowired
    BootNettyClientHandler bootNettyClientHandler;
    @Autowired
    MyDecoder myDecoder;
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast("idleStateHandler",new IdleStateHandler(15,0,15, TimeUnit.SECONDS));//注意new IdleStateHandler的作用
        ch.pipeline().addLast("decoder", myDecoder);
        //找到他的管道 增加他的handler
        ch.pipeline().addLast(bootNettyClientHandler);
        System.out.println("初始化通道");
    }
}

4.自定義業務處理類BootNettyClientHandler類

package com.rtst.dhjclistener.nettyclient;

import com.rtst.dhjclistener.entity.Signal;
import com.rtst.dhjclistener.repository.DsignalMapper;
import com.rtst.dhjclistener.repository.SignalMapper;

import com.rtst.dhjclistener.service.SendEmailContentService;
import com.rtst.dhjclistener.util.StringUitls;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;

@Component
@Slf4j
@ChannelHandler.Sharable
public class BootNettyClientHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    DsignalMapper dsignalMapper;
    @Autowired
    SignalMapper signalMapper;
    @Autowired
    BootNettyClient bootNettyClient;
    @Autowired
    SendEmailContentService sendEmailContentService;//傳送告警郵件service類
    @Value("${netty.schoolId}")
    private int schoolId;
    @Value("#{'${netty.host}'.split(',')}")
    private List<String> host;
    @Value("${netty.port}")
    private int  port;
    //  將當前客戶端連線 存入map   實現控制裝置下發 引數
    public  static Map<String, Channel> ctxMap = new LinkedHashMap<String, Channel>();
    public int triggeredNum=0;//計數器
    public int EmailNum=0;//郵件方法計數器

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();//獲取現連線的IP地址
        if(ctxMap.get(clientIp)!=null){//如果不為空就不存
        }else{//否則就將當前的裝置ip+埠存進map  當做下發裝置的標識的key
            ctxMap.put(clientIp, ctx.channel());
        }
        log.info(clientIp+"------連線成功-------");
    }
    @Override
    public  void channelRead(ChannelHandlerContext ctx, Object msg){
        SocketChannel channel = (SocketChannel) ctx.channel();
        ByteBuf buff = Unpooled.buffer();//netty需要用ByteBuf傳輸
        //將字串轉成每兩個字元加空格形式的字串
        String regex = "(.{2})";
        String input = msg.toString().replaceAll(regex, "$1 ");
        log.info(channel.remoteAddress().getHostString() + ":  " + input);
        System.out.println("服務端接受資訊為: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到訊息:" + input);
        byte[] bytes = StringUitls.toByteArray(msg.toString());
        Map<String,Object> params = new LinkedHashMap<>();
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();//獲取現連線的IP地址
     //業務處理邏輯,解析報文併入庫,如果有告警,傳送郵件提示使用者機房存在告警
      ......
} /** * 連線斷開時進入該方法 * @param ctx */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客戶端斷開連線---channelInactive"); log.info("客戶端斷開連線---channelInactive"); super.channelInactive(ctx); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//獲取現連線的IP地址 if(ctxMap.get(clientIp)!=null){//如果不為空就刪除 ctxMap.remove(clientIp, ctx.channel()); } ctx.close(); bootNettyClient.doConnect(clientIp,port);//斷線重連 } /** * 出現異常時進入該方法 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//獲取現連線的IP地址 if(ctxMap.get(clientIp)!=null){//如果不為空就刪除 ctxMap.remove(clientIp, ctx.channel()); } ctx.close(); } //處理超時讀寫空閒事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("觸發讀寫空閒操作-----"); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//獲取現連線的IP地址 if (evt instanceof IdleStateEvent){ IdleStateEvent idleStateEvent = (IdleStateEvent) evt ; log.info(clientIp+"觸發"+idleStateEvent.state()+"事件"); //獲取IdleStateEvent事件,根據狀態是否為讀狀態空閒 if (idleStateEvent.state() == IdleState.READER_IDLE){ log.info("已經 好長時間沒有收到資訊!"); System.out.println("嘗試再次傳送命令"); //向下位機發送訊息 ByteBuf buf =Unpooled.buffer(); String order = "00 00 00 00 00 06 c8 03 00 00 00 64 05 c2";//可以設定成讀取更大地址的資料,比如讀0-500的地址位:00 00 00 00 00 06 c8 03 00 00 01 F4 05 c2 byte[] msg = StringUitls.hexStrToBinaryStr(order); buf.writeBytes(msg); ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); triggeredNum++; if(triggeredNum>=3){ ctx.close(); triggeredNum=0; } } } super.userEventTriggered(ctx, evt); } }

5.application類實現CommandLineRunner,來啟動nettyClient服務

package com.rtst.dhjclistener;

import com.rtst.dhjclistener.nettyclient.BootNettyClient;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@MapperScan("com.rtst.dhjclistener.repository")
@EnableScheduling
public class DhjclistenerApplication implements CommandLineRunner {
    @Autowired
    BootNettyClient bootNettyClient;
    public static void main(String[] args) {
        SpringApplication.run(DhjclistenerApplication.class, args);
    }
    @Override
    public void run(String... args) throws Exception {
        /**
         * 啟動netty服務端服務
         */
        bootNettyClient.start();
    }
}

6.定時任務傳送報文到下位機,請求下位機採集儲存的資料

package com.rtst.dhjclistener.ordertask;


import com.rtst.dhjclistener.nettyclient.BootNettyClientHandler;
import com.rtst.dhjclistener.util.StringUitls;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.List;

@Component
@Configuration
public class MyTask {
@Value("#{'${netty.host}'.split(',')}")
private List<String> host;
    @Scheduled(cron = "*/20 * * * * ?")//每20秒執行一次傳送命令,此處根據自己實際需求設定時間
    public void order() {
        ByteBuf buf = Unpooled.buffer();
        String order = "00 00 00 00 00 06 c8 03 00 00 00 64 05 c2";
        byte[] msg = StringUitls.hexStrToBinaryStr(order);
        buf.writeBytes(msg);
        buf.retain(1);//在同時採集2個通訊管理機時會報異常,此行程式碼可以解決,如果只是採集一個通訊管理機時是不會存在該異常的
        for(int i=0;i<host.size();i++){
            if(StringUtils.isEmpty(BootNettyClientHandler.ctxMap.get(host.get(i)))){
                    System.out.println(host.get(i)+"channel物件為空");
                    continue;
                }else{
                    BootNettyClientHandler.ctxMap.get(host.get(i)).writeAndFlush(buf);
                }
        }
        System.out.println("傳送命令成功");
        }
}

  注意點:1.斷線重連

      2.多個通訊管理機定時任務傳送報文時,存在異常。解決方法:buf.retain()的作用