1. 程式人生 > 其它 >Netty實現管理端主動向客戶端推送資料

Netty實現管理端主動向客戶端推送資料

技術標籤:nettyjava網路

1.需求分析:

跑一個定時任務每十秒從資料庫中查詢一次資料,存到redis中,然後通過netty將資料推送到每一個連線的客戶端

2.具體實現

2.1定義一個NettyServer用來配置netty中channel的一些配置資訊,及對資料任務的處理
@Component
public class NettyServer {

    private static Logger logger = LoggerFactory.getLogger(NettyServer.class);


    @Autowired
    private ServerHandler serverHandler;
public static ServerSocketChannel serverSocketChannel; public void start(int port) throws Exception { // 連線處理group EventLoopGroup boss = new NioEventLoopGroup(); // 事件處理group EventLoopGroup worker = new NioEventLoopGroup(); //1.建立ServerBootStrap例項 ServerBootstrap bootstrap =
new ServerBootstrap(); // 繫結處理group //2.設定並繫結Reactor執行緒池:EventLoopGroup,EventLoop就是處理所有註冊到本執行緒的Selector上面的Channel bootstrap.group(boss, worker) //3.設定並繫結服務端的channel .channel(NioServerSocketChannel.class) // 保持連線數 .option
(ChannelOption.SO_BACKLOG, 1024) // 有資料立即傳送 .option(ChannelOption.TCP_NODELAY, true) // 保持連線 .childOption(ChannelOption.SO_KEEPALIVE, true) // 處理新連線 //設定了客戶端連線socket屬性。 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // 增加任務處理 ChannelPipeline pipeline = sc.pipeline(); pipeline.addLast(new HttpServerCodec()); //支援寫大資料流 pipeline.addLast(new ChunkedWriteHandler()); //http聚合器 pipeline.addLast(new HttpObjectAggregator(1024*62)); //websocket支援,設定路由 pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); pipeline.addLast( // 自定義的處理器 serverHandler); } }); // 繫結埠,同步等待成功 ChannelFuture future; try { logger.info("netty伺服器在[{}]埠啟動監聽",port); //真正讓netty跑起來的重點 future = bootstrap.bind(port).sync(); if (future.isSuccess()) { serverSocketChannel = (ServerSocketChannel) future.channel(); logger.info("netty服務開啟成功"); } else { logger.info("netty服務開啟失敗"); } // 等待服務監聽埠關閉,就是由於這裡會將執行緒阻塞,導致無法傳送資訊,所以我這裡開了執行緒 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 優雅地退出,釋放執行緒池資源 boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
2.2定義一個ServerHandler,繼承SimpleChannelInboundHandler,用來做與使用者端連線,及連線後對channel的資料處理,

注:@ChannelHandler.Sharable 因為一個ChannelHandler可以從屬於多個ChannelPipeline,所以它也可以繫結到多個ChannelHandlerContext例項。用於這種用法的ChannelHandler必須要使用@Sharable註解標註;否則,試圖將它新增到多個ChannelPipeline時將會觸發異常。為了安全地被用於多個併發的Channel(即連線),這樣的ChannelHandler必須是執行緒安全的。

@Component
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {


    private static Logger logger = LoggerFactory.getLogger(ServerHandler.class);

    /**
     * 在與客戶端的連線已經建立之後將被呼叫
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //獲取每個使用者端連線的ip
        InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress();
        String clientIp = ipSocket.getAddress().getHostAddress();
        logger.info("netty客戶端與服務端連線開始...");
        ChannelMapServer.addChannel(clientIp,ctx.channel());
        ctx.writeAndFlush("連線成功");

    }

//    /**
//     * 當從客戶端接收到一個訊息時被呼叫
//     * msg 就是硬體傳送過來的資料資訊
//     */
//    @Override
//    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//
//    }



    /**
     * 客戶端與服務端斷開連線時呼叫
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("netty客戶端與服務端連線關閉...");
        InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress();
        String clientIp = ipSocket.getAddress().getHostAddress();
        ChannelMapServer.removeChannelByName(clientIp);
    }

//    /**
//     * 服務端接收客戶端傳送過來的資料結束之後呼叫
//     */
//    @Override
//    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//
//    }

    /**
     * 在處理過程中引發異常時被呼叫
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
        System.out.println("異常資訊:rn " + cause.getMessage());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
        System.out.println("伺服器端收到訊息" + textWebSocketFrame.text());
        channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame("receive client msg:"+ textWebSocketFrame.text()));
    }
}
2.3ChannelMapServer,定義一個map結合來儲存每一個連線進來的客戶端,然後向每一個連線進來的客戶端定時推送訊息
public class ChannelMapServer {


    public static ConcurrentHashMap<String, Channel> channelHashMap=null;

    /**
     *  獲取ConcurrentHashMap
     */
    public static ConcurrentHashMap<String, Channel> getChannelHashMap() {
        return channelHashMap;
    }

    /**
     *  獲取指定name的channel
     */
    public static Channel getChannelByName(String name){
        if(channelHashMap==null||channelHashMap.isEmpty()){
            return null;
        }
        return channelHashMap.get(name);
    }

    /**
     *  將通道中的訊息推送到每一個客戶端
     */
    public static boolean pushNewsToAllClient(String obj){
        if(channelHashMap==null||channelHashMap.isEmpty()){
            return false;
        }
        for(String name: channelHashMap.keySet()) {
            Channel channel = channelHashMap.get(name);
            channel.writeAndFlush(new TextWebSocketFrame(obj));
        }
        return true;
    }

    /**
     *  將channel和對應的name新增到ConcurrentHashMap
     */
    public static void addChannel(String name,Channel channel){
        if(channelHashMap==null){
            channelHashMap=new ConcurrentHashMap<>(128);
        }
        channelHashMap.put(name,channel);
      
    }

    /**
     *  移除掉name對應的channel
     */
    public static String removeChannelByName(String name){
        if(channelHashMap.containsKey(name)){
            channelHashMap.remove(name);
            return Constants.CHANNEL_NAME_SUCCESS;
        }
        return Constants.CHANNEL_NAME_FAILURE;
    }
}
2.4在Application中配置專案啟動時即執行
@SpringBootApplication
@EnableTransactionManagement
@EnableScheduling
@MapperScan("com.XXX.YYYYYY.mapper")
@ComponentScan(basePackages = {"com.XXX"})
public class Application implements CommandLineRunner {
    @Autowired
    private NettyServer nettyServer;

    public static void main(String[] args) {
        ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        nettyServer.start(8082);
    }

}

3.測試,使用postjson進行線上測試

在這裡插入圖片描述
可以看到netty啟動成功
這是執行定時任務,就能在postjson中打印出定時任務執行的資料