1. 程式人生 > 其它 >spring cloud 實現pdf_Spring Cloud Stream實現訊息過濾消費

spring cloud 實現pdf_Spring Cloud Stream實現訊息過濾消費

技術標籤:spring cloud 實現pdf

點選上方"IT牧場",選擇"設為星標"技術乾貨每日送達!

TIPS

本文基於Spring Cloud Greenwich SR1+spring-cloud-starter-stream-rocketmq 0.9.0

理論相容:Spring Cloud Finchley++spring-cloud-starter-stream-rocketmq 0.2.2+

MQ使用的是RocketMQ,也可使用Kafka或者RabbitMQ。

本文探討Spring Cloud Stream & RocketMQ過濾訊息的各種姿勢。

在實際專案中,我們可能需要實現訊息消費的過濾。

舉個例子:實現訊息的分流處理:

生產者生產的訊息,雖然訊息體可能一樣,但是header不一樣。可編寫兩個或者更多的消費者,對不同header的訊息做針對性的處理!

condition

生產者

生產者設定一下header,比如my-header,值根據你的需要填寫:

@Autowiredprivate Source source;public String testStream() {  this.source.output()    .send(    MessageBuilder    .withPayload("訊息體")    .setHeader("my-header","你的header")    .build()  );  return "success";}

消費者

@[email protected] class TestStreamConsumer {    @StreamListener(value = Sink.INPUT,condition = "headers['my-header']=='你的header'")    public void receive(String messageBody) {        log.info("通過stream收到了訊息:messageBody ={}", messageBody);    }}

如程式碼所示,使用StreamListener註解的condition

屬性。當headers['my-header']=='你的header'條件滿足,才會進入到方法體。

Tags

TIPS

該方式只支援RoketMQ,不支援Kafka/RabbitMQ

生產者

@Autowiredprivate Source source;public String testStream() {  this.source.output()    .send(    MessageBuilder    .withPayload("訊息體")    // 注意:只能設定1個tag    .setHeader(RocketMQHeaders.TAGS, "tag1")    .build()  );  return "success";}

消費者

1 介面

public interface MySink {    String INPUT1 = "input1";    String INPUT2 = "input2";    @Input(INPUT1)    SubscribableChannel input();    @Input(INPUT2)    SubscribableChannel input2();}

2 註解

@EnableBinding({MySink.class})

3 配置

spring:  cloud:    stream:      rocketmq:        binder:          name-server: 127.0.0.1:9876        bindings:          input1:            consumer:              # 表示input2消費帶有tag1的訊息              tags: tag1          input2:            consumer:              # 表示input2消費帶有tag2或者tag3的訊息              tags: tag2||tag3      bindings:        input1:          destination: test-topic          group: test-group1        input2:          destination: test-topic          group: test-group2

4 消費程式碼

@[email protected] class MyTestStreamConsumer {    /**     * 我消費帶有tag1的訊息     *     * @param messageBody 訊息體     */    @StreamListener(MySink.INPUT1)    public void receive1(String messageBody) {        log.info("帶有tag1的訊息被消費了:messageBody ={}", messageBody);    }    /**     * 我消費帶有tag1或者tag2的訊息     *     * @param messageBody 訊息體     */    @StreamListener(MySink.INPUT2)    public void receive2(String messageBody) {        log.info("帶有tag2/tag3的訊息被消費了:messageBody ={}", messageBody);    }}

5 日誌:

2019-08-04 19:10:03.799  INFO 53760 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer      : 帶有tag1的訊息被消費了:messageBody =訊息體

Sql 92

TIPS

•該方式只支援RoketMQ,不支援Kafka/RabbitMQ•用了sql,就不要用Tag

RocketMQ支援使用SQL語法過濾訊息。官方文件:http://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/

Spring Clous Stream RocketMQ也為此特性提供了支援。

開啟SQL 92支援

預設情況下,RocketMQ的SQL過濾支援是關閉的,要想使用SQL 92過濾訊息,需要:

1 在conf/broker.conf新增

enablePropertyFilter = true

2 啟動RocketMQ

nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &

生產者

@Autowiredprivate Source source;public String testStream() {  this.source.output()    .send(    MessageBuilder    .withPayload("訊息體")    .setHeader("index", 1000)    .build()  );  return "success";}

消費者

1 介面

public interface MySink {    String INPUT1 = "input1";    String INPUT2 = "input2";    @Input(INPUT1)    SubscribableChannel input();    @Input(INPUT2)    SubscribableChannel input2();}

2 註解

@EnableBinding({MySink.class})

3 配置

spring:  cloud:    stream:      rocketmq:        binder:          name-server: 127.0.0.1:9876        bindings:          input1:            consumer:              sql: 'index < 1000'          input2:            consumer:              sql: 'index >= 1000'      bindings:        input1:          destination: test-topic          group: test-group1        input2:          destination: test-topic          group: test-group2

4 消費程式碼

@[email protected] class MyTestStreamConsumer {    /**     * 我消費帶有tag1的訊息     *     * @param messageBody 訊息體     */    @StreamListener(MySink.INPUT1)    public void receive1(String messageBody) {        log.info("index > 1000的訊息被消費了:messageBody ={}", messageBody);    }    /**     * 我消費帶有tag1或者tag2的訊息     *     * @param messageBody 訊息體     */    @StreamListener(MySink.INPUT2)    public void receive2(String messageBody) {        log.info("index <=1000 的訊息被消費了:messageBody ={}", messageBody);    }}

5 日誌

2019-08-04 19:58:59.787  INFO 56375 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer      : index <=1000 的訊息被消費了:messageBody =訊息體

相關程式碼

org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties

參考文件

•Filter Messages By SQL92 In RocketMQ[1]•RocketMQ 錯誤:The broker does not support consumer to filter message by SQL92[2]

乾貨分享

最近將個人學習筆記整理成冊,使用PDF分享。關注我,回覆如下程式碼,即可獲得百度盤地址,無套路領取!

•001:《Java併發與高併發解決方案》學習筆記;•002:《深入JVM核心——原理、診斷與優化》學習筆記;•003:《Java面試寶典》•004:《Docker開源書》•005:《Kubernetes開源書》•006:《DDD速成(領域驅動設計速成)》•007:全部•008:加技術討論群

近期熱文

•優秀的程式碼都是如何分層的?•手把手:Java記憶體洩漏分析Memory Analyzer Tool•分享:手把手教你如何免費且光榮地使用正版IntelliJ IDEA•網易雲音樂的訊息佇列改造之路•分庫分表?如何做到永不遷移資料和避免熱點?•咱們從頭到尾說一次 Java 垃圾回收

References

[1]Filter Messages By SQL92 In RocketMQ:http://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/[2]RocketMQ 錯誤:The broker does not support consumer to filter message by SQL92:https://blog.csdn.net/u010690828/article/details/84337688


想知道更多?長按/掃碼關注我吧↓↓↓7cd0098559383b44964813bc030d918d.png>>>技術討論群<<<喜歡就點個"在看"唄^_^