Stream進階篇-動態繫結訊息通道
阿新 • • 發佈:2019-02-02
前言
在之前的章節中,所有消費者和生產者均通過@EnableBinding定義,此方式能夠快速的構建生產消費關係,但仔細想想,如果我們需要根據一定的條件決策訊息生產者將訊息發往哪個通道,貌似當前簡單粗暴的方式無法滿足。如此常見的場景,springcloud必然會幫我們想到,通過BinderAwareChannelResolver的bean例項即可實現動態通道的選擇,其會伴隨@EnableBinding註解自動完成註冊。
本章概要
1、BinderAwareChannelResolver的應用;
2、ExpressionEvaluatingRouter的應用;
BinderAwareChannelResolver的應用
首先來看BinderAwareChannelResolver的直接應用,為了方便場景模擬,採用一個rest api方式觸發訊息的生產傳送。
消費者Receiver工程改造
1、在MySink中新增如下兩個動態接收通道,dynamic1-channel與dynamic1-channel:
此時可以看到receiver工程的控制檯列印如下
其列印的來源通道與請求中的佔位符完全匹配,繼續觀察sender控制檯日誌,由於原來並沒有在sernder中定義相關通道描述,故首次觸發指定通道即可看到如下日誌記錄: 小節,由此可以看到,根據佔位符dest動態路由成功,準確的被髮送至預期的訊息通道。實際應用中,如果我們預先知道可能的動態路由通道名稱,則可以通過spring.cloud.stream.dynamicDestinations配置白名單,只有預設定的通道名稱方會被動態繫結,避免建立大量無效的通道資訊,浪費資源。 ExpressionEvaluatingRouter的應用 通過下圖可以BinderAwareChannelResolver類的定義, 其實現了Spring Integration的DestinationResolver介面,並且BinderAwareChannelResolver例項可以被注入在其他的components例項中,本小節將實現將BinderAwareChannelResolver例項注入在ExpressionEvaluatingRouter中實現訊息通道的動態繫結。 1、在DynamicDestinationController類中新增如下實現:
receiver工程控制檯如下: 此時可以看到,接收的訊息來源通道與請求頭呼應,特別關注第5個case,其並未設定dest屬性,故採用了預設的dynamic1-channel通道。
2、在SinkReceiver.class中新增對上述兩個通道的監聽,並列印接收內容:package com.cloud.shf.stream.sink; public interface MySink { /*********************************動態通道選擇示例******************************/ String DYNAMIC1_CHANNEL = "dynamic1-channel"; String DYNAMIC2_CHANNEL = "dynamic2-channel"; @Input(DYNAMIC1_CHANNEL) SubscribableChannel dynamic1Input(); @Input(DYNAMIC2_CHANNEL) SubscribableChannel dynamic2Input(); }
生產者Sender工程改造 新增一個DynamicDestinationController類,提供一個rest-api協助進行場景模擬:/*********************************動態通道選擇示例******************************/ @StreamListener(value = MySink.DYNAMIC1_CHANNEL) public void dynamic1Receiver(@Payload User user) { LOGGER.info("Received-{} from {} channel age: {}", active, MySink.DYNAMIC1_CHANNEL, user.getAge()); } @StreamListener(value = MySink.DYNAMIC2_CHANNEL) public void dynamic2Receiver(@Payload User user) { LOGGER.info("Received-{} from {} channel age: {}", active, MySink.DYNAMIC2_CHANNEL, user.getAge()); }
package com.cloud.shf.stream.controller;
@EnableBinding
@Controller
public class DynamicDestinationController {
@Autowired
private BinderAwareChannelResolver resolver;
/************************************方式一************************************/
@RequestMapping(path = "/{dest}", method = RequestMethod.POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@PathVariable("dest") String dest,
@RequestBody String body,
@RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, dest, contentType);
}
private void sendMessage(String body, String dest, Object contentType) {
resolver.resolveDestination(dest).send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
}
Note:
- 直接注入BinderAwareChannelResolver的bean例項即可;
- 通過PathVariable屬性dest值模擬通道名稱;
- boby作為訊息體;
- contentType作為訊息的頭資訊;
此時可以看到receiver工程的控制檯列印如下
其列印的來源通道與請求中的佔位符完全匹配,繼續觀察sender控制檯日誌,由於原來並沒有在sernder中定義相關通道描述,故首次觸發指定通道即可看到如下日誌記錄: 小節,由此可以看到,根據佔位符dest動態路由成功,準確的被髮送至預期的訊息通道。實際應用中,如果我們預先知道可能的動態路由通道名稱,則可以通過spring.cloud.stream.dynamicDestinations配置白名單,只有預設定的通道名稱方會被動態繫結,避免建立大量無效的通道資訊,浪費資源。 ExpressionEvaluatingRouter的應用 通過下圖可以BinderAwareChannelResolver類的定義, 其實現了Spring Integration的DestinationResolver介面,並且BinderAwareChannelResolver例項可以被注入在其他的components例項中,本小節將實現將BinderAwareChannelResolver例項注入在ExpressionEvaluatingRouter中實現訊息通道的動態繫結。 1、在DynamicDestinationController類中新增如下實現:
/************************************方式二************************************/
@RequestMapping(path = "/", method = RequestMethod.POST, consumes = "application/json")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody User body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType, @RequestHeader(name = "dest", required = false) String dest) {
Map<String, Object> headers = new HashMap<>(2);
headers.put(MessageHeaders.CONTENT_TYPE, contentType);
if (!StringUtils.isEmpty(dest)) {
headers.put("dest", dest);
}
sendMessage(body, headers);
}
private void sendMessage(User body, Map<String, Object> headers) {
routerChannel().send(MessageBuilder.createMessage(body,
new MessageHeaders(headers)));
}
@Bean(name = "router-channel")
public MessageChannel routerChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "router-channel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("headers[dest]"));
//作用於通過spel表示式沒有獲取到對應的通道資訊
router.setDefaultOutputChannelName("dynamic1-channel");
router.setChannelResolver(resolver);
return router;
}
Note:
- 通過頭資訊中的dest屬性作為動態繫結的依據;如果未設定dest則採用預設dynamic1-channel作為訊息通道;
- 通過Spel表示式獲取頭資訊中的dest屬性值(headers[dest]);
- 將BinderAwareChannelResolver注入至ExpressionEvaluatingRouter例項中;
- 其中org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor實現了對Message的處理,故可以通過此處看到我們訊息包含的訊息體、訊息體具體資訊,從而更好的編寫Spel表示式,主要程式碼如下:
receiver工程控制檯如下: 此時可以看到,接收的訊息來源通道與請求頭呼應,特別關注第5個case,其並未設定dest屬性,故採用了預設的dynamic1-channel通道。