springboot情操陶冶-web配置(四)
承接前文springboot情操陶冶-web配置(三),本文將在DispatcherServlet應用的基礎上談下websocket的使用
websocket
websocket的簡單瞭解可見維基百科WebSocket,在筆者看來其大多數應用在web瀏覽器上用於與服務端的持續性通訊,並大多用於接收伺服器的推送內容
簡單例子
spring很友好的向我們展示瞭如何在springboot上整合websocket,並給出了一個hello例子。讀者可參照官方例子走一遍便可對websocket有一定的瞭解。附上官方部分原始碼
Controller響應層
package hello; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Controller; import org.springframework.web.util.HtmlUtils; @Controller public class GreetingController { @MessageMapping("/hello") @SendTo("/topic/greetings") public Greeting greeting(HelloMessage message) throws Exception { Thread.sleep(1000); // simulated delay return new Greeting("Hello, " + HtmlUtils.htmlEscape(message.getName()) + "!"); } }
客戶端HTML介面
<!DOCTYPE html> <html> <head> <title>Hello WebSocket</title> <link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet"> <link href="/main.css" rel="stylesheet"> <script src="/webjars/jquery/jquery.min.js"></script> <script src="/webjars/sockjs-client/sockjs.min.js"></script> <script src="/webjars/stomp-websocket/stomp.min.js"></script> <script src="/app.js"></script> </head> <body> <noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websocket relies on Javascript being enabled. Please enable Javascript and reload this page!</h2></noscript> <div id="main-content" class="container"> <div class="row"> <div class="col-md-6"> <form class="form-inline"> <div class="form-group"> <label for="connect">WebSocket connection:</label> <button id="connect" class="btn btn-default" type="submit">Connect</button> <button id="disconnect" class="btn btn-default" type="submit" disabled="disabled">Disconnect </button> </div> </form> </div> <div class="col-md-6"> <form class="form-inline"> <div class="form-group"> <label for="name">What is your name?</label> <input type="text" id="name" class="form-control" placeholder="Your name here..."> </div> <button id="send" class="btn btn-default" type="submit">Send</button> </form> </div> </div> <div class="row"> <div class="col-md-12"> <table id="conversation" class="table table-striped"> <thead> <tr> <th>Greetings</th> </tr> </thead> <tbody id="greetings"> </tbody> </table> </div> </div> </div> </body> </html>
在閱讀完官方的demo例子之後,讀者務必再去閱覽下WebSocket在springboot的基本概念>>>Web on Servlet Stack。雖然文件很長,但還是需要理解下其的工作原理,大致上和rabbitmq類似,採取的是訂閱推送的模式。
原始碼層分析
筆者優先關注下@EnableWebSocketMessageBroker註解,其用於開啟websocket服務
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented @Import(DelegatingWebSocketMessageBrokerConfiguration.class) public @interface EnableWebSocketMessageBroker { }
由上可知,引入的DelegatingWebSocketMessageBrokerConfiguration類用於載入websocket的相關配置。 本文不進行詳細的原始碼分析,筆者則會根據上圖的原理圖尋找在springboot中的配置,這樣應該會起到事半功倍的效果。
RequestChannel和ResponseChannel
請求與響應處理通道
@Bean
public AbstractSubscribableChannel clientInboundChannel() {
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
// 攔截器,使用者也可通過WebSocketMessageBrokerConfigurer介面增加攔截器
ChannelRegistration reg = getClientInboundChannelRegistration();
if (reg.hasInterceptors()) {
channel.setInterceptors(reg.getInterceptors());
}
return channel;
}
protected final ChannelRegistration getClientInboundChannelRegistration() {
if (this.clientInboundChannelRegistration == null) {
ChannelRegistration registration = new ChannelRegistration();
// 載入請求通道,也可新增攔截器
configureClientInboundChannel(registration);
registration.interceptors(new ImmutableMessageChannelInterceptor());
this.clientInboundChannelRegistration = registration;
}
return this.clientInboundChannelRegistration;
}
@Bean
public AbstractSubscribableChannel clientOutboundChannel() {
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
// 攔截器,使用者也可通過WebSocketMessageBrokerConfigurer介面增加攔截器
ChannelRegistration reg = getClientOutboundChannelRegistration();
if (reg.hasInterceptors()) {
channel.setInterceptors(reg.getInterceptors());
}
return channel;
}
不管是請求通道還是響應通道程式碼一模一樣,使用的均是ExecutorSubscribableChannel,其內部整合了攔截器和執行緒池。此類基本是所有channel的公用類,筆者稍微看下里面有什麼小花頭
No.1 訊息處理ExecutorSubscribableChannel
@Override
public boolean sendInternal(Message<?> message, long timeout) {
// 訊息處理者遍歷
for (MessageHandler handler : getSubscribers()) {
// 統一由SendTask類處理訊息
SendTask sendTask = new SendTask(message, handler);
if (this.executor == null) {
sendTask.run();
}
else {
this.executor.execute(sendTask);
}
}
return true;
}
這裡的訊息處理者有直接處理註解的,也有直接返回給BorkerRelay的,讀者可自行去查閱
No.2 訊息任務SendTask
@Override
public void run() {
Message<?> message = this.inputMessage;
try {
// 應用攔截器
message = applyBeforeHandle(message);
if (message == null) {
return;
}
// 通過messageHandler來處理訊息
this.messageHandler.handleMessage(message);
triggerAfterMessageHandled(message, null);
}
catch (Exception ex) {
triggerAfterMessageHandled(message, ex);
if (ex instanceof MessagingException) {
throw (MessagingException) ex;
}
String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
throw new MessageDeliveryException(message, description, ex);
}
catch (Throwable err) {
String description = "Failed to handle " + message + " to " + this + " in " + this.messageHandler;
MessageDeliveryException ex2 = new MessageDeliveryException(message, description, err);
triggerAfterMessageHandled(message, ex2);
throw ex2;
}
}
由此可得出通用的SendTask只是個訊息中轉器,最終的訊息處理均是由MessageHandler來解決的。看來處理訊息的路由核心得繼續往下文分析了
註解方式訊息處理器MessageHandler
即解析@MessageMapping/@SendTo等websocket註解的方法,其類似於MVC的@RequestMapping等註解。可見SimpAnnotationMethodMessageHandler類
@Override
protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler() {
// 請求和響應通道、broker訊息模板
return new WebSocketAnnotationMethodMessageHandler(
clientInboundChannel(), clientOutboundChannel(), brokerMessagingTemplate());
}
上述程式碼中的broker訊息模板主要通過brokerChannel通道將註解方法返回的值經過訂閱關係處理後再傳入響應通道。筆者此處對WebSocketAnnotationMethodMessageHandler作下分步驟的解析
No.1 入參解析器
protected List<HandlerMethodArgumentResolver> initArgumentResolvers() {
ApplicationContext context = getApplicationContext();
ConfigurableBeanFactory beanFactory = (context instanceof ConfigurableApplicationContext ?
((ConfigurableApplicationContext) context).getBeanFactory() : null);
List<HandlerMethodArgumentResolver> resolvers = new ArrayList<>();
// @Header和@Headers引數註解解析
resolvers.add(new HeaderMethodArgumentResolver(this.conversionService, beanFactory));
resolvers.add(new HeadersMethodArgumentResolver());
// @DestinationVariable註解引數解析
resolvers.add(new DestinationVariableMethodArgumentResolver(this.conversionService));
// 讀取Principal型別的引數,讀取的為頭部的simpUser屬性
resolvers.add(new PrincipalMethodArgumentResolver());
// 讀取Message型別的引數
resolvers.add(new MessageMethodArgumentResolver(this.messageConverter));
resolvers.addAll(getCustomArgumentResolvers());//使用者自定義,可擴充套件
// @Payload註解的引數解析
resolvers.add(new PayloadArgumentResolver(this.messageConverter, this.validator));
return resolvers;
}
No.2 反參解析器
@Override
protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandlers() {
List<HandlerMethodReturnValueHandler> handlers = new ArrayList<>();
// Single-purpose return value types
handlers.add(new ListenableFutureReturnValueHandler());
handlers.add(new CompletableFutureReturnValueHandler());
// @SendTo和@SendToUser註解解析器
SendToMethodReturnValueHandler sendToHandler =
new SendToMethodReturnValueHandler(this.brokerTemplate, true);
if (this.headerInitializer != null) {
sendToHandler.setHeaderInitializer(this.headerInitializer);
}
handlers.add(sendToHandler);
// @SubscribeMapping註解解析器
SubscriptionMethodReturnValueHandler subscriptionHandler =
new SubscriptionMethodReturnValueHandler(this.clientMessagingTemplate);
subscriptionHandler.setHeaderInitializer(this.headerInitializer);
handlers.add(subscriptionHandler);
// custom return value types
handlers.addAll(getCustomReturnValueHandlers());
// 預設處理
sendToHandler = new SendToMethodReturnValueHandler(this.brokerTemplate, false);
sendToHandler.setHeaderInitializer(this.headerInitializer);
handlers.add(sendToHandler);
return handlers;
}
No.3 HandlerMethod物件建立
@Override
public void afterPropertiesSet() {
// 入參和反參解析器配置
if (this.argumentResolvers.getResolvers().isEmpty()) {
this.argumentResolvers.addResolvers(initArgumentResolvers());
}
if (this.returnValueHandlers.getReturnValueHandlers().isEmpty()) {
this.returnValueHandlers.addHandlers(initReturnValueHandlers());
}
ApplicationContext context = getApplicationContext();
if (context == null) {
return;
}
for (String beanName : context.getBeanNamesForType(Object.class)) {
if (!beanName.startsWith(SCOPED_TARGET_NAME_PREFIX)) {
Class<?> beanType = null;
try {
beanType = context.getType(beanName);
}
catch (Throwable ex) {
// An unresolvable bean type, probably from a lazy bean - let's ignore it.
if (logger.isDebugEnabled()) {
logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
}
}
// 查詢被@Controller註解下修飾的帶有@MessageMapping和@SubscribeMapping註解的方法集合並存放至handlerMethods對映集合中
if (beanType != null && isHandler(beanType)) {
detectHandlerMethods(beanName);
}
}
}
}
主要是搜尋帶有@MessageMapping和@SubscribeMapping註解的方法註冊至MessageHandler物件中的handlerMethods屬性,方便後續對請求的路由
No.4 請求通道註冊SimpAnnotationMethodMessageHandler處理類
@Override
public final void start() {
synchronized (this.lifecycleMonitor) {
// 請求通道注入此處理器
this.clientInboundChannel.subscribe(this);
this.running = true;
}
}
針對註解方式的訊息路由處理我們基本知道了,那這個針對websocket的發過來的請求如何被路由至相應的HandlerMethod中呢?
HandlerMapping
筆者此處找尋下針對websocket的請求的路由
@Bean
public HandlerMapping stompWebSocketHandlerMapping() {
WebSocketHandler handler = decorateWebSocketHandler(subProtocolWebSocketHandler());
WebMvcStompEndpointRegistry registry = new WebMvcStompEndpointRegistry(
handler, getTransportRegistration(), messageBrokerTaskScheduler());
ApplicationContext applicationContext = getApplicationContext();
if (applicationContext != null) {
registry.setApplicationContext(applicationContext);
}
registerStompEndpoints(registry);
// 返回的是AbstractUrlHandlerMapping的繼承類
return registry.getHandlerMapping();
}
具體的讀者可查詢原始碼,內容還是很多的,筆者只知道上述程式碼返回的是AbstractUrlHandlerMapping的繼承類,其儲存的urlMap中的key值為websocket的端點路徑,比如/ws-demo/**
,而value值則是HttpRequestHandler介面的實現類,其主要處理基於HTTP的websocket請求。
@FunctionalInterface
public interface HttpRequestHandler {
/**
* Process the given request, generating a response.
* @param request current HTTP request
* @param response current HTTP response
* @throws ServletException in case of general errors
* @throws IOException in case of I/O errors
*/
void handleRequest(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException;
}
感興趣的讀者可進行深入的研究,其中有包括對ajax/sockJs/handshake等的支援。
訊息流
主要是由AbstractHttpReceivingTransportHandler接收客戶端的請求,然後通過StompSubProtocolHandler類將訊息傳送至ExecutorSubscribableChannel,由其呼叫sendInternal()方法遍歷註冊的MessageHandlers,由後者去真正的處理訊息並回包。具體的程式碼邏輯還是很複雜的,筆者此處羅列下註解方式的處理以及響應給客戶端的訊息處理
No.1 註解訊息處理AbstractMethodMessageHandler
@Override
public void handleMessage(Message<?> message) throws MessagingException {
// 獲取目的地址
String destination = getDestination(message);
if (destination == null) {
return;
}
// 確保請求的發過來的地址是指定的字首,否則訊息就會被直接丟棄
String lookupDestination = getLookupDestination(destination);
if (lookupDestination == null) {
return;
}
MessageHeaderAccessor headerAccessor = MessageHeaderAccessor.getMutableAccessor(message);
headerAccessor.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, lookupDestination);
headerAccessor.setLeaveMutable(true);
message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders());
if (logger.isDebugEnabled()) {
logger.debug("Searching methods to handle " +
headerAccessor.getShortLogMessage(message.getPayload()) +
", lookupDestination='" + lookupDestination + "'");
}
// 找尋註解進行相應的方法響應
handleMessageInternal(message, lookupDestination);
headerAccessor.setImmutable();
}
此處需要注意的是請求的路徑字首必須是指定的字首,此字首可通過WebSocketMessageBrokerConfigurer#configureMessageBroker()方法來設定,如下
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// response destination prefix
registry.enableSimpleBroker("/topic");
// request destination prefix
registry.setApplicationDestinationPrefixes("/app");
}
No.2 註解訊息響應處理SimpleBrokerMessageHandler
@Override
protected void handleMessageInternal(Message<?> message) {
MessageHeaders headers = message.getHeaders();
SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
String destination = SimpMessageHeaderAccessor.getDestination(headers);
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
updateSessionReadTime(sessionId);
// 此處確保回包的路徑是以指定的BrokerPath作為字首,否則則會被丟棄,配置同上
if (!checkDestinationPrefix(destination)) {
return;
}
// 針對訊息的傳送,會根據多個訂閱者進行廣播發送
if (SimpMessageType.MESSAGE.equals(messageType)) {
logMessage(message);
sendMessageToSubscribers(destination, message);
}
// 連線請求響應
else if (SimpMessageType.CONNECT.equals(messageType)) {
....
}
}
// 關閉請求響應
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
....
}
// 訂閱請求響應
else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
...
}
// 取消訂閱請求響應
else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
....
}
}
No.2 訊息響應處理StompBrokerRelayMessageHandler,其作為真實的處理響應的出處
@Override
protected void handleMessageInternal(Message<?> message) {
String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
.....
.....
// 回包路徑,預設以使用者設定的BrokerPath為字首;不滿足就將包丟棄
String destination = stompAccessor.getDestination();
if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) {
return;
}
// 連線請求
if (StompCommand.CONNECT.equals(command)) {
if (logger.isDebugEnabled()) {
logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
}
stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message));
stompAccessor.setLogin(this.clientLogin);
stompAccessor.setPasscode(this.clientPasscode);
if (getVirtualHost() != null) {
stompAccessor.setHost(getVirtualHost());
}
StompConnectionHandler handler = new StompConnectionHandler(sessionId, stompAccessor);
this.connectionHandlers.put(sessionId, handler);
this.stats.incrementConnectCount();
Assert.state(this.tcpClient != null, "No TCP client available");
this.tcpClient.connect(handler);
}
// 關閉請求
else if (StompCommand.DISCONNECT.equals(command)) {
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler == null) {
if (logger.isDebugEnabled()) {
logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up.");
}
return;
}
stats.incrementDisconnectCount();
handler.forward(message, stompAccessor);
}
else {
StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler == null) {
if (logger.isDebugEnabled()) {
logger.debug("No TCP connection for session " + sessionId + " in " + message);
}
return;
}
// 直接呼叫連線返回,內含sessionId以及訂閱者id等等
handler.forward(message, stompAccessor);
}
}
小結
先了解websocket的原理,然後再結合原始碼加深對原理的理解,這便是瞭解一個新技術的必要步驟。筆者此處針對官方的例子作以下小貼士 1.配置websocket的請求響應字首以及端點配置,務必實現WebSocketMessageBrokerConfigurer介面
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// register web socket contextPath and allow any origin
registry.addEndpoint("/ws-demo").setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// response destination prefix
registry.enableSimpleBroker("/topic");
// request destination prefix
registry.setApplicationDestinationPrefixes("/app");
}
}
2.針對回包處理時,一般我們需要指定路徑,如果採用註解方式,預設情況下@SendTo不指定的時候,會採用使用者設定的回包路徑字首,比如@MessageMapping("/app/hello")-->/topic/hello。 當然使用者也可以採用SimpMessageTemplate#convertAndSend()方法直接傳送至指定的回包路徑
3.客戶端採用sockJs相關API時,其支援通過HTTP/HTTPS協議連線指定的websocket端點,但是務必在訂閱或者傳送訊息的時候,指定的目的地址必須以/
為開頭,否則傳送不成功