1. 程式人生 > 實用技巧 >SpringBoot2.0整合WebSocket,實現後臺向前端推送資訊

SpringBoot2.0整合WebSocket,實現後臺向前端推送資訊

連結:https://blog.csdn.net/moshowgame/article/details/80275084
SpringBoot+WebSocket整合

什麼是WebSocket?


WebSocket協議是基於TCP的一種新的網路協議。它實現了瀏覽器與伺服器全雙工(full-duplex)通訊——允許伺服器主動傳送資訊給客戶端。

為什麼需要 WebSocket?

初次接觸 WebSocket 的人,都會問同樣的問題:我們已經有了 HTTP 協議,為什麼還需要另一個協議?它能帶來什麼好處?

  • 答案很簡單,因為 HTTP 協議有一個缺陷:通訊只能由客戶端發起,HTTP 協議做不到伺服器主動向客戶端推送資訊。

    舉例來說,我們想要查詢當前的排隊情況,只能是頁面輪詢向伺服器發出請求,伺服器返回查詢結果。輪詢的效率低,非常浪費資源(因為必須不停連線,或者 HTTP 連線始終開啟)。因此WebSocket 就是這樣發明的。
  • 前言

    2020-10-20 教程補充:

    • 補充關於@Component@ServerEndpoint關於是否單例模式等的解答,感謝大家熱心提問和研究。
    • Vue版本的websocket連線方法

    2020-01-05 教程補充:

    感謝大家的支援和留言,14W訪問量是滿滿的動力!接下來還會有websocket+redis叢集優化篇針對多ws伺服器做簡單優化處理,敬請期待!

    話不多說,馬上進入乾貨時刻。

    maven依賴

    SpringBoot2.0對WebSocket的支援簡直太棒了,直接就有包可以引入

    <dependency>  
               <groupId>org.springframework.boot</groupId>  
               <artifactId>spring-boot-starter-websocket</artifactId>  
           </dependency>

    WebSocketConfig

    啟用WebSocket的支援也是很簡單,幾句程式碼搞定

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.socket.server.standard.ServerEndpointExporter;
    
    /**
     * 開啟WebSocket支援
     * @author zhengkai.blog.csdn.net
     */
    @Configuration  
    public class WebSocketConfig {  
        
        @Bean  
        public ServerEndpointExporter serverEndpointExporter() {  
            return new ServerEndpointExporter();  
        }  
      
    }

    WebSocketServer

    這就是重點了,核心都在這裡。

    1. 因為WebSocket是類似客戶端服務端的形式(採用ws協議),那麼這裡的WebSocketServer其實就相當於一個ws協議的Controller

    2. 直接@ServerEndpoint("/imserver/{userId}")@Component啟用即可,然後在裡面實現@OnOpen開啟連線,@onClose關閉連線,@onMessage接收訊息等方法。

    3. 新建一個ConcurrentHashMap webSocketMap 用於接收當前userId的WebSocket,方便IM之間對userId進行推送訊息。單機版實現到這裡就可以。

    4. 叢集版(多個ws節點)還需要藉助mysql或者redis等進行處理,改造對應的sendMessage方法即可。

    package com.softdev.system.demo.config;
    
    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import javax.websocket.OnClose;
    import javax.websocket.OnError;
    import javax.websocket.OnMessage;
    import javax.websocket.OnOpen;
    import javax.websocket.Session;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import org.springframework.stereotype.Component;
    import cn.hutool.log.Log;
    import cn.hutool.log.LogFactory;
    
    
    /**
     * @author zhengkai.blog.csdn.net
     */
    @ServerEndpoint("/imserver/{userId}")
    @Component
    public class WebSocketServer {
    
        static Log log=LogFactory.get(WebSocketServer.class);
        /**靜態變數,用來記錄當前線上連線數。應該把它設計成執行緒安全的。*/
        private static int onlineCount = 0;
        /**concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件。*/
        private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
        /**與某個客戶端的連線會話,需要通過它來給客戶端傳送資料*/
        private Session session;
        /**接收userId*/
        private String userId="";
    
        /**
         * 連線建立成功呼叫的方法*/
        @OnOpen
        public void onOpen(Session session,@PathParam("userId") String userId) {
            this.session = session;
            this.userId=userId;
            if(webSocketMap.containsKey(userId)){
                webSocketMap.remove(userId);
                webSocketMap.put(userId,this);
                //加入set中
            }else{
                webSocketMap.put(userId,this);
                //加入set中
                addOnlineCount();
                //線上數加1
            }
    
            log.info("使用者連線:"+userId+",當前線上人數為:" + getOnlineCount());
    
            try {
                sendMessage("連線成功");
            } catch (IOException e) {
                log.error("使用者:"+userId+",網路異常!!!!!!");
            }
        }
    
        /**
         * 連線關閉呼叫的方法
         */
        @OnClose
        public void onClose() {
            if(webSocketMap.containsKey(userId)){
                webSocketMap.remove(userId);
                //從set中刪除
                subOnlineCount();
            }
            log.info("使用者退出:"+userId+",當前線上人數為:" + getOnlineCount());
        }
    
        /**
         * 收到客戶端訊息後呼叫的方法
         *
         * @param message 客戶端傳送過來的訊息*/
        @OnMessage
        public void onMessage(String message, Session session) {
            log.info("使用者訊息:"+userId+",報文:"+message);
            //可以群發訊息
            //訊息儲存到資料庫、redis
            if(StringUtils.isNotBlank(message)){
                try {
                    //解析傳送的報文
                    JSONObject jsonObject = JSON.parseObject(message);
                    //追加發送人(防止串改)
                    jsonObject.put("fromUserId",this.userId);
                    String toUserId=jsonObject.getString("toUserId");
                    //傳送給對應toUserId使用者的websocket
                    if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
                        webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
                    }else{
                        log.error("請求的userId:"+toUserId+"不在該伺服器上");
                        //否則不在這個伺服器上,傳送到mysql或者redis
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    
        /**
         *
         * @param session
         * @param error
         */
        @OnError
        public void onError(Session session, Throwable error) {
            log.error("使用者錯誤:"+this.userId+",原因:"+error.getMessage());
            error.printStackTrace();
        }
        /**
         * 實現伺服器主動推送
         */
        public void sendMessage(String message) throws IOException {
            this.session.getBasicRemote().sendText(message);
        }
    
    
        /**
         * 傳送自定義訊息
         * */
        public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {
            log.info("傳送訊息到:"+userId+",報文:"+message);
            if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
                webSocketMap.get(userId).sendMessage(message);
            }else{
                log.error("使用者"+userId+",不線上!");
            }
        }
    
        public static synchronized int getOnlineCount() {
            return onlineCount;
        }
    
        public static synchronized void addOnlineCount() {
            WebSocketServer.onlineCount++;
        }
    
        public static synchronized void subOnlineCount() {
            WebSocketServer.onlineCount--;
        }
    }

    訊息推送

    至於推送新資訊,可以再自己的Controller寫個方法呼叫WebSocketServer.sendInfo();即可

    import com.softdev.system.demo.config.WebSocketServer;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.servlet.ModelAndView;
    import java.io.IOException;
    
    /**
     * WebSocketController
     * @author zhengkai.blog.csdn.net
     */
    @RestController
    public class DemoController {
    
        @GetMapping("index")
        public ResponseEntity<String> index(){
            return ResponseEntity.ok("請求成功");
        }
    
        @GetMapping("page")
        public ModelAndView page(){
            return new ModelAndView("websocket");
        }
    
        @RequestMapping("/push/{toUserId}")
        public ResponseEntity<String> pushToWeb(String message, @PathVariable String toUserId) throws IOException {
            WebSocketServer.sendInfo(message,toUserId);
            return ResponseEntity.ok("MSG SEND SUCCESS");
        }
    }

    頁面發起

    頁面用js程式碼呼叫websocket,當然,太古老的瀏覽器是不行的,一般新的瀏覽器或者谷歌瀏覽器是沒問題的。還有一點,記得協議是ws的,如果使用了一些路徑類,可以replace(“http”,“ws”)來替換協議。

    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="utf-8">
        <title>websocket通訊</title>
    </head>
    <script src="js/jquery-3.3.1.min.js" type="text/javascript"></script>
    <script>
        var socket;
        var contentText = "";
        function openSocket() {
            if(typeof(WebSocket) == "undefined") {
                console.log("您的瀏覽器不支援WebSocket");
            }else{
                console.log("您的瀏覽器支援WebSocket");
                //實現化WebSocket物件,指定要連線的伺服器地址與埠  建立連線
                //等同於socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
                //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
                var socketUrl="http://localhost:8082/3dserver/imserver/"+$("#userId").val();
                socketUrl=socketUrl.replace("https","ws").replace("http","ws");
                console.log(socketUrl);
                if(socket!=null){
                    socket.close();
                    socket=null;
                }
                socket = new WebSocket(socketUrl);
                //開啟事件
                socket.onopen = function() {
                    console.log("websocket已開啟");
                    alert("websocket已連線");
                    //socket.send("這是來自客戶端的訊息" + location.href + new Date());
                };
                //獲得訊息事件
                socket.onmessage = function(msg) {
                    console.log(msg.data);
                    contentText = contentText + $.parseJSON(msg.data).contentText + "\n";
                    //發現訊息進入    開始處理前端觸發邏輯
                    $("#textareaId").val(contentText);
                };
                //關閉事件
                socket.onclose = function() {
                    console.log("websocket已關閉");
                };
                //發生了錯誤事件
                socket.onerror = function() {
                    console.log("websocket發生了錯誤");
                }
            }
        }
        function sendMessage() {
            if(typeof(WebSocket) == "undefined") {
                console.log("您的瀏覽器不支援WebSocket");
            }else {
                console.log("您的瀏覽器支援WebSocket");
                console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
                socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
            }
        }
    </script>
    <body>
        <p>【userId】:<div><input id="userId" name="userId" type="text" value="10"></div>
        <p>【toUserId】:<div><input id="toUserId" name="toUserId" type="text" value="20"></div>
        <p>【傳送內容】:<div><input id="contentText" name="contentText" type="text" value="hello websocket"></div>
        <p><textarea id="textareaId" rows="10" cols="30" ></textarea></p>
        <p>【操作】:<input type="submit" onclick="openSocket()" value="開啟socket"></p>
        <p>【操作】:<input type="submit" onclick="sendMessage()" value="傳送訊息"></p>
    </body>
    
    </html>

    執行效果

    • v20200105,加入開源專案spring-cloud-study-websocket,更新執行效果,更方便理解。
    • v1.1的效果,剛剛修復了日誌,並且支援指定監聽某個埠,程式碼已經全部更新,現在是這樣的效果
    1. 開啟兩個頁面,按F12調出控控制檯檢視測試效果:


    分別開啟socket,再傳送訊息

    向前端推送資料:


    通過呼叫push api,可以向指定的userId推送資訊,當然報文這裡亂寫,建議規定好格式。

    後續

    針對簡單IM的業務場景,進行了一些優化,可以看後續的文章SpringBoot2+WebSocket之聊天應用實戰(優化版本)(v20201005已整合)

    主要變動是CopyOnWriteArraySet改為ConcurrentHashMap,保證多執行緒安全同時方便利用map.get(userId)進行推送到指定埠。

    相比之前的Set,Set遍歷是費事且麻煩的事情,而Map的get是簡單便捷的,當WebSocket數量大的時候,這個小小的消耗就會聚少成多,影響體驗,所以需要優化。在IM的場景下,指定userId進行推送訊息更加方便。

    Websocker注入Bean問題

    關於這個問題,可以看最新發表的這篇文章,在參考和研究了網上一些攻略後,專案已經通過該方法注入成功,大家可以參考。
    關於controller呼叫controller/service呼叫service/util呼叫service/websocket中autowired的解決方法

    netty-websocket-spring-boot-starter

    Springboot2構建基於Netty的高效能Websocket伺服器(netty-websocket-spring-boot-starter)
    只需要換個starter即可實現高效能websocket,趕緊使用吧

    Springboot2+Netty+Websocket

    Springboot2+Netty實現Websocket,使用官方的netty-all的包,比原生的websocket更加穩定更加高效能,同等配置情況下可以handle更多的連線。

    程式碼樣式全部已經更正,也支援websocket連線url帶引數功能,另外也感謝大家的閱讀和評論,一起進步,謝謝!~~

    ServerEndpointExporter錯誤

    org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‘serverEndpointExporter’ defined in class path resource [com/xxx/WebSocketConfig.class]: Invocation of init method failed; nested exception is java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available

    感謝@來了老弟兒 的反饋:

    如果tomcat部署一直報這個錯,請移除 WebSocketConfig@Bean ServerEndpointExporter 的注入 。

    ServerEndpointExporter 是由Spring官方提供的標準實現,用於掃描ServerEndpointConfig配置類和@ServerEndpoint註解例項。使用規則也很簡單:

    1. 如果使用預設的嵌入式容器 比如Tomcat 則必須手工在上下文提供ServerEndpointExporter
    2. 如果使用外部容器部署war包,則不需要提供提供ServerEndpointExporter,因為此時SpringBoot預設將掃描服務端的行為交給外部容器處理,所以線上部署的時候要把WebSocketConfig中這段注入bean的程式碼注掉。

    正式專案的前端WebSocket框架 GoEasy

    感謝kkatrina的補充,正式的專案中,一般是用第三方websocket框架來做,穩定性、實時性有保證的多,也會包括一些心跳、重連機制。

    GoEasy專注於伺服器與瀏覽器,瀏覽器與瀏覽器之間訊息推送,完美相容世界上的絕大多數瀏覽器,包括IE6, IE7之類的非常古老的瀏覽器。支援Uniapp,各種小程式,react,vue等所有主流Web前端技術。
    GoEasy採用 釋出/訂閱 的訊息模式,幫助您非常輕鬆的實現一對一,一對多的通訊。
    https://www.goeasy.io/cn/doc/

    @Component@ServerEndpoint關於是否單例模式,能否使用static Map等一些問題的解答

    看到大家都在熱心的討論關於是否單例模式這個問題,請大家相信自己的直接,如果websocket是單例模式,還怎麼服務這麼多session呢。

    1. websocket是原型模式@ServerEndpoint每次建立雙向通訊的時候都會建立一個例項,區別於spring的單例模式。
    2. Spring的@Component預設是單例模式,請注意,預設 而已,是可以被改變的。
    3. 這裡的@Component僅僅為了支援@Autowired依賴注入使用,如果不加則不能注入任何東西,為了方便。
    4. 什麼是prototype 原型模式? 基本就是你需要從A的例項得到一份與A內容相同,但是又互不干擾的例項B的話,就需要使用原型模式。
    5. 關於在原型模式下使用static 的webSocketMap,請注意這是ConcurrentHashMap ,也就是執行緒安全/執行緒同步的,而且已經是靜態變數作為全域性呼叫,這種情況下是ok的,或者大家如果有顧慮或者更好的想法的化,可以進行改進。 例如使用一箇中間類來接收和存放session。
    6. 為什麼每次都@OnOpen都要檢查webSocketMap.containsKey(userId) ,首先了為了程式碼強壯性考慮,假設程式碼以及機制沒有問題,那麼肯定這個邏輯是廢的對吧。但是實際使用的時候發現偶爾會出現重連失敗或者其他原因導致之前的session還存在,這裡就做了一個清除舊session,迎接新session的功能。

    Vue版本的websocket連線

    感謝**@GzrStudy**的貢獻,供大家參考。

    <script>
    export default {
        data() {
            return {
                socket:null,
                userId:localStorage.getItem("ms_uuid"),
                toUserId:'2',
                content:'3'
            }
        },
      methods: {
        openSocket() {
          if (typeof WebSocket == "undefined") {
            console.log("您的瀏覽器不支援WebSocket");
          } else {
            console.log("您的瀏覽器支援WebSocket");
            //實現化WebSocket物件,指定要連線的伺服器地址與埠  建立連線
            //等同於socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
            //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
            var socketUrl =
              "http://localhost:8081/imserver/" + this.userId;
            socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
            console.log(socketUrl);
            if (this.socket != null) {
              this.socket.close();
              this.socket = null;
            }
            this.socket = new WebSocket(socketUrl);
            //開啟事件
            this.socket = new WebSocket(socketUrl);
            //開啟事件
            this.socket.onopen = function() {
              console.log("websocket已開啟");
              //socket.send("這是來自客戶端的訊息" + location.href + new Date());
            };
            //獲得訊息事件
            this.socket.onmessage = function(msg) {
              console.log(msg.data);
              //發現訊息進入    開始處理前端觸發邏輯
            };
            //關閉事件
            this.socket.onclose = function() {
              console.log("websocket已關閉");
            };
            //發生了錯誤事件
            this.socket.onerror = function() {
              console.log("websocket發生了錯誤");
            };
          }
        },
        sendMessage() {
          if (typeof WebSocket == "undefined") {
            console.log("您的瀏覽器不支援WebSocket");
          } else {
            console.log("您的瀏覽器支援WebSocket");
            console.log(
              '{"toUserId":"' +
                 this.toUserId +
                '","contentText":"' +
                 this.content +
                '"}'
            );
            this.socket.send(
              '{"toUserId":"' +
                 this.toUserId +
                '","contentText":"' +
                 this.content +
                '"}'
             );
        
        }
    }