1. 程式人生 > 實用技巧 >整合 websocket 的四種方案(轉)

整合 websocket 的四種方案(轉)

整合 websocket 的四種方案

文章完全轉載自:https://www.cnblogs.com/kiwifly/p/11729304.html

1. 原生註解

pom.xml

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

WebSocketConfig

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 
*/ package cn.coder4j.study.example.websocket.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/** * @author buhao * @version WebSocketConfig.java, v 0.1 2019-10-18 15:45 buhao */ @Configuration @EnableWebSocket public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpoint() { return new ServerEndpointExporter(); } }

說明:

這個配置類很簡單,通過這個配置 spring boot 才能去掃描後面的關於 websocket 的註解

WsServerEndpoint

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.ws;

import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @author buhao
 * @version WsServerEndpoint.java, v 0.1 2019-10-18 16:06 buhao
 */
@ServerEndpoint("/myWs")
@Component
public class WsServerEndpoint {

    /**
     * 連線成功
     *
     * @param session
     */
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("連線成功");
    }

    /**
     * 連線關閉
     *
     * @param session
     */
    @OnClose
    public void onClose(Session session) {
        System.out.println("連線關閉");
    }

    /**
     * 接收到訊息
     *
     * @param text
     */
    @OnMessage
    public String onMsg(String text) throws IOException {
        return "servet 傳送:" + text;
    }
}

說明

這裡有幾個註解需要注意一下,首先是他們的包都在 **javax.websocket **下。並不是 spring 提供的,而 jdk 自帶的,下面是他們的具體作用。

  1. @ServerEndpoint
  2. 通過這個 spring boot 就可以知道你暴露出去的 ws 應用的路徑,有點類似我們經常用的@RequestMapping。比如你的啟動埠是8080,而這個註解的值是ws,那我們就可以通過 ws://127.0.0.1:8080/ws 來連線你的應用
  3. @OnOpen
  4. 當 websocket 建立連線成功後會觸發這個註解修飾的方法,注意它有一個Session 引數
  5. @OnClose
  6. 當 websocket 建立的連線斷開後會觸發這個註解修飾的方法,注意它有一個Session 引數
  7. @OnMessage
  8. 當客戶端傳送訊息到服務端時,會觸發這個註解修改的方法,它有一個 String 入參表明客戶端傳入的值
  9. @OnError
  10. 當 websocket 建立連線時出現異常會觸發這個註解修飾的方法,注意它有一個Session 引數

另外一點就是服務端如何傳送訊息給客戶端,服務端傳送訊息必須通過上面說的 Session 類,通常是在@OnOpen 方法中,當連線成功後把 session 存入 Map 的 value,key 是與 session 對應的使用者標識,當要傳送的時候通過 key 獲得 session 再發送,這裡可以通過session.getBasicRemote_().sendText(_)來對客戶端傳送訊息。

2. Spring封裝

pom.xml

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

HttpAuthHandler

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.handler;

import cn.coder4j.study.example.websocket.config.WsSessionManager;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.time.LocalDateTime;

/**
 * @author buhao
 * @version MyWSHandler.java, v 0.1 2019-10-17 17:10 buhao
 */
@Component
public class HttpAuthHandler extends TextWebSocketHandler {

    /**
     * socket 建立成功事件
     *
     * @param session
     * @throws Exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        Object token = session.getAttributes().get("token");
        if (token != null) {
            // 使用者連線成功,放入線上使用者快取
            WsSessionManager.add(token.toString(), session);
        } else {
            throw new RuntimeException("使用者登入已經失效!");
        }
    }

    /**
     * 接收訊息事件
     *
     * @param session
     * @param message
     * @throws Exception
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 獲得客戶端傳來的訊息
        String payload = message.getPayload();
        Object token = session.getAttributes().get("token");
        System.out.println("server 接收到 " + token + " 傳送的 " + payload);
        session.sendMessage(new TextMessage("server 傳送給 " + token + " 訊息 " + payload + " " + LocalDateTime.now().toString()));
    }

    /**
     * socket 斷開連線時
     *
     * @param session
     * @param status
     * @throws Exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        Object token = session.getAttributes().get("token");
        if (token != null) {
            // 使用者退出,移除快取
            WsSessionManager.remove(token.toString());
        }
    }

}

說明

通過繼承TextWebSocketHandler類並覆蓋相應方法,可以對 websocket 的事件進行處理,這裡可以同原生註解的那幾個註解連起來看

  1. afterConnectionEstablished方法是在 socket 連線成功後被觸發,同原生註解裡的 @OnOpen 功能
  2. **afterConnectionClosed **方法是在 socket 連線關閉後被觸發,同原生註解裡的 @OnClose 功能
  3. **handleTextMessage **方法是在客戶端傳送資訊時觸發,同原生註解裡的@OnMessage 功能

WsSessionManager

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author buhao
 * @version WsSessionManager.java, v 0.1 2019-10-22 10:24 buhao
 */
@Slf4j
public class WsSessionManager {
    /**
     * 儲存連線 session 的地方
     */
    private static ConcurrentHashMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();

    /**
     * 新增 session
     *
     * @param key
     */
    public static void add(String key, WebSocketSession session) {
        // 新增 session
        SESSION_POOL.put(key, session);
    }

    /**
     * 刪除 session,會返回刪除的 session
     *
     * @param key
     * @return
     */
    public static WebSocketSession remove(String key) {
        // 刪除 session
        return SESSION_POOL.remove(key);
    }

    /**
     * 刪除並同步關閉連線
     *
     * @param key
     */
    public static void removeAndClose(String key) {
        WebSocketSession session = remove(key);
        if (session != null) {
            try {
                // 關閉連線
                session.close();
            } catch (IOException e) {
                // todo: 關閉出現異常處理
                e.printStackTrace();
            }
        }
    }

    /**
     * 獲得 session
     *
     * @param key
     * @return
     */
    public static WebSocketSession get(String key) {
        // 獲得 session
        return SESSION_POOL.get(key);
    }
}

說明

這裡簡單通過**ConcurrentHashMap **來實現了一個 session 池,用來儲存已經登入的 web socket 的 session。前文提過,服務端傳送訊息給客戶端必須要通過這個 session。

MyInterceptor

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.interceptor;

import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import java.util.HashMap;
import java.util.Map;

/**
 * @author buhao
 * @version MyInterceptor.java, v 0.1 2019-10-17 19:21 buhao
 */
@Component
public class MyInterceptor implements HandshakeInterceptor {

    /**
     * 握手前
     *
     * @param request
     * @param response
     * @param wsHandler
     * @param attributes
     * @return
     * @throws Exception
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        System.out.println("握手開始");
        // 獲得請求引數
        HashMap<String, String> paramMap = HttpUtil.decodeParamMap(request.getURI().getQuery(), "utf-8");
        String uid = paramMap.get("token");
        if (StrUtil.isNotBlank(uid)) {
            // 放入屬性域
            attributes.put("token", uid);
            System.out.println("使用者 token " + uid + " 握手成功!");
            return true;
        }
        System.out.println("使用者登入已失效");
        return false;
    }

    /**
     * 握手後
     *
     * @param request
     * @param response
     * @param wsHandler
     * @param exception
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        System.out.println("握手完成");
    }

}

說明

通過實現HandshakeInterceptor介面來定義握手攔截器,注意這裡與上面Handler的事件是不同的,這裡是建立握手時的事件,分為握手前與握手後,而Handler的事件是在握手成功後的基礎上建立 socket 的連線。所以在如果把認證放在這個步驟相對來說最節省伺服器資源。它主要有兩個方法beforeHandshake與**afterHandshake **,顧名思義一個在握手前觸發,一個在握手後觸發。

WebSocketConfig

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.config;

import cn.coder4j.study.example.websocket.handler.HttpAuthHandler;
import cn.coder4j.study.example.websocket.interceptor.MyInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * @author buhao
 * @version WebSocketConfig.java, v 0.1 2019-10-17 15:43 buhao
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private HttpAuthHandler httpAuthHandler;
    @Autowired
    private MyInterceptor myInterceptor;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry
                .addHandler(httpAuthHandler, "myWS")
                .addInterceptors(myInterceptor)
                .setAllowedOrigins("*");
    }
}

說明

通過實現WebSocketConfigurer類並覆蓋相應的方法進行websocket的配置。我們主要覆蓋registerWebSocketHandlers這個方法。通過向WebSocketHandlerRegistry設定不同引數來進行配置。其中 **addHandler方法新增我們上面的寫的 ws 的 handler 處理類,第二個引數是你暴露出的 ws 路徑。addInterceptors新增我們寫的握手過濾器。setAllowedOrigins("*") **這個是關閉跨域校驗,方便本地除錯,線上推薦開啟。

3. TIO

pom.xml

 <dependency>
     <groupId>org.t-io</groupId>
     <artifactId>tio-websocket-spring-boot-starter</artifactId>
     <version>3.5.5.v20191010-RELEASE</version>
</dependency>

application.xml

tio:
  websocket:
    server:
      port: 8989

說明

這裡只配置了 ws 的啟動埠,還有很多配置,可以通過結尾給的連結去尋找

MyHandler

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.handler;

import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.server.handler.IWsMsgHandler;

/**
 * @author buhao
 * @version MyHandler.java, v 0.1 2019-10-21 14:39 buhao
 */
@Component
public class MyHandler implements IWsMsgHandler {
    /**
     * 握手
     *
     * @param httpRequest
     * @param httpResponse
     * @param channelContext
     * @return
     * @throws Exception
     */
    @Override
    public HttpResponse handshake(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        return httpResponse;
    }

    /**
     * 握手成功
     *
     * @param httpRequest
     * @param httpResponse
     * @param channelContext
     * @throws Exception
     */
    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        System.out.println("握手成功");
    }

    /**
     * 接收二進位制檔案
     *
     * @param wsRequest
     * @param bytes
     * @param channelContext
     * @return
     * @throws Exception
     */
    @Override
    public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        return null;
    }

    /**
     * 斷開連線
     *
     * @param wsRequest
     * @param bytes
     * @param channelContext
     * @return
     * @throws Exception
     */
    @Override
    public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
        System.out.println("關閉連線");
        return null;
    }

    /**
     * 接收訊息
     *
     * @param wsRequest
     * @param s
     * @param channelContext
     * @return
     * @throws Exception
     */
    @Override
    public Object onText(WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception {
        System.out.println("接收文字訊息:" + s);
        return "success";
    }
}

說明

這個同上個例子中的 handler 很像,也是通過實現介面覆蓋方法來進行事件處理,實現的介面是IWsMsgHandler,它的方法功能如下

  1. handshake
  2. 在握手的時候觸發
  3. onAfterHandshaked
  4. 在握手成功後觸發
  5. onBytes
  6. 客戶端傳送二進位制訊息觸發
  7. onClose
  8. 客戶端關閉連線時觸發
  9. onText
  10. 客戶端傳送文字訊息觸發

StudyWebsocketExampleApplication

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */

package cn.coder4j.study.example.websocket;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.tio.websocket.starter.EnableTioWebSocketServer;

@SpringBootApplication
@EnableTioWebSocketServer
public class StudyWebsocketExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(StudyWebsocketExampleApplication.class, args);
    }
}

說明

這個類的名稱不重要,它其實是你的 spring boot 啟動類,只要記得加上@EnableTioWebSocketServer註解就可以了

4.STOMP

pom.xml

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

WebSocketConfig

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * @author buhao
 * @version WebSocketConfig.java, v 0.1 2019-10-21 16:32 buhao
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 配置客戶端嘗試連線地址
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 設定廣播節點
        registry.enableSimpleBroker("/topic", "/user");
        // 客戶端向服務端傳送訊息需有/app 字首
        registry.setApplicationDestinationPrefixes("/app");
        // 指定使用者傳送(一對一)的字首 /user/
        registry.setUserDestinationPrefix("/user/");
    }
}

說明

  1. 通過實現WebSocketMessageBrokerConfigurer介面和加上@EnableWebSocketMessageBroker來進行 stomp 的配置與註解掃描。
  2. 其中覆蓋registerStompEndpoints方法來設定暴露的 stomp 的路徑,其它一些跨域、客戶端之類的設定。
  3. 覆蓋**configureMessageBroker **方法來進行節點的配置。
  4. 其中**enableSimpleBroker **配置的廣播節點,也就是服務端傳送訊息,客戶端訂閱就能接收訊息的節點。
  5. 覆蓋**setApplicationDestinationPrefixes **方法,設定客戶端向服務端傳送訊息的節點。
  6. 覆蓋setUserDestinationPrefix方法,設定一對一通訊的節點。

WSController

/*
 * *
 *  * blog.coder4j.cn
 *  * Copyright (C) 2016-2019 All Rights Reserved.
 *
 */
package cn.coder4j.study.example.websocket.controller;

import cn.coder4j.study.example.websocket.model.RequestMessage;
import cn.coder4j.study.example.websocket.model.ResponseMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * @author buhao
 * @version WSController.java, v 0.1 2019-10-21 17:22 buhao
 */
@Controller
public class WSController {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @MessageMapping("/hello")
    @SendTo("/topic/hello")
    public ResponseMessage hello(RequestMessage requestMessage) {
        System.out.println("接收訊息:" + requestMessage);
        return new ResponseMessage("服務端接收到你發的:" + requestMessage);
    }

    @GetMapping("/sendMsgByUser")
    public @ResponseBody
    Object sendMsgByUser(String token, String msg) {
        simpMessagingTemplate.convertAndSendToUser(token, "/msg", msg);
        return "success";
    }

    @GetMapping("/sendMsgByAll")
    public @ResponseBody
    Object sendMsgByAll(String msg) {
        simpMessagingTemplate.convertAndSend("/topic", msg);
        return "success";
    }

    @GetMapping("/test")
    public String test() {
        return "test-stomp.html";
    }
}

說明

  1. 通過@MessageMapping來暴露節點路徑,有點類似@RequestMapping。注意這裡雖然寫的是 hello ,但是我們客戶端呼叫的真正地址是** /app/hello。 因為我們在上面的 config 裡配置了registry.setApplicationDestinationPrefixes("/app")**。
  2. @SendTo這個註解會把返回值的內容傳送給訂閱了/topic/hello的客戶端,與之類似的還有一個@SendToUser只不過他是傳送給使用者端一對一通訊的。這兩個註解一般是應答時響應的,如果服務端主動傳送訊息可以通過simpMessagingTemplate類的convertAndSend方法。注意simpMessagingTemplate.convertAndSendToUser(token, "/msg", msg),聯絡到我們上文配置的registry.setUserDestinationPrefix("/user/"),這裡客戶端訂閱的是/user/{token}/msg,千萬不要搞錯。

Session 共享的問題

上面反覆提到一個問題就是,服務端如果要主動傳送訊息給客戶端一定要用到 session。而大家都知道的是 session 這個東西是不跨 jvm 的。如果有多臺伺服器,在 http 請求的情況下,我們可以通過把 session 放入快取中介軟體中來共享解決這個問題,通過 spring session 幾條配置就解決了。但是 web socket 不可以。他的 session 是不能序列化的,當然這樣設計的目的不是為了為難你,而是出於對 http 與 web socket 請求的差異導致的。
目前網上找到的最簡單方案就是通過 redis 訂閱廣播的形式,主要程式碼跟第二種方式差不多,你要在本地放個 map 儲存請求的 session。也就是說每臺伺服器都會儲存與他連線的 session 於本地。然後發訊息的地方要修改,並不是現在這樣直接傳送,而通過 redis 的訂閱機制。伺服器要發訊息的時候,你通過 redis 廣播這條訊息,所有訂閱的服務端都會收到這個訊息,然後本地嘗試傳送。最後肯定只有有這個對應使用者 session 的那臺才能傳送出去。

如何選擇

  1. 如果你在使用 tio,那推薦使用 tio 的整合。因為它已經實現了很多功能,包括上面說的通過 redis 的 session 共享,只要加幾個配置就可以了。但是 tio 是半開源,文件是需要收費的。如果沒有使用,那就忘了他。
  2. 如果你的業務要求比較靈活多變,推薦使用前兩種,更推薦第二種 Spring 封裝的形式。
  3. 如果只是簡單的伺服器雙向通訊,推薦 stomp 的形式,因為他更容易規範使用。

其它

  1. websocket 線上驗證

寫完服務端程式碼後想除錯,但是不會前端程式碼怎麼辦,點這裡,這是一個線上的 websocket 客戶端,功能完全夠我們除錯了。

  1. stomp 驗證

這個沒找到線上版的,但是網上有很多 demo 可以下載到本地進行除錯,也可以通過後文的連線找到。

  1. 另外由於篇幅有限,並不能放上所有程式碼,但是測試程式碼全都上傳 gitlab,保證可以正常執行,可以在這裡找到

參考連結

    1. SpringBoot 系統 - 整合 WebSocket 實時通訊
    2. WebSocket 的故事(二)—— Spring 中如何利用 STOMP 快速構建 WebSocket 廣播式訊息模式
    3. SpringBoot整合WebSocket【基於純H5】進行點對點[一對一]和廣播[一對多]實時推送
    4. Spring Framework 參考文件(WebSocket STOMP)
    5. Spring Boot中使用WebSocket總結(一):幾種實現方式詳解
    6. Spring Boot 系列 - WebSocket 簡單使用
    7. tio-websocket-spring-boot-starter