Spring Cloud Stream同一通道根據消息內容分發不同的消費邏輯
應用場景
有的時候,我們對於同一通道中的消息處理,會通過判斷頭信息或者消息內容來做一些差異化處理,比如:可能在消息頭信息中帶入消息版本號,然後通過if判斷來執行不同的處理邏輯,其代碼結構可能是這樣的:
@StreamListener(value = TestTopic.INPUT) public void receiveV1(String payload, @Header("version") String version) { if("1.0".equals(version)) { // Version 1.0 } if("2.0".equals(version)) { // Version 2.0 } }
那麽當消息處理邏輯復雜的時候,這段邏輯就會變得特別復雜。針對這個問題,在@StreamListener註解中提供了一個不錯的屬性condition,可以用來優化這樣的處理結構。
動手試試
下面通過編寫一個簡單的例子來具體體會一下這個屬性的用法:
@EnableBinding(TestApplication.TestTopic.class) @SpringBootApplication public class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } @RestController static class TestController { @Autowired private TestTopic testTopic; /** * 消息生產接口 * * @param message * @return */ @GetMapping("/sendMessage") public String messageWithMQ(@RequestParam String message) { testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build()); testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build()); return "ok"; } } /** * 消息消費邏輯 */ @Slf4j @Component static class TestListener { @StreamListener(value = TestTopic.INPUT, condition = "headers[‘version‘]==‘1.0‘") public void receiveV1(String payload, @Header("version") String version) { log.info("Received v1 : " + payload + ", " + version); } @StreamListener(value = TestTopic.INPUT, condition = "headers[‘version‘]==‘2.0‘") public void receiveV2(String payload, @Header("version") String version) { log.info("Received v2 : " + payload + ", " + version); } } interface TestTopic { String OUTPUT = "example-topic-output"; String INPUT = "example-topic-input"; @Output(OUTPUT) MessageChannel output(); @Input(INPUT) SubscribableChannel input(); } }
內容很簡單,既包含了消息的生產,也包含了消息消費。在/sendMessage接口的定義中,發送了兩條消息,一條消息的頭信息中包含version=1.0,另外一條消息的頭信息中包含version=2.0。在消息監聽類TestListener中,對TestTopic.INPUT通道定義了兩個@StreamListener,這兩個監聽邏輯有不同的condition,這裏的表達式表示會根據消息頭信息中的version值來做不同的處理邏輯分發。
在啟動應用之前,還要記得配置一下輸入輸出通道對應的物理目標(exchange或topic名),比如:
spring.cloud.stream.bindings.example-topic-input.destination=test-topic spring.cloud.stream.bindings.example-topic-input.group=stream-content-route spring.cloud.stream.bindings.example-topic-output.destination=test-topic
完成了上面配置之後,就可以啟動應用,並嘗試訪問localhost:8080/sendMessage?message=hello接口來發送一個消息到MQ中了。此時可以看到類似下面的日誌:
2018-12-24 15:50:33.361 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v1 : hello, 1.0
2018-12-24 15:50:33.363 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v2 : hello, 2.0
從日誌中可以看到,兩條帶有不同頭信息的消息,分別通過不同的監聽處理邏輯輸出了對應的日誌打印。
本文首發:http://blog.didispace.com/spring-cloud-starter-finchley-7-6/
原文鏈接:https://my.oschina.net/didispace/blog/2994382
Spring Cloud Stream同一通道根據消息內容分發不同的消費邏輯