1. 程式人生 > 其它 >Springboot+Netty+Websocket實現訊息推送例項

Springboot+Netty+Websocket實現訊息推送例項

技術標籤:springboot netty websocketwebsocketnettysocketspring bootjava

Springboot+Netty+Websocket實現訊息推送


文章目錄


前言

WebSocket 使得客戶端和伺服器之間的資料交換變得更加簡單,允許服務端主動向客戶端推送資料。在 WebSocket API 中,瀏覽器和伺服器只需要完成一次握手,兩者之間就直接可以建立永續性的連線,並進行雙向資料傳輸。
Netty框架的優勢

 1. API使用簡單,開發門檻低;
 2. 功能強大,預置了多種編解碼功能,支援多種主流協議;
 3. 定製能力強,可以通過ChannelHandler對通訊框架進行靈活地擴充套件;
 4. 效能高,通過與其他業界主流的NIO框架對比,Netty的綜合性能最優;
 5. 成熟、穩定,Netty修復了已經發現的所有JDK NIO BUG,業務開發人員不需要再為NIO的BUG而煩惱

提示:以下是本篇文章正文內容,下面案例可供參考

一、引入netty依賴

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.48.Final</version>
</dependency>

二、使用步驟

1.引入基礎配置類

package com.test.netty;

public enum Cmd {
    START("000", "連線成功"),
    WMESSAGE("001", "訊息提醒"),
    ;
    private String cmd;
    private String desc;

    Cmd(String cmd, String desc) {
        this.cmd = cmd;
        this.desc = desc;
    }

    public String getCmd
() { return cmd; } public String getDesc() { return desc; } }

2.netty服務啟動監聽器

package com.test.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * @author test
 * <p>
 * 服務啟動監聽器
 **/
@Slf4j
@Component
public class NettyServer {

    @Value("${server.netty.port}")
    private int port;

    @Autowired
    private ServerChannelInitializer serverChannelInitializer;

    @Bean
    ApplicationRunner nettyRunner() {
        return args -> {
            //new 一個主執行緒組
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            //new 一個工作執行緒組
            EventLoopGroup workGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(serverChannelInitializer)
                    //設定佇列大小
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // 兩小時內沒有資料的通訊時,TCP會自動傳送一個活動探測資料報文
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            //繫結埠,開始接收進來的連線
            try {
                ChannelFuture future = bootstrap.bind(port).sync();
                log.info("伺服器啟動開始監聽埠: {}", port);
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //關閉主執行緒組
                bossGroup.shutdownGracefully();
                //關閉工作執行緒組
                workGroup.shutdownGracefully();
            }
        };
    }
}

3.netty服務端處理器

package com.test.netty;

import com.test.common.util.JsonUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.net.URLDecoder;
import java.util.*;

/**
 * @author test
 * <p>
 * netty服務端處理器
 **/
@Slf4j
@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Autowired
    private ServerChannelCache cache;
    private static final String dataKey = "test=";

    @Data
    public static class ChannelCache {
    }


    /**
     * 客戶端連線會觸發
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        log.info("通道連線已開啟,ID->{}......", channel.id().asLongText());
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            Channel channel = ctx.channel();
            WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
            String requestUri = handshakeComplete.requestUri();
            requestUri = URLDecoder.decode(requestUri, "UTF-8");
            log.info("HANDSHAKE_COMPLETE,ID->{},URI->{}", channel.id().asLongText(), requestUri);
            String socketKey = requestUri.substring(requestUri.lastIndexOf(dataKey) + dataKey.length());
            if (socketKey.length() > 0) {
                cache.add(socketKey, channel);
                this.send(channel, Cmd.DOWN_START, null);
            } else {
                channel.disconnect();
                ctx.close();
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        log.info("通道連線已斷開,ID->{},使用者ID->{}......", channel.id().asLongText(), cache.getCacheId(channel));
        cache.remove(channel);
    }

    /**
     * 發生異常觸發
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        log.error("連接出現異常,ID->{},使用者ID->{},異常->{}......", channel.id().asLongText(), cache.getCacheId(channel), cause.getMessage(), cause);
        cache.remove(channel);
        ctx.close();
    }

    /**
     * 客戶端發訊息會觸發
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        try {
            // log.info("接收到客戶端傳送的訊息:{}", msg.text());
            ctx.channel().writeAndFlush(new TextWebSocketFrame(JsonUtil.toString(Collections.singletonMap("cmd", "100"))));
        } catch (Exception e) {
            log.error("訊息處理異常:{}", e.getMessage(), e);
        }
    }

    public void send(Cmd cmd, String id, Object obj) {
        HashMap<String, Channel> channels = cache.get(id);
        if (channels == null) {
            return;
        }
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("cmd", cmd.getCmd());
        data.put("data", obj);
        String msg = JsonUtil.toString(data);
        log.info("伺服器下發訊息: {}", msg);
        channels.values().forEach(channel -> {
            channel.writeAndFlush(new TextWebSocketFrame(msg));
        });
    }

    public void send(Channel channel, Cmd cmd, Object obj) {
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("cmd", cmd.getCmd());
        data.put("data", obj);
        String msg = JsonUtil.toString(data);
        log.info("伺服器下發訊息: {}", msg);
        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }

}

4.netty服務端快取類

package com.test.netty;

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class ServerChannelCache {
    private static final ConcurrentHashMap<String, HashMap<String, Channel>> CACHE_MAP = new ConcurrentHashMap<>();
    private static final AttributeKey<String> CHANNEL_ATTR_KEY = AttributeKey.valueOf("test");

    public String getCacheId(Channel channel) {
        return channel.attr(CHANNEL_ATTR_KEY).get();
    }

    public void add(String cacheId, Channel channel) {
        channel.attr(CHANNEL_ATTR_KEY).set(cacheId);
        HashMap<String, Channel> hashMap = CACHE_MAP.get(cacheId);
        if (hashMap == null) {
            hashMap = new HashMap<>();
        }
        hashMap.put(channel.id().asShortText(), channel);
        CACHE_MAP.put(cacheId, hashMap);
    }

    public HashMap<String, Channel> get(String cacheId) {
        if (cacheId == null) {
            return null;
        }
        return CACHE_MAP.get(cacheId);
    }

    public void remove(Channel channel) {
        String cacheId = getCacheId(channel);
        if (cacheId == null) {
            return;
        }
        HashMap<String, Channel> hashMap = CACHE_MAP.get(cacheId);
        if (hashMap == null) {
            hashMap = new HashMap<>();
        }
        hashMap.remove(channel.id().asShortText());
        CACHE_MAP.put(cacheId, hashMap);
    }
}

5.netty服務初始化器

package com.test.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author Gjing
 * <p>
 * netty服務初始化器
 **/
@Component
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    private NettyServerHandler nettyServerHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler("/test.io", true, 5000));
        pipeline.addLast(nettyServerHandler);
    }
}

6.html測試

<!DOCTYPE HTML>
<html>
   <head>
   <meta charset="utf-8">
   <title>test</title>
    
      <script type="text/javascript">
         function WebSocketTest()
         {
            if ("WebSocket" in window)
            {
               alert("您的瀏覽器支援 WebSocket!");
               
               // 開啟一個 web socket
               var ws = new WebSocket("ws://localhost:port/test.io");
                
               ws.onopen = function()
               {
                  // Web Socket 已連線上,使用 send() 方法傳送資料
                  ws.send("傳送資料");
                  alert("資料傳送中...");
               };
                
               ws.onmessage = function (evt) 
               { 
                  var received_msg = evt.data;
                  alert("資料已接收...");
               };
                
               ws.onclose = function()
               { 
                  // 關閉 websocket
                  alert("連線已關閉..."); 
               };
            }
            
            else
            {
               // 瀏覽器不支援 WebSocket
               alert("您的瀏覽器不支援 WebSocket!");
            }
         }
      </script>
        
   </head>
   <body>
   
      <div id="sse">
         <a href="javascript:WebSocketTest()">執行 WebSocket</a>
      </div>
      
   </body>
</html>

7.vue測試

 mounted() {
            this.initWebsocket();
        },
        methods: {
            initWebsocket() {
                let websocket = new WebSocket('ws://localhost:port/test.io?test=123456');
                websocket.onmessage = (event) => {
                    let msg = JSON.parse(event.data);
                    switch (msg.cmd) {
                        case "000":
                            this.$message({
                                type: 'success',
                                message: "建立實時連線成功!",
                                duration: 1000
                            })
                            setInterval(()=>{websocket.send("heartbeat")},60*1000);
                            break;
                        case "001":
                            this.$message.warning("收到一條新的資訊,請及時檢視!")
                            break;
                    }
                }
                websocket.onclose = () => {
                    setTimeout(()=>{
                        this.initWebsocket();
                    },30*1000);
                }
                websocket.onerror = () => {
                    setTimeout(()=>{
                        this.initWebsocket();
                    },30*1000);
                }
            },
        },
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20210107160420568.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3d1X3Fpbmdfc29uZw==,size_16,color_FFFFFF,t_70#pic_center)


8.伺服器下發訊息

@Autowired
	private NettyServerHandler nettyServerHandler;
nettyServerHandler.send(CmdWeb.WMESSAGE, id, message);

總結

按照上面步驟,一步一步的來,是可以實現訊息推送的功能的。不要著急,當你除錯成功之後,發現並沒有自己想想中的那麼難,加油!親測可以使用,如果感覺本文件對您有幫助,可以請喝個下午茶!