1. 程式人生 > >springboot情操陶冶-web配置(四)

springboot情操陶冶-web配置(四)

承接前文springboot情操陶冶-web配置(三),本文將在DispatcherServlet應用的基礎上談下websocket的使用

websocket

websocket的簡單瞭解可見維基百科WebSocket,在筆者看來其大多數應用在web瀏覽器上用於與服務端的持續性通訊,並大多用於接收伺服器的推送內容

簡單例子

spring很友好的向我們展示瞭如何在springboot上整合websocket,並給出了一個hello例子。讀者可參照官方例子走一遍便可對websocket有一定的瞭解。附上官方部分原始碼

Controller響應層

package hello;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
import org.springframework.web.util.HtmlUtils;

@Controller
public class GreetingController {


    @MessageMapping("/hello")
    @SendTo("/topic/greetings")
    public Greeting greeting(HelloMessage message) throws Exception {
        Thread.sleep(1000); // simulated delay
        return new Greeting("Hello, " + HtmlUtils.htmlEscape(message.getName()) + "!");
    }

}

客戶端HTML介面

<!DOCTYPE html>
<html>
<head>
    <title>Hello WebSocket</title>
    <link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet">
    <link href="/main.css" rel="stylesheet">
    <script src="/webjars/jquery/jquery.min.js"></script>
    <script src="/webjars/sockjs-client/sockjs.min.js"></script>
    <script src="/webjars/stomp-websocket/stomp.min.js"></script>
    <script src="/app.js"></script>
</head>
<body>
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websocket relies on Javascript being
    enabled. Please enable
    Javascript and reload this page!</h2></noscript>
<div id="main-content" class="container">
    <div class="row">
        <div class="col-md-6">
            <form class="form-inline">
                <div class="form-group">
                    <label for="connect">WebSocket connection:</label>
                    <button id="connect" class="btn btn-default" type="submit">Connect</button>
                    <button id="disconnect" class="btn btn-default" type="submit" disabled="disabled">Disconnect
                    </button>
                </div>
            </form>
        </div>
        <div class="col-md-6">
            <form class="form-inline">
                <div class="form-group">
                    <label for="name">What is your name?</label>
                    <input type="text" id="name" class="form-control" placeholder="Your name here...">
                </div>
                <button id="send" class="btn btn-default" type="submit">Send</button>
            </form>
        </div>
    </div>
    <div class="row">
        <div class="col-md-12">
            <table id="conversation" class="table table-striped">
                <thead>
                <tr>
                    <th>Greetings</th>
                </tr>
                </thead>
                <tbody id="greetings">
                </tbody>
            </table>
        </div>
    </div>
</div>
</body>
</html>

在閱讀完官方的demo例子之後,讀者務必再去閱覽下WebSocket在springboot的基本概念>>>Web on Servlet Stack。雖然文件很長,但還是需要理解下其的工作原理,大致上和rabbitmq類似,採取的是訂閱推送的模式。 websocket_page

原始碼層分析

筆者優先關注下@EnableWebSocketMessageBroker註解,其用於開啟websocket服務

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DelegatingWebSocketMessageBrokerConfiguration.class)
public @interface EnableWebSocketMessageBroker {

}

由上可知,引入的DelegatingWebSocketMessageBrokerConfiguration類用於載入websocket的相關配置。 本文不進行詳細的原始碼分析,筆者則會根據上圖的原理圖尋找在springboot中的配置,這樣應該會起到事半功倍的效果。

RequestChannel和ResponseChannel

請求與響應處理通道

    @Bean
    public AbstractSubscribableChannel clientInboundChannel() {
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
        // 攔截器,使用者也可通過WebSocketMessageBrokerConfigurer介面增加攔截器
        ChannelRegistration reg = getClientInboundChannelRegistration();
        if (reg.hasInterceptors()) {
            channel.setInterceptors(reg.getInterceptors());
        }
        return channel;
    }
    
    protected final ChannelRegistration getClientInboundChannelRegistration() {
        if (this.clientInboundChannelRegistration == null) {
            ChannelRegistration registration = new ChannelRegistration();
            // 載入請求通道,也可新增攔截器
            configureClientInboundChannel(registration);
            registration.interceptors(new ImmutableMessageChannelInterceptor());
            this.clientInboundChannelRegistration = registration;
        }
        return this.clientInboundChannelRegistration;
    }
    
    @Bean
    public AbstractSubscribableChannel clientOutboundChannel() {
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
        // 攔截器,使用者也可通過WebSocketMessageBrokerConfigurer介面增加攔截器
        ChannelRegistration reg = getClientOutboundChannelRegistration();
        if (reg.hasInterceptors()) {
            channel.setInterceptors(reg.getInterceptors());
        }
        return channel;
    }

不管是請求通道還是響應通道程式碼一模一樣,使用的均是ExecutorSubscribableChannel,其內部整合了攔截器和執行緒池。此類基本是所有channel的公用類,筆者稍微看下里面有什麼小花頭

No.1 訊息處理ExecutorSubscribableChannel

    @Override
    public boolean sendInternal(Message<?> message, long timeout) {
        // 訊息處理者遍歷
        for (MessageHandler handler : getSubscribers()) {
            // 統一由SendTask類處理訊息
            SendTask sendTask = new SendTask(message, handler);
            if (this.executor == null) {
                sendTask.run();
            }
            else {
                this.executor.execute(sendTask);
            }
        }
        return true;
    }

這裡的訊息處理者有直接處理註解的,也有直接返回給BorkerRelay的,讀者可自行去查閱

No.2 訊息任務SendTask

        @Override
        public void run() {
            Message<?> message = this.inputMessage;
            try {
                // 應用攔截器
                message = applyBeforeHandle(message);
                if (message == null) {
                    return;
                }
                // 通過messageHandler來處理訊息
                this.messageHandler.handleMessage(message);
                triggerAfterMessageHandled(message, null);
            }
            catch (Exception ex) {
                triggerAfterMessageHandled(message, ex);
                if (ex instanceof MessagingException) {
                    throw (MessagingException) ex;
                }
                String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
                throw new MessageDeliveryException(message, description, ex);
            }
            catch (Throwable err) {
                String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
                MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);
                triggerAfterMessageHandled(message, ex2);
                throw ex2;
            }
        }

由此可得出通用的SendTask只是個訊息中轉器,最終的訊息處理均是由MessageHandler來解決的。看來處理訊息的路由核心得繼續往下文分析了

註解方式訊息處理器MessageHandler

即解析@MessageMapping/@SendTo等websocket註解的方法,其類似於MVC的@RequestMapping等註解。可見SimpAnnotationMethodMessageHandler

    @Override
    protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler() {
        // 請求和響應通道、broker訊息模板
        return new WebSocketAnnotationMethodMessageHandler(
                clientInboundChannel(), clientOutboundChannel(), brokerMessagingTemplate());
    }

上述程式碼中的broker訊息模板主要通過brokerChannel通道將註解方法返回的值經過訂閱關係處理後再傳入響應通道。筆者此處對WebSocketAnnotationMethodMessageHandler作下分步驟的解析

No.1 入參解析器


    protected List<HandlerMethodArgumentResolver> initArgumentResolvers() {
        ApplicationContext context = getApplicationContext();
        ConfigurableBeanFactory beanFactory = (context instanceof ConfigurableApplicationContext ?
                ((ConfigurableApplicationContext) context).getBeanFactory() : null);

        List<HandlerMethodArgumentResolver> resolvers = new ArrayList<>();

        // @Header和@Headers引數註解解析
        resolvers.add(new HeaderMethodArgumentResolver(this.conversionService, beanFactory));
        resolvers.add(new HeadersMethodArgumentResolver());
        // @DestinationVariable註解引數解析
        resolvers.add(new DestinationVariableMethodArgumentResolver(this.conversionService));

        // 讀取Principal型別的引數,讀取的為頭部的simpUser屬性
        resolvers.add(new PrincipalMethodArgumentResolver());
        // 讀取Message型別的引數
        resolvers.add(new MessageMethodArgumentResolver(this.messageConverter));

        resolvers.addAll(getCustomArgumentResolvers());//使用者自定義,可擴充套件
        // @Payload註解的引數解析
        resolvers.add(new PayloadArgumentResolver(this.messageConverter, this.validator));

        return resolvers;
    }

No.2 反參解析器

    @Override
    protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandlers() {
        List<HandlerMethodReturnValueHandler> handlers = new ArrayList<>();

        // Single-purpose return value types
        handlers.add(new ListenableFutureReturnValueHandler());
        handlers.add(new CompletableFutureReturnValueHandler());

        // @SendTo和@SendToUser註解解析器
        SendToMethodReturnValueHandler sendToHandler =
                new SendToMethodReturnValueHandler(this.brokerTemplate, true);
        if (this.headerInitializer != null) {
            sendToHandler.setHeaderInitializer(this.headerInitializer);
        }
        handlers.add(sendToHandler);

        // @SubscribeMapping註解解析器
        SubscriptionMethodReturnValueHandler subscriptionHandler =
                new SubscriptionMethodReturnValueHandler(this.clientMessagingTemplate);
        subscriptionHandler.setHeaderInitializer(this.headerInitializer);
        handlers.add(subscriptionHandler);

        // custom return value types
        handlers.addAll(getCustomReturnValueHandlers());

        // 預設處理
        sendToHandler = new SendToMethodReturnValueHandler(this.brokerTemplate, false);
        sendToHandler.setHeaderInitializer(this.headerInitializer);
        handlers.add(sendToHandler);

        return handlers;
    }

No.3 HandlerMethod物件建立

    @Override
    public void afterPropertiesSet() {
        // 入參和反參解析器配置
        if (this.argumentResolvers.getResolvers().isEmpty()) {
            this.argumentResolvers.addResolvers(initArgumentResolvers());
        }

        if (this.returnValueHandlers.getReturnValueHandlers().isEmpty()) {
            this.returnValueHandlers.addHandlers(initReturnValueHandlers());
        }

        ApplicationContext context = getApplicationContext();
        if (context == null) {
            return;
        }
        for (String beanName : context.getBeanNamesForType(Object.class)) {
            if (!beanName.startsWith(SCOPED_TARGET_NAME_PREFIX)) {
                Class<?> beanType = null;
                try {
                    beanType = context.getType(beanName);
                }
                catch (Throwable ex) {
                    // An unresolvable bean type, probably from a lazy bean - let's ignore it.
                    if (logger.isDebugEnabled()) {
                        logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
                    }
                }
                // 查詢被@Controller註解下修飾的帶有@MessageMapping和@SubscribeMapping註解的方法集合並存放至handlerMethods對映集合中
                if (beanType != null && isHandler(beanType)) {
                    detectHandlerMethods(beanName);
                }
            }
        }
    }

主要是搜尋帶有@MessageMapping@SubscribeMapping註解的方法註冊至MessageHandler物件中的handlerMethods屬性,方便後續對請求的路由

No.4 請求通道註冊SimpAnnotationMethodMessageHandler處理類

    @Override
    public final void start() {
        synchronized (this.lifecycleMonitor) {
            // 請求通道注入此處理器
            this.clientInboundChannel.subscribe(this);
            this.running = true;
        }
    }

針對註解方式的訊息路由處理我們基本知道了,那這個針對websocket的發過來的請求如何被路由至相應的HandlerMethod中呢?

HandlerMapping

筆者此處找尋下針對websocket的請求的路由

    @Bean
    public HandlerMapping stompWebSocketHandlerMapping() {
        WebSocketHandler handler = decorateWebSocketHandler(subProtocolWebSocketHandler());
        WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry(
                handler, getTransportRegistration(), messageBrokerTaskScheduler());
        ApplicationContext applicationContext = getApplicationContext();
        if (applicationContext != null) {
            registry.setApplicationContext(applicationContext);
        }
        registerStompEndpoints(registry);
        // 返回的是AbstractUrlHandlerMapping的繼承類
        return registry.getHandlerMapping();
    }

具體的讀者可查詢原始碼,內容還是很多的,筆者只知道上述程式碼返回的是AbstractUrlHandlerMapping的繼承類,其儲存的urlMap中的key值為websocket的端點路徑,比如/ws-demo/**,而value值則是HttpRequestHandler介面的實現類,其主要處理基於HTTP的websocket請求。

@FunctionalInterface
public interface HttpRequestHandler {

    /**
     * Process the given request, generating a response.
     * @param request current HTTP request
     * @param response current HTTP response
     * @throws ServletException in case of general errors
     * @throws IOException in case of I/O errors
     */
    void handleRequest(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException;

}

感興趣的讀者可進行深入的研究,其中有包括對ajax/sockJs/handshake等的支援。

訊息流

主要是由AbstractHttpReceivingTransportHandler接收客戶端的請求,然後通過StompSubProtocolHandler類將訊息傳送至ExecutorSubscribableChannel,由其呼叫sendInternal()方法遍歷註冊的MessageHandlers,由後者去真正的處理訊息並回包。具體的程式碼邏輯還是很複雜的,筆者此處羅列下註解方式的處理以及響應給客戶端的訊息處理

No.1 註解訊息處理AbstractMethodMessageHandler

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        // 獲取目的地址
        String destination = getDestination(message);
        if (destination == null) {
            return;
        }
        // 確保請求的發過來的地址是指定的字首,否則訊息就會被直接丟棄
        String lookupDestination = getLookupDestination(destination);
        if (lookupDestination == null) {
            return;
        }

        MessageHeaderAccessor headerAccessor = MessageHeaderAccessor.getMutableAccessor(message);
        headerAccessor.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, lookupDestination);
        headerAccessor.setLeaveMutable(true);
        message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders());

        if (logger.isDebugEnabled()) {
            logger.debug("Searching methods to handle " +
                    headerAccessor.getShortLogMessage(message.getPayload()) +
                    ", lookupDestination='" + lookupDestination + "'");
        }
        // 找尋註解進行相應的方法響應
        handleMessageInternal(message, lookupDestination);
        headerAccessor.setImmutable();
    }

此處需要注意的是請求的路徑字首必須是指定的字首,此字首可通過WebSocketMessageBrokerConfigurer#configureMessageBroker()方法來設定,如下

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // response destination prefix
        registry.enableSimpleBroker("/topic");
        // request destination prefix
        registry.setApplicationDestinationPrefixes("/app");
    }

No.2 註解訊息響應處理SimpleBrokerMessageHandler

@Override
    protected void handleMessageInternal(Message<?> message) {
        MessageHeaders headers = message.getHeaders();
        SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
        String destination = SimpMessageHeaderAccessor.getDestination(headers);
        String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);

        updateSessionReadTime(sessionId);
        
        // 此處確保回包的路徑是以指定的BrokerPath作為字首,否則則會被丟棄,配置同上
        if (!checkDestinationPrefix(destination)) {
            return;
        }
        
        // 針對訊息的傳送,會根據多個訂閱者進行廣播發送
        if (SimpMessageType.MESSAGE.equals(messageType)) {
            logMessage(message);
            sendMessageToSubscribers(destination, message);
        }
        // 連線請求響應
        else if (SimpMessageType.CONNECT.equals(messageType)) {
                ....
            }
        }
        // 關閉請求響應
        else if (SimpMessageType.DISCONNECT.equals(messageType)) {
            ....
        }
        // 訂閱請求響應
        else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
            ...
        }
        // 取消訂閱請求響應
        else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
            ....
        }
    }

No.2 訊息響應處理StompBrokerRelayMessageHandler,其作為真實的處理響應的出處

@Override
    protected void handleMessageInternal(Message<?> message) {
        String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());

        .....
        .....

        // 回包路徑,預設以使用者設定的BrokerPath為字首;不滿足就將包丟棄
        String destination = stompAccessor.getDestination();
        if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
            return;
        }
        
        // 連線請求
        if (StompCommand.CONNECT.equals(command)) {
            if (logger.isDebugEnabled()) {
                logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
            }
            stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message));
            stompAccessor.setLogin(this.clientLogin);
            stompAccessor.setPasscode(this.clientPasscode);
            if (getVirtualHost() != null) {
                stompAccessor.setHost(getVirtualHost());
            }
            StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor);
            this.connectionHandlers.put(sessionId, handler);
            this.stats.incrementConnectCount();
            Assert.state(this.tcpClient != null, "No TCP client available");
            this.tcpClient.connect(handler);
        }
        // 關閉請求
        else if (StompCommand.DISCONNECT.equals(command)) {
            StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
            if (handler == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up.");
                }
                return;
            }
            stats.incrementDisconnectCount();
            handler.forward(message, stompAccessor);
        }
        else {
            StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
            if (handler == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("No TCP connection for session " + sessionId + " in " + message);
                }
                return;
            }
            // 直接呼叫連線返回,內含sessionId以及訂閱者id等等
            handler.forward(message, stompAccessor);
        }
    }

小結

先了解websocket的原理,然後再結合原始碼加深對原理的理解,這便是瞭解一個新技術的必要步驟。筆者此處針對官方的例子作以下小貼士 1.配置websocket的請求響應字首以及端點配置,務必實現WebSocketMessageBrokerConfigurer介面

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // register web socket contextPath and allow any origin
        registry.addEndpoint("/ws-demo").setAllowedOrigins("*").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // response destination prefix
        registry.enableSimpleBroker("/topic");
        // request destination prefix
        registry.setApplicationDestinationPrefixes("/app");
    }
}

2.針對回包處理時,一般我們需要指定路徑,如果採用註解方式,預設情況下@SendTo不指定的時候,會採用使用者設定的回包路徑字首,比如@MessageMapping("/app/hello")-->/topic/hello。 當然使用者也可以採用SimpMessageTemplate#convertAndSend()方法直接傳送至指定的回包路徑

3.客戶端採用sockJs相關API時,其支援通過HTTP/HTTPS協議連線指定的websocket端點,但是務必在訂閱或者傳送訊息的時候,指定的目的地址必須以/為開頭,否則傳送不成功