springboot 整合websocket 以及解決tomcat叢集環境websocket共享問題
阿新 • • 發佈:2019-01-10
解決方案:使用redis訊息釋出訂閱解決多個tomcat應用伺服器下,連線不共享問題;具體如下
@Configuration
public class WebSocketConfig {
//TODEO如果用外接tomcat,要註釋掉以下程式碼,否則啟動專案會報錯,用springboot內建tomcat就得放開以下程式碼
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
以及實現 WebSocketServer
private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class); //靜態變數,用來記錄當前線上連線數。應該把它設計成執行緒安全的。 private static int onlineCount = 0; //concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件。 private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>(); //與某個客戶端的連線會話,需要通過它來給客戶端傳送資料 private Session session; //接收userId private String userId = ""; /** * 連線建立成功呼叫的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //線上數加1 log.info("有新視窗開始監聽:" + userId + ",當前線上人數為" + getOnlineCount()); this.userId = userId; try { sendMessage("連線成功"); } catch (IOException e) { log.error("websocket IO異常"); } } /** * 連線關閉呼叫的方法 */ @OnClose public void onClose() { boolean flag = webSocketSet.remove(this); //從set中刪除 if (flag) { subOnlineCount(); //線上數減1 log.info("有一連線關閉!當前線上人數為" + getOnlineCount()); } } /** * 收到客戶端訊息後呼叫的方法 * * @param message 客戶端傳送過來的訊息 */ @OnMessage public void onMessage(String message, Session session) { log.info("收到來自視窗" + userId + "的資訊:" + message); //群發訊息 // for (WebSocketServer item : webSocketSet) { // try { // item.sendMessage(message); // } catch (IOException e) { // e.printStackTrace(); // } // } } /** * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("發生錯誤:" + error.getMessage()); // error.printStackTrace(); } /** * 實現伺服器主動推送 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 群發自定義訊息 */ public static boolean sendInfo(String message, @PathParam("userId") String userId) { log.info("推送訊息到視窗" + userId + ",推送內容:" + message); for (WebSocketServer item : webSocketSet) { try { //這裡可以設定只推送給這個userId的,為null則全部推送 // if (userId == null) { // item.sendMessage(message); // } else if (item.userId.equals(userId)) { if (item.userId.equals(userId)) { item.sendMessage(message); return true; } } catch (IOException e) { continue; } } return false; } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } }
設定監聽類
@Configuration public class RedisSubListenerConfig { //初始化監聽器 @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic(RedisKeyConstants.TOPIC_CHANNEL_SENDWEBSOCKET)); return container; } //利用反射來建立監聽到訊息之後的執行方法 @Bean MessageListenerAdapter listenerAdapter(OutpatientRedisDaoImpl redisReceiver) { return new MessageListenerAdapter(redisReceiver, "sendMessageByOpen"); } }
//實現監聽後執行的方法
@Component("outpatientRedisDao")
public class OutpatientRedisDaoImpl {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public void sendMessageByOpen(String message) {
logger.info("reids訂閱訊息收到的引數message:{}", message);
message = message.substring(1, message.length()-1);
String [] info = message.split(";");
JSONObject content = JSONObject.fromObject(info[1].replaceAll("\\\\\"", "\""));
WebSocketServer.sendInfo(content.toString(), info[0]);
}
}