【springcloud】【轉載】基於redis訊息訂閱和websocket實現的訊息推送
阿新 • • 發佈:2020-09-09
基於redis訊息訂閱和websocket技術實現的訊息推送
本文【轉載】自:https://my.oschina.net/freide/blog/2991435
依賴檔案
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> </dependencies>
建立Redis訊息監聽者容器
@Configuration public class RedisConfig { /** * 建立訊息監聽器 * @param factory * @return */ @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory);return container; } }
建立Websocket配置類
import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* 這個配置類的作用是要注入ServerEndpointExporter,這個bean會自動註冊使用了@ServerEndpoint註解宣告的Websocket endpoint。* 如果是使用獨立的servlet容器,而不是直接使用springboot的內建容器,就不要注入ServerEndpointExporter,因為它將由容器自己提供和管理。
*/ @Component public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
建立訊息訂閱監聽者類
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import javax.websocket.Session; import java.io.IOException; /** * redis訊息訂閱監聽者 */ @Component public class RedisSubscribeListener implements MessageListener { private Logger logger = LoggerFactory.getLogger(this.getClass()); //webSocket客戶端會話物件 private Session session; /** * 接收發布者訊息 * @param message * @param bytes */ @Override public void onMessage(Message message, byte[] bytes) { String msg = new String(message.getBody()); logger.info("[{}]主題釋出:{}", new String(bytes), msg); if (session != null && session.isOpen()) { try { session.getBasicRemote().sendText(msg); } catch (IOException e) { logger.error("[redis監聽器]釋出訊息異常:{}", e); } } } public Session getSession() { return session; } public void setSession(Session session) { this.session = session; } }
這個訊息訂閱監聽者類持有websocket的客戶端會話物件(session),當接收到訂閱的訊息時,通過這個會話物件(session)將訊息傳送到前端,從而實現訊息的主動推送。
建立Websocket服務端類
@Component @ServerEndpoint("/websocket/server") public class WebSocketServer { /** * 因為@ServerEndpoint不支援注入,所以使用SpringUtils獲取IOC例項 */ private RedisMessageListenerContainer redisMessageListenerContainer = SpringUtils.getBean(RedisMessageListenerContainer.class); //靜態變數,用來記錄當前線上連線數。應該把它設計成執行緒安全的。 private static AtomicInteger onlineCount=new AtomicInteger(0); //concurrent包的執行緒安全Set,用來存放每個客戶端對應的webSocket物件。若要實現服務端與單一客戶端通訊的話,可以使用Map來存放,其中Key可以為使用者標識 private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>(); //與某個客戶端的連線會話,需要通過它來給客戶端傳送資料 private Session session; private SubscribeListener subscribeListener; /** * 連線建立成功呼叫的方法 * @param session 可選的引數。session為與某個客戶端的連線會話,需要通過它來給客戶端傳送資料 */ @OnOpen public void onOpen(Session session){ this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //線上數加1 System.out.println("有新連線加入!當前線上人數為" + getOnlineCount()); subscribeListener = new SubscribeListener(); subscribeListener.setSession(session); //設定訂閱topic redisMessageListenerContainer.addMessageListener(subscribeListener, new ChannelTopic("TOPIC")); } /** * 連線關閉呼叫的方法 */ @OnClose public void onClose() throws IOException { webSocketSet.remove(this); //從set中刪除 subOnlineCount(); //線上數減1 redisMessageListenerContainer.removeMessageListener(subscribeListener); System.out.println("有一連線關閉!當前線上人數為" + getOnlineCount()); } /** * 收到客戶端訊息後呼叫的方法 * @param message 客戶端傳送過來的訊息 * @param session 可選的引數 */ @OnMessage public void onMessage(String message, Session session) { System.out.println("來自客戶端的訊息:" + message); //群發訊息 for(WebSocketServer item: webSocketSet){ try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); continue; } } } /** * 發生錯誤時呼叫 * @param session * @param error */ @OnError public void onError(Session session, Throwable error){ System.out.println("發生錯誤"); error.printStackTrace(); } /** * 這個方法與上面幾個方法不一樣。沒有用註解,是根據自己需要新增的方法。 * @param message * @throws IOException */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } public int getOnlineCount() { return onlineCount.get(); } public void addOnlineCount() { WebSocketServer.onlineCount.getAndIncrement(); } public void subOnlineCount() { WebSocketServer.onlineCount.getAndDecrement(); } }
@ServerEndpoint 註解是一個類層次的註解,它的功能主要是將目前的類定義成一個websocket伺服器端,註解的值將被用於監聽使用者連線的終端訪問URL地址,
客戶端可以通過這個URL來連線到WebSocket伺服器端使用springboot的唯一區別是要@Component宣告下,而使用獨立容器是由容器自己管理websocket的,
但在springboot中連容器都是spring管理的。
雖然@Component預設是單例模式的,但springboot還是會為每個websocket連線初始化一個bean,所以可以用一個靜態set儲存起來。
注意的是在客戶端連結關閉的方法onClose中,一定要 刪除之前的訂閱監聽物件,就是下面這行程式碼:
redisMessageListenerContainer.removeMessageListener(subscribeListener);
否則在瀏覽器刷一下之後,後臺會報如下錯誤:
java.lang.IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close())
may be called on a closed session
原因就是當連結關閉之後,session物件就沒有了,而訂閱者物件還是會接收訊息,在用session物件傳送訊息時會報錯。
雖然程式碼中加了判斷 if(null != session && session.isOpen()) { 可以避免報錯,但是為了防止記憶體洩漏,應該把沒有用的監聽者物件從容器中刪除。
建立前端頁面
在resource\static目錄下建立html頁面,命名為websocket.html。程式碼如下:
<!doctype html> <html xmlns:th="http://www.thymeleaf.org"> <head> <meta charset="utf-8"></meta> <title>websocket</title> </head> <h4> 使用redis訂閱訊息和websocket實現訊息推送 </h4> <br/> <h5>收到的訂閱訊息:</h5> <div id="message_id"></div> </body> <script type="text/javascript"> var websocket = null; //當前瀏覽前是否支援websocket if("WebSocket" in window){ var url = "ws://localhost:8080/demo/websocket/server"; websocket = new WebSocket(url); }else{ alert("瀏覽器不支援websocket"); } websocket.onopen = function(event){ setMessage("開啟連線"); } websocket.onclose = function(event){ setMessage("關閉連線"); } websocket.onmessage = function(event){ setMessage(event.data); } websocket.onerror = function(event){ setMessage("連線異常"); } //監聽視窗關閉事件,當視窗關閉時,主動去關閉websocket連線,防止連線還沒斷開就關閉視窗,server端會拋異常。 window.onbeforeunload = function(){ closeWebsocket(); } //關閉websocket function closeWebsocket(){ //3代表已經關閉 if(3!=websocket.readyState){ websocket.close(); }else{ alert("websocket之前已經關閉"); } } //將訊息顯示在網頁上 function setMessage(message){ document.getElementById('message_id').innerHTML += message + '<br/>'; } </script> </html>View Code
啟動服務進行測試
- 啟動springboot服務,瀏覽器輸入地址:http://localhost:8080/demo/websocket.html,此時頁面顯示如下
2.開啟redis客戶端,在命令列輸入publish TOPIC “this is test message”
瀏覽器頁面顯示如下:
說明剛剛釋出的訊息已經主動推送到瀏覽器顯示了。
完整程式碼見: https://gitee.com/freide/springboot
【轉載】自 https://my.oschina.net/freide/blog/2991435