spring websocket 5.05 基於stomp協議
WebSocket協議定義了兩種型別的訊息,文字和二進位制,但其內容未定義。 為客戶端和伺服器定義了一種協商子協議的機制 - 即更高級別的訊息傳遞協議,用於在WebSocket之上定義每種訊息可以傳送哪種訊息,每種訊息的格式和內容是什麼等等 上。 子協議的使用是可選的,但是客戶端和伺服器需要就定義訊息內容的一些協議達成一致。
STOMP是一種簡單的面向文字的訊息傳遞協議,最初是為指令碼語言(如Ruby,Python和Perl)建立的,用於連線企業訊息代理。 它旨在解決常用訊息傳遞模式的最小子集。 STOMP可用於任何可靠的雙向流媒體網路協議,如TCP和WebSocket。 儘管STOMP是一種面向文字的協議,但訊息負載可以是文字或二進位制。
STOMP是一個基於幀的協議,其幀在HTTP上建模。 STOMP框架的結構:
客戶可以使用SEND或SUBSCRIBE命令傳送或訂閱訊息以及描述訊息的內容和由誰來接收訊息的“目標”頭。這使得一個簡單的釋出 - 訂閱機制可以用來通過代理髮送訊息到其他連線的客戶端,或者傳送訊息到伺服器來請求執行一些工作。
在使用Spring的STOMP支援時,Spring WebSocket應用程式充當客戶端的STOMP代理。訊息被路由到@Controller訊息處理方法或一個簡單的記憶體代理,用於跟蹤訂閱並向訂閱使用者廣播訊息。您還可以將Spring配置為與專用的STOMP代理(例如RabbitMQ,ActiveMQ等)一起使用,以用於訊息的實際廣播。在這種情況下,Spring維護與代理的TCP連線,將訊息轉發給它,並將訊息從它傳遞到連線的WebSocket客戶端。因此,Spring Web應用程式可以依靠統一的基於HTTP的安全性,通用驗證以及熟悉的程式設計模型訊息處理工作。
這裡是客戶訂閱接收股票報價的例子,伺服器可以定期傳送例如通過計劃任務通過SimpMessagingTemplate將訊息傳送給代理:
以下是客戶端傳送交易請求的示例,伺服器可以通過@MessageMapping方法處理,稍後在執行後向客戶端廣播交易確認訊息和詳細資訊:
目的地的含義在STOMP規範中有意不透明。 它可以是任何字串,完全取決於STOMP伺服器來定義它們支援的目的地的語義和語法。 然而,對於目的地是類似路徑的字串,其中“/ topic / ..”意味著釋出 - 訂閱(一對多)和“/佇列/”意味著點對點(一對一 )訊息交換。
STOMP伺服器可以使用MESSAGE命令向所有使用者廣播訊息。 以下是向訂閱客戶端傳送股票報價的伺服器示例:
知道伺服器不能傳送未經請求的訊息很重要。 所有來自伺服器的訊息都必須響應特定的客戶端訂閱,並且伺服器訊息的“subscription-id”頭必須與客戶端訂閱的“id”頭相匹配。
以上概述旨在提供對STOMP協議的最基本的瞭解。 建議全面檢視協議規範。
使用STOMP作為子協議使Spring Framework和Spring Security能夠提供更豐富的程式設計模型,而不是使用原始WebSockets。 關於HTTP與原始TCP的關係以及Spring MVC和其他Web框架如何提供豐富的功能都可以做到這一點。 以下是一些好處:
不需要發明自定義訊息協議和訊息格式。
STOMP客戶端可用,包括Spring框架中的Java客戶端。
訊息代理(如RabbitMQ,ActiveMQ等)可以用於(可選)管理訂閱和廣播訊息。
應用程式邏輯可以組織在任何數量的@ Controller中,並且根據STOMP目標報頭路由到他們的訊息,以及用給定連線的單個WebSocketHandler處理原始WebSocket訊息。
使用Spring Security來保護基於STOMP目標和訊息型別的訊息。
spring-messaging和spring-websocket模組提供了對WebSocket支援的STOMP。 一旦你有這些依賴關係,你可以通過WebSocket和SockJS Fallback公開一個STOMP端點,如下所示:
WebSocketConfig配置:
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
import com.dougong.socket.handler.MyWebSocketHandlerDecoratorFactory;
import com.dougong.socket.interceptor.HandshakeInterceptor;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
@Autowired
private MyWebSocketHandlerDecoratorFactory decoratorFactory;
/**
* applicationDestinationPrefixes應用字首,所有請求的訊息將會路由到@MessageMapping的controller上,
* enableStompBrokerRelay是代理字首,而返回的訊息將會路由到代理上,所有訂閱該代理的將收到響應的訊息。
*
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 應用程式以/app為字首,代理目的地以/topic、/user為字首
config.enableSimpleBroker("/topic", "/user");
config.setApplicationDestinationPrefixes("/app");
config.setUserDestinationPrefix("/user/");
// config.setApplicationDestinationPrefixes("/app");
// config.setUserDestinationPrefix("/user");
// config.enableSimpleBroker("/topic", "/queue");
// registry.enableStompBrokerRelay("/topic", "/queue")
// 下面這配置為預設配置,如有變動修改配置啟用就可以了
// .setRelayHost("127.0.0.1") //activeMq伺服器地址
// .setRelayPort(61613)//activemq 伺服器服務埠
// .setClientLogin("guest") //登陸賬戶
// .setClientPasscode("guest") // ;
}
/**
* 將"/hello"路徑註冊為STOMP端點,這個路徑與傳送和接收訊息的目的路徑有所不同,這是一個端點,客戶端在訂閱或釋出訊息到目的地址前,要連線該端點,
* 即使用者傳送請求url="/applicationName/hello"與STOMP server進行連線。之後再轉發到訂閱url;
* PS:端點的作用——客戶端在訂閱或釋出訊息到目的地址前,要連線該端點。
*
* @param stompEndpointRegistry
*
*
* 連線的端點,客戶端建立連線時需要連線這裡配置的端點
*
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 為java stomp client提供連結
// 在網頁上可以通過"/applicationName/hello"來和伺服器的WebSocket連線
registry.addEndpoint("/gs-guide-websocket").setAllowedOrigins("*")
/*.setHandshakeHandler(new MyHandshakeHandler())*/
.addInterceptors(new HandshakeInterceptor())
.withSockJS();
// 為js客戶端提供連結
//registry.addEndpoint("/hello").setAllowedOrigins("*")/*.setHandshakeHandler(new MyHandshakeHandler())
//.addInterceptors(new MyHandshakeInterceptor())*/.withSockJS();
// 在網頁上可以通過"/applicationName/hello"來和伺服器的WebSocket連線
// registry.addEndpoint("/gs-guide-websocket")
// .addInterceptors(new
// SpringWebSocketHandlerInterceptor()).setAllowedOrigins("*").withSockJS();
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.addDecoratorFactory(this.decoratorFactory);
registration.setMessageSizeLimit(75536).setSendBufferSizeLimit(75536).setSendTimeLimit(75536);
//super.configureWebSocketTransport(registration);
}
/**
* 輸入通道引數設定
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
//// super.configureClientInboundChannel(registration);
//registration.setInterceptors(new MySubscriptionInterceptor ());
//// 執行緒資訊
//registration.taskExecutor().corePoolSize(4).maxPoolSize(8).keepAliveSeconds(60);
//super.configureClientInboundChannel(registration);
}
/**
* 輸出通道引數配置
*/
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
// super.configureClientOutboundChannel(registration);
// 執行緒資訊
//registration.taskExecutor().corePoolSize(4).maxPoolSize(8);
//super.configureClientOutboundChannel(registration);
}
@Override
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
//messageConverters.add(new ByteArrayMessageConverter());
return true;
}
}
這裡需要注意configureWebSocketTransport方法中的registration.addDecoratorFactory(this.decoratorFactory);
decoratorFactory是自定義的一個類實現啦WebSocketHandlerDecoratorFactory介面,主要作用是用來處理自定義的WebSocketHandler,自定義的WebSocketHandler可以監控使用者下線或上線等。
其中還有registerStompEndpoints方法中添加了一個stomp握手攔截器
下面是decoratorFactory、自定義的WebSocketHandler和stomp握手攔截器的類:
MyWebSocketHandlerDecoratorFactory
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory;
@Component
public class MyWebSocketHandlerDecoratorFactory implements WebSocketHandlerDecoratorFactory{
private MyWebSocketHandler myWebSocketHandler;
public MyWebSocketHandlerDecoratorFactory(){
}
@Override
public WebSocketHandler decorate(WebSocketHandler handler) {
if(this.myWebSocketHandler == null)
this.myWebSocketHandler = new MyWebSocketHandler(handler);
return this.myWebSocketHandler;
}
public MyWebSocketHandler getMyWebSocketHandler() {
return myWebSocketHandler;
}
}
MyWebSocketHandler
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
public class MyWebSocketHandler extends WebSocketHandlerDecorator{
public MyWebSocketHandler(WebSocketHandler delegate) {
super(delegate);
}
private static Logger logger = LoggerFactory.getLogger(MyWebSocketHandler.class);
private static final Map<String , WebSocketSession> users;//這個會出現效能問題,最好用Map來儲存,key用userid
static {
users = new HashMap<String , WebSocketSession>();
}
/**
* 連線成功時候,會觸發頁面上onopen方法
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// TODO Auto-generated method stub
Map<String, Object> map = session.getAttributes();
if(map != null && map.size() > 0){
String userid = (String) map.get("userid");
users.put(userid,session);
}
System.out.println("connect to the websocket success......當前數量:"+users.size());
//這塊會實現自己業務,比如,當用戶登入後,會把離線訊息推送給使用者
//TextMessage returnMessage = new TextMessage("你將收到的離線");
// session.sendMessage(returnMessage);
super.afterConnectionEstablished(session);//必須呼叫父類不然會報錯
}
/**
* 關閉連線時觸發
* 使用者退出後的處理,不如退出之後,要將使用者資訊從websocket的session中remove掉,這樣使用者就處於離線狀態了,也不會佔用系統資源
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
logger.debug("websocket connection closed......");
String username= (String) session.getAttributes().get("WEBSOCKET_USERNAME");
String userid= (String) session.getAttributes().get("userid");
System.out.println("使用者"+username+"已退出!");
users.remove(userid);
System.out.println("剩餘線上使用者"+users.size());
super.afterConnectionClosed(session, closeStatus);
}
/**
* js呼叫websocket.send時候,會呼叫該方法
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
//將訊息進行轉化,因為是訊息是json資料,可能裡面包含了傳送給某個人的資訊,所以需要用json相關的工具類處理之後再封裝成TextMessage,
//我這兒並沒有做處理,訊息的封裝格式一般有{from:xxxx,to:xxxxx,msg:xxxxx},來自哪裡,傳送給誰,什麼訊息等等
//TextMessage msg = (TextMessage)message.getPayload();
//給所有使用者群發訊息
//this.sendMessageToUsers(returnMessage,session);
//給指定使用者群發訊息
//sendMessageToUser(userId,msg);
//session.sendMessage(message);
super.handleMessage(session, message);
}
//後臺錯誤資訊處理方法
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
if(session.isOpen()){
String userid= (String) session.getAttributes().get("userid");
users.remove(userid);
session.close();
}
logger.debug("websocket connection closed......");
super.handleTransportError(session, exception);
}
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 給某個使用者傳送訊息
*
* @param userName
* @param message
*/
public void sendMessageToUser(String userName, TextMessage message) {
for (Map.Entry<String , WebSocketSession> entry : users.entrySet()) {
if (entry.getValue().getAttributes().get("WEBSOCKET_USERNAME").equals(userName)) {
try {
if (entry.getValue().isOpen()) {
entry.getValue().sendMessage(message);
}
} catch (Exception e) {
e.printStackTrace();
}
break;
}
}
}
/**
* 給所有線上使用者傳送訊息
* @param message
*/
public void sendMessageToUsers(TextMessage message) {
for (Map.Entry<String , WebSocketSession> entry : users.entrySet()) {
try {
if (entry.getValue().isOpen()) {
entry.getValue().sendMessage(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 給所有線上使用者傳送訊息
*
* @param message
* @throws IOException
*/
public void broadcast(final TextMessage message) throws IOException {
Iterator<Entry<String, WebSocketSession>> it = users.entrySet().iterator();
// 多執行緒群發
while (it.hasNext()) {
final Entry<String, WebSocketSession> entry = it.next();
if (entry.getValue().isOpen()) {
// entry.getValue().sendMessage(message);
new Thread(new Runnable() {
public void run() {
try {
if (entry.getValue().isOpen()) {
entry.getValue().sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
}
HandshakeInterceptor
/*** stomp握手攔截器
* @author tomZ
* @date 2016年11月4日
* @desc TODO
*/
public class HandshakeInterceptor extends HttpSessionHandshakeInterceptor {
private static final Logger logger = LoggerFactory.getLogger( HandshakeInterceptor.class);
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
logger.info("===============before handshake=============");
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
HttpServletRequest httpServletRequest = servletRequest.getServletRequest();
String name = httpServletRequest.getParameter("name");
//HttpSession session = servletRequest.getServletRequest().getSession(false);
HttpSession session = httpServletRequest.getSession(false);
if (session != null) {
//使用userName區分WebSocketHandler,以便定向傳送訊息
String userid = (String) session.getAttribute("userid");//使用者資訊
if (userid == null) {
userid = "default-system"+session.getId();
}
attributes.put("userid",userid);//使用者userid
}
}
return super.beforeHandshake(request, response, wsHandler, attributes);
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
System.out.println(wsHandler.getClass());
logger.info("===============after handshake=============");
super.afterHandshake(request, response, wsHandler, ex);
}
}
啟動後訪問:http://localhost:8888/index/socket
前端用的是:LayIM+stomp.min.js
原始碼地址:
https://gitee.com/zengwc/spring-boot-2.0.1
https://github.com/zengwenchong/spring-websocket-5.05-stomp-/tree/master
第一次寫部落格,寫得不好的地方望指出,一起交流