1. 程式人生 > 實用技巧 >【springcloud】【轉載】基於redis訊息訂閱和websocket實現的訊息推送

【springcloud】【轉載】基於redis訊息訂閱和websocket實現的訊息推送

基於redis訊息訂閱和websocket技術實現的訊息推送

本文【轉載】自:https://my.oschina.net/freide/blog/2991435

依賴檔案

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
    </dependencies>

建立Redis訊息監聽者容器

@Configuration
public class RedisConfig {

    /**
     * 建立訊息監聽器
     * @param factory
     * @return
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        
return container; } }

建立Websocket配置類

import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
* 這個配置類的作用是要注入ServerEndpointExporter,這個bean會自動註冊使用了@ServerEndpoint註解宣告的Websocket endpoint。
* 如果是使用獨立的servlet容器,而不是直接使用springboot的內建容器,就不要注入ServerEndpointExporter,因為它將由容器自己提供和管理。

*/ @Component
public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }

建立訊息訂閱監聽者類

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import javax.websocket.Session;
import java.io.IOException;

/**
 * redis訊息訂閱監聽者
 */
@Component
public class RedisSubscribeListener implements MessageListener {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    //webSocket客戶端會話物件
    private Session session;

    /**
     * 接收發布者訊息
     * @param message
     * @param bytes
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        String msg = new String(message.getBody());
        logger.info("[{}]主題釋出:{}", new String(bytes), msg);
        if (session != null && session.isOpen()) {
            try {
                session.getBasicRemote().sendText(msg);
            } catch (IOException e) {
                logger.error("[redis監聽器]釋出訊息異常:{}", e);
            }
        }
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }
}

這個訊息訂閱監聽者類持有websocket的客戶端會話物件(session),當接收到訂閱的訊息時,通過這個會話物件(session)將訊息傳送到前端,從而實現訊息的主動推送。

建立Websocket服務端類

@Component

@ServerEndpoint("/websocket/server")

public class WebSocketServer {

    /**

     * 因為@ServerEndpoint不支援注入,所以使用SpringUtils獲取IOC例項

     */

    private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class);

    //靜態變數,用來記錄當前線上連線數。應該把它設計成執行緒安全的。

     private static  AtomicInteger onlineCount=new AtomicInteger(0);

     //concurrent包的執行緒安全Set,用來存放每個客戶端對應的webSocket物件。若要實現服務端與單一客戶端通訊的話,可以使用Map來存放,其中Key可以為使用者標識

     private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();

     //與某個客戶端的連線會話,需要通過它來給客戶端傳送資料

     private Session session;

     private SubscribeListener subscribeListener;

    /**

     * 連線建立成功呼叫的方法

     * @param session  可選的引數。session為與某個客戶端的連線會話,需要通過它來給客戶端傳送資料

     */

    @OnOpen

    public void onOpen(Session session){

        this.session = session;

        webSocketSet.add(this);     //加入set中

        addOnlineCount();           //線上數加1

        System.out.println("有新連線加入!當前線上人數為" + getOnlineCount());

        subscribeListener = new SubscribeListener();

        subscribeListener.setSession(session);

        //設定訂閱topic

        redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("TOPIC"));

    }

    /**

     * 連線關閉呼叫的方法

     */

    @OnClose

    public void onClose() throws IOException {

        webSocketSet.remove(this);  //從set中刪除

        subOnlineCount();           //線上數減1

        redisMessageListenerContainer.removeMessageListener(subscribeListener);

        System.out.println("有一連線關閉!當前線上人數為" + getOnlineCount());

    }



    /**

     * 收到客戶端訊息後呼叫的方法

     * @param message 客戶端傳送過來的訊息

     * @param session 可選的引數

     */

    @OnMessage

    public void onMessage(String message, Session session) {

        System.out.println("來自客戶端的訊息:" + message);

        //群發訊息

        for(WebSocketServer item: webSocketSet){

            try {

                item.sendMessage(message);

            } catch (IOException e) {

                e.printStackTrace();

                continue;

            }

        }

    }



    /**

     * 發生錯誤時呼叫

     * @param session

     * @param error

     */

    @OnError

    public void onError(Session session, Throwable error){

        System.out.println("發生錯誤");

        error.printStackTrace();

    }



    /**

     * 這個方法與上面幾個方法不一樣。沒有用註解,是根據自己需要新增的方法。

     * @param message

     * @throws IOException

     */

    public void sendMessage(String message) throws IOException {

        this.session.getBasicRemote().sendText(message);

    }



    public   int getOnlineCount() {

        return onlineCount.get();

    }



    public   void addOnlineCount() {

        WebSocketServer.onlineCount.getAndIncrement();

    }



    public   void subOnlineCount() {

        WebSocketServer.onlineCount.getAndDecrement();

    }

}
@ServerEndpoint 註解是一個類層次的註解,它的功能主要是將目前的類定義成一個websocket伺服器端,註解的值將被用於監聽使用者連線的終端訪問URL地址,
客戶端可以通過這個URL來連線到WebSocket伺服器端使用springboot的唯一區別是要@Component宣告下,而使用獨立容器是由容器自己管理websocket的,
但在springboot中連容器都是spring管理的。

 雖然@Component預設是單例模式的,但springboot還是會為每個websocket連線初始化一個bean,所以可以用一個靜態set儲存起來。


注意的是在客戶端連結關閉的方法onClose中,一定要 刪除之前的訂閱監聽物件,就是下面這行程式碼:
redisMessageListenerContainer.removeMessageListener(subscribeListener);
 否則在瀏覽器刷一下之後,後臺會報如下錯誤:
java.lang.IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close())
may be called on a closed session
原因就是當連結關閉之後,session物件就沒有了,而訂閱者物件還是會接收訊息,在用session物件傳送訊息時會報錯。
雖然程式碼中加了判斷 if(null != session && session.isOpen()) { 可以避免報錯,但是為了防止記憶體洩漏,應該把沒有用的監聽者物件從容器中刪除。

建立前端頁面

在resource\static目錄下建立html頁面,命名為websocket.html。程式碼如下:

 <!doctype html>

<html xmlns:th="http://www.thymeleaf.org">

<head>

    <meta charset="utf-8"></meta>

    <title>websocket</title>

</head>

<h4>

使用redis訂閱訊息和websocket實現訊息推送

</h4>

<br/>

<h5>收到的訂閱訊息:</h5>

<div id="message_id"></div>

</body>

<script type="text/javascript">

    var websocket = null;

    //當前瀏覽前是否支援websocket

    if("WebSocket" in window){

        var url = "ws://localhost:8080/demo/websocket/server";

        websocket = new WebSocket(url);

    }else{

        alert("瀏覽器不支援websocket");

    }



    websocket.onopen = function(event){

        setMessage("開啟連線");

    }



    websocket.onclose = function(event){

        setMessage("關閉連線");

    }



    websocket.onmessage = function(event){

        setMessage(event.data);

    }



    websocket.onerror = function(event){

        setMessage("連線異常");

    }



    //監聽視窗關閉事件,當視窗關閉時,主動去關閉websocket連線,防止連線還沒斷開就關閉視窗,server端會拋異常。

    window.onbeforeunload = function(){

        closeWebsocket();

    }



    //關閉websocket

    function closeWebsocket(){

        //3代表已經關閉

        if(3!=websocket.readyState){

            websocket.close();

        }else{

            alert("websocket之前已經關閉");

        }

    }

    //將訊息顯示在網頁上

    function setMessage(message){

        document.getElementById('message_id').innerHTML += message + '<br/>';

    }

</script>

</html>
View Code

啟動服務進行測試

  1. 啟動springboot服務,瀏覽器輸入地址:http://localhost:8080/demo/websocket.html,此時頁面顯示如下

2.開啟redis客戶端,在命令列輸入publish TOPIC “this is test message”

瀏覽器頁面顯示如下:

說明剛剛釋出的訊息已經主動推送到瀏覽器顯示了。

完整程式碼見: https://gitee.com/freide/springboot

【轉載】自 https://my.oschina.net/freide/blog/2991435