1. 程式人生 > >spring websocket 5.05 基於stomp協議

spring websocket 5.05 基於stomp協議

WebSocket協議定義了兩種型別的訊息,文字和二進位制,但其內容未定義。 為客戶端和伺服器定義了一種協商子協議的機制 - 即更高級別的訊息傳遞協議,用於在WebSocket之上定義每種訊息可以傳送哪種訊息,每種訊息的格式和內容是什麼等等 上。 子協議的使用是可選的,但是客戶端和伺服器需要就定義訊息內容的一些協議達成一致。

STOMP是一種簡單的面向文字的訊息傳遞協議,最初是為指令碼語言(如Ruby,Python和Perl)建立的,用於連線企業訊息代理。 它旨在解決常用訊息傳遞模式的最小子集。 STOMP可用於任何可靠的雙向流媒體網路協議,如TCP和WebSocket。 儘管STOMP是一種面向文字的協議,但訊息負載可以是文字或二進位制。


STOMP是一個基於幀的協議,其幀在HTTP上建模。 STOMP框架的結構:


客戶可以使用SEND或SUBSCRIBE命令傳送或訂閱訊息以及描述訊息的內容和由誰來接收訊息的“目標”頭。這使得一個簡單的釋出 - 訂閱機制可以用來通過代理髮送訊息到其他連線的客戶端,或者傳送訊息到伺服器來請求執行一些工作。


在使用Spring的STOMP支援時,Spring WebSocket應用程式充當客戶端的STOMP代理。訊息被路由到@Controller訊息處理方法或一個簡單的記憶體代理,用於跟蹤訂閱並向訂閱使用者廣播訊息。您還可以將Spring配置為與專用的STOMP代理(例如RabbitMQ,ActiveMQ等)一起使用,以用於訊息的實際廣播。在這種情況下,Spring維護與代理的TCP連線,將訊息轉發給它,並將訊息從它傳遞到連線的WebSocket客戶端。因此,Spring Web應用程式可以依靠統一的基於HTTP的安全性,通用驗證以及熟悉的程式設計模型訊息處理工作。


這裡是客戶訂閱接收股票報價的例子,伺服器可以定期傳送例如通過計劃任務通過SimpMessagingTemplate將訊息傳送給代理:


以下是客戶端傳送交易請求的示例,伺服器可以通過@MessageMapping方法處理,稍後在執行後向客戶端廣播交易確認訊息和詳細資訊:


目的地的含義在STOMP規範中有意不透明。 它可以是任何字串,完全取決於STOMP伺服器來定義它們支援的目的地的語義和語法。 然而,對於目的地是類似路徑的字串,其中“/ topic / ..”意味著釋出 - 訂閱(一對多)和“/佇列/”意味著點對點(一對一 )訊息交換。


STOMP伺服器可以使用MESSAGE命令向所有使用者廣播訊息。 以下是向訂閱客戶端傳送股票報價的伺服器示例:


知道伺服器不能傳送未經請求的訊息很重要。 所有來自伺服器的訊息都必須響應特定的客戶端訂閱,並且伺服器訊息的“subscription-id”頭必須與客戶端訂閱的“id”頭相匹配。


以上概述旨在提供對STOMP協議的最基本的瞭解。 建議全面檢視協議規範。

使用STOMP作為子協議使Spring Framework和Spring Security能夠提供更豐富的程式設計模型,而不是使用原始WebSockets。 關於HTTP與原始TCP的關係以及Spring MVC和其他Web框架如何提供豐富的功能都可以做到這一點。 以下是一些好處:
不需要發明自定義訊息協議和訊息格式。
STOMP客戶端可用,包括Spring框架中的Java客戶端。
訊息代理(如RabbitMQ,ActiveMQ等)可以用於(可選)管理訂閱和廣播訊息。
應用程式邏輯可以組織在任何數量的@ Controller中,並且根據STOMP目標報頭路由到他們的訊息,以及用給定連線的單個WebSocketHandler處理原始WebSocket訊息。

使用Spring Security來保護基於STOMP目標和訊息型別的訊息。

spring-messaging和spring-websocket模組提供了對WebSocket支援的STOMP。 一旦你有這些依賴關係,你可以通過WebSocket和SockJS Fallback公開一個STOMP端點,如下所示:

WebSocketConfig配置:

import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;


import com.dougong.socket.handler.MyWebSocketHandlerDecoratorFactory;
import com.dougong.socket.interceptor.HandshakeInterceptor;




@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{


@Autowired
private MyWebSocketHandlerDecoratorFactory decoratorFactory;

/**
* applicationDestinationPrefixes應用字首,所有請求的訊息將會路由到@MessageMapping的controller上,
* enableStompBrokerRelay是代理字首,而返回的訊息將會路由到代理上,所有訂閱該代理的將收到響應的訊息。

*/
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 應用程式以/app為字首,代理目的地以/topic、/user為字首
config.enableSimpleBroker("/topic", "/user");
config.setApplicationDestinationPrefixes("/app");
config.setUserDestinationPrefix("/user/"); 
// config.setApplicationDestinationPrefixes("/app");
// config.setUserDestinationPrefix("/user");
// config.enableSimpleBroker("/topic", "/queue");
// registry.enableStompBrokerRelay("/topic", "/queue")
// 下面這配置為預設配置,如有變動修改配置啟用就可以了
// .setRelayHost("127.0.0.1") //activeMq伺服器地址
// .setRelayPort(61613)//activemq 伺服器服務埠
// .setClientLogin("guest") //登陸賬戶
// .setClientPasscode("guest") // ;
}


/**
* 將"/hello"路徑註冊為STOMP端點,這個路徑與傳送和接收訊息的目的路徑有所不同,這是一個端點,客戶端在訂閱或釋出訊息到目的地址前,要連線該端點,
* 即使用者傳送請求url="/applicationName/hello"與STOMP server進行連線。之後再轉發到訂閱url;
* PS:端點的作用——客戶端在訂閱或釋出訊息到目的地址前,要連線該端點。

* @param stompEndpointRegistry


*            連線的端點,客戶端建立連線時需要連線這裡配置的端點

*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 為java stomp client提供連結
// 在網頁上可以通過"/applicationName/hello"來和伺服器的WebSocket連線
registry.addEndpoint("/gs-guide-websocket").setAllowedOrigins("*")
/*.setHandshakeHandler(new MyHandshakeHandler())*/
.addInterceptors(new HandshakeInterceptor())
.withSockJS();


// 為js客戶端提供連結
//registry.addEndpoint("/hello").setAllowedOrigins("*")/*.setHandshakeHandler(new MyHandshakeHandler())
//.addInterceptors(new MyHandshakeInterceptor())*/.withSockJS();
// 在網頁上可以通過"/applicationName/hello"來和伺服器的WebSocket連線
// registry.addEndpoint("/gs-guide-websocket")
// .addInterceptors(new
// SpringWebSocketHandlerInterceptor()).setAllowedOrigins("*").withSockJS();
}


@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.addDecoratorFactory(this.decoratorFactory);
registration.setMessageSizeLimit(75536).setSendBufferSizeLimit(75536).setSendTimeLimit(75536);
//super.configureWebSocketTransport(registration);
}


/**
* 輸入通道引數設定
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
//// super.configureClientInboundChannel(registration);
//registration.setInterceptors(new MySubscriptionInterceptor ());
//// 執行緒資訊
//registration.taskExecutor().corePoolSize(4).maxPoolSize(8).keepAliveSeconds(60);
//super.configureClientInboundChannel(registration);
}


/**
* 輸出通道引數配置
*/
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
// super.configureClientOutboundChannel(registration);
// 執行緒資訊
//registration.taskExecutor().corePoolSize(4).maxPoolSize(8);
//super.configureClientOutboundChannel(registration);
}


@Override
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
//messageConverters.add(new ByteArrayMessageConverter());
return true;
}

}

這裡需要注意configureWebSocketTransport方法中的registration.addDecoratorFactory(this.decoratorFactory);


decoratorFactory是自定義的一個類實現啦WebSocketHandlerDecoratorFactory介面,主要作用是用來處理自定義的WebSocketHandler,自定義的WebSocketHandler可以監控使用者下線或上線等。

其中還有registerStompEndpoints方法中添加了一個stomp握手攔截器 


下面是decoratorFactory、自定義的WebSocketHandler和stomp握手攔截器的類:

MyWebSocketHandlerDecoratorFactory

import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.handler.WebSocketHandlerDecoratorFactory;
@Component
public class MyWebSocketHandlerDecoratorFactory implements WebSocketHandlerDecoratorFactory{

private MyWebSocketHandler myWebSocketHandler;

public MyWebSocketHandlerDecoratorFactory(){

}

@Override
public WebSocketHandler decorate(WebSocketHandler handler) {
if(this.myWebSocketHandler == null)
this.myWebSocketHandler = new MyWebSocketHandler(handler);

return this.myWebSocketHandler;
}


public MyWebSocketHandler getMyWebSocketHandler() {
return myWebSocketHandler;
}

}

MyWebSocketHandler

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.WebSocketHandlerDecorator;


import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;


public class MyWebSocketHandler extends WebSocketHandlerDecorator{

public MyWebSocketHandler(WebSocketHandler delegate) {
super(delegate);
}


private static Logger logger = LoggerFactory.getLogger(MyWebSocketHandler.class);
    
    private static final Map<String , WebSocketSession> users;//這個會出現效能問題,最好用Map來儲存,key用userid
    
    static {
        users = new HashMap<String , WebSocketSession>();
    }
    
/**
     * 連線成功時候,會觸發頁面上onopen方法
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        // TODO Auto-generated method stub
        Map<String, Object> map = session.getAttributes();
        if(map != null && map.size() > 0){
        String userid = (String) map.get("userid");
        users.put(userid,session);
        }
        System.out.println("connect to the websocket success......當前數量:"+users.size());
        //這塊會實現自己業務,比如,當用戶登入後,會把離線訊息推送給使用者
        //TextMessage returnMessage = new TextMessage("你將收到的離線");
//        session.sendMessage(returnMessage);
        super.afterConnectionEstablished(session);//必須呼叫父類不然會報錯
    }


    /**
     * 關閉連線時觸發
     * 使用者退出後的處理,不如退出之後,要將使用者資訊從websocket的session中remove掉,這樣使用者就處於離線狀態了,也不會佔用系統資源
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        logger.debug("websocket connection closed......");
        String username= (String) session.getAttributes().get("WEBSOCKET_USERNAME");
        String userid= (String) session.getAttributes().get("userid");
        System.out.println("使用者"+username+"已退出!");
        users.remove(userid);
        System.out.println("剩餘線上使用者"+users.size());
        super.afterConnectionClosed(session, closeStatus);
    }


    /**
     * js呼叫websocket.send時候,會呼叫該方法
     */
    @Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
    //將訊息進行轉化,因為是訊息是json資料,可能裡面包含了傳送給某個人的資訊,所以需要用json相關的工具類處理之後再封裝成TextMessage,
   //我這兒並沒有做處理,訊息的封裝格式一般有{from:xxxx,to:xxxxx,msg:xxxxx},來自哪裡,傳送給誰,什麼訊息等等    
   //TextMessage msg = (TextMessage)message.getPayload();
   //給所有使用者群發訊息
    //this.sendMessageToUsers(returnMessage,session);
   //給指定使用者群發訊息
   //sendMessageToUser(userId,msg);
    //session.sendMessage(message);
        super.handleMessage(session, message);
    }
    


    //後臺錯誤資訊處理方法
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if(session.isOpen()){
        String userid= (String) session.getAttributes().get("userid");
        users.remove(userid);
        session.close();
        }
        logger.debug("websocket connection closed......");
        super.handleTransportError(session, exception);
    }


    @Override
    public boolean supportsPartialMessages() {
        return false;
    }




    /**
     * 給某個使用者傳送訊息
     *
     * @param userName
     * @param message
     */
    public void sendMessageToUser(String userName, TextMessage message) {
        for (Map.Entry<String , WebSocketSession> entry : users.entrySet()) {
            if (entry.getValue().getAttributes().get("WEBSOCKET_USERNAME").equals(userName)) {
                try {
                    if (entry.getValue().isOpen()) {
                    entry.getValue().sendMessage(message);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                break;
            }
        }
    }


    /**
     * 給所有線上使用者傳送訊息
     * @param message
     */
    public void sendMessageToUsers(TextMessage message) {
        for (Map.Entry<String , WebSocketSession> entry : users.entrySet()) {
            try {
        if (entry.getValue().isOpen()) {
        entry.getValue().sendMessage(message);
                 }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * 給所有線上使用者傳送訊息
     * 
     * @param message
     * @throws IOException
     */
    public void broadcast(final TextMessage message) throws IOException {
        Iterator<Entry<String, WebSocketSession>> it = users.entrySet().iterator();
 
        // 多執行緒群發
        while (it.hasNext()) {
            final Entry<String, WebSocketSession> entry = it.next();
            if (entry.getValue().isOpen()) {
                // entry.getValue().sendMessage(message);
                new Thread(new Runnable() {
                    public void run() {
                        try {
                            if (entry.getValue().isOpen()) {
                                entry.getValue().sendMessage(message);
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
 
        }
    }
}

HandshakeInterceptor

/** 
 * stomp握手攔截器 
 * @author tomZ 
 * @date 2016年11月4日 
 * @desc TODO 
 */  
public class HandshakeInterceptor extends HttpSessionHandshakeInterceptor {  
    private static final Logger logger = LoggerFactory.getLogger( HandshakeInterceptor.class);  
      
    @Override  
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,  
            Map<String, Object> attributes) throws Exception {  
    logger.info("===============before  handshake=============");  
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;
            HttpServletRequest httpServletRequest = servletRequest.getServletRequest();
            String name = httpServletRequest.getParameter("name");
            //HttpSession session = servletRequest.getServletRequest().getSession(false);
            HttpSession session = httpServletRequest.getSession(false);
            if (session != null) {
                //使用userName區分WebSocketHandler,以便定向傳送訊息
                String userid = (String) session.getAttribute("userid");//使用者資訊
                if (userid == null) {
                userid = "default-system"+session.getId();
                }
                attributes.put("userid",userid);//使用者userid
            }
            
        }
        return super.beforeHandshake(request, response, wsHandler, attributes);  
    }  
      
    @Override  
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler                         wsHandler,   Exception ex) {
    System.out.println(wsHandler.getClass());
        logger.info("===============after handshake=============");  
        super.afterHandshake(request, response, wsHandler, ex);  
    }  
}  

啟動後訪問:http://localhost:8888/index/socket

前端用的是:LayIM+stomp.min.js

原始碼地址:

https://gitee.com/zengwc/spring-boot-2.0.1

https://github.com/zengwenchong/spring-websocket-5.05-stomp-/tree/master

第一次寫部落格,寫得不好的地方望指出,一起交流