SpringCloud Stream整合RabbitMQ3.5.0
阿新 • • 發佈:2020-11-02
前言
本文章為單體專案,將消費者和生產者寫在同一個專案中,介意者不用向下看了。
本文介紹三種應用方式:
1:普通整合RabbitMQ
2:訊息分割槽
3:按條件消費(多個消費者只消費同一佇列中滿足自己條件的訊息)
1:核心依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> <version>${spring.cloud.stream}</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <version>${spring.cloud.stream}</version> </dependency>
全部依賴:
專案結構圖:
2:普通整合RabbitMQ
2.1:application.properties
spring.rabbitmq.host=192.168.1.218 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.cloud.stream.bindings.dev-exchange.destination=dev-exchange spring.cloud.stream.bindings.dev-exchange.group=dev-queue spring.cloud.stream.bindings.dev-exchange.content-type=application/json spring.cloud.stream.bindings.dev-exchange.consumer.concurrency=1 spring.cloud.stream.bindings.dev-exchange.consumer.max-attempts=1
2.2:定義生產者和消費者介面
import com.boot.rabbitmq.constance.MQConstants; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public interface RabbitStream { /** * 訊息流入(消費) **/ @Input(MQConstants.DEV_EXCHANGE) SubscribableChannel devConsumer(); /** * 訊息流出(生產) **/ @Output(MQConstants.DEV_EXCHANGE) MessageChannel devProducer(); }
2.3:生產者/消費者的具體實現
生產者程式碼:
@Component
@EnableBinding(RabbitStream.class)
public class DevProducer {
private static final Logger logger = LoggerFactory.getLogger(DevProducer.class);
private final RabbitStream rabbitStream;
public DevProducer(RabbitStream rabbitStream) {
this.rabbitStream = rabbitStream;
}
public void sendMsg(MQModel model) {
logger.info("producer:{}", JSON.toJSONString(model));
rabbitStream.devProducer()
.send(MessageBuilder.withPayload(model).build());
}
}
消費者程式碼:
@Component
@EnableBinding(RabbitStream.class)
public class DevListener {
private static final Logger logger = LoggerFactory.getLogger(DevListener.class);
@StreamListener(MQConstants.DEV_EXCHANGE)
public void receiveMsgAutoCommit(@Payload String payload) {
logger.info("consumer:{}", payload);
}
}
controller程式碼:
@PostMapping(value = "/dev")
public void dev(@RequestBody MQModel model) {
devProducer.sendMsg(model);
}
2.4:測試
傳送請求:
控制檯日誌:
3:訊息分割槽
3.1:概念
所謂訊息分割槽就是將一個大佇列拆分成多個小佇列,然後分解成 producer-A -> queue-A -> Consumer-A 的一種場景。
3.2:如何在專案中使用
1:不需要改很多東西,只需要新增少部分配置即可
## RabbitMQ 訊息分割槽配置
spring.cloud.stream.bindings.partition-exchange.destination=partition-exchange
spring.cloud.stream.bindings.partition-exchange.group=partition-queue
spring.cloud.stream.bindings.partition-exchange.content-type=application/json
spring.cloud.stream.bindings.partition-exchange.consumer.concurrency=1
spring.cloud.stream.bindings.partition-exchange.consumer.max-attempts=1
## 訊息分割槽
spring.cloud.stream.bindings.partition-exchange.consumer.partitioned=true
## 分割槽數量
spring.cloud.stream.bindings.partition-exchange.producer.partition-count=2
## 機器下標,最大值=partition-count-1
spring.cloud.stream.instance-index=0
## 分割槽策略表示式
spring.cloud.stream.bindings.partition-exchange.producer.partition-key-expression=payload.mid
然後訊息的路由的時候會從payload拿到mid進行條件運算:
mid/2=1則放在應用佇列下標為1的佇列,mid/2=0則放在佇列下標為0的佇列。
訊息的入隊前會計算出該訊息應該進入哪個佇列,原始碼截圖:
可以看到開啟分割槽之後,payload 的型別不是String,而是具備鍵值對的實體物件。
4:條件消費
4.1:概念
對同一個佇列中的訊息按條件進行劃分再派發給不同的消費者。
4.2:匹配條件講解
訊息實體:
除了可以用payload中的資料進行匹配條件外,headers中的資料也可以作為條件。
4.3:測試
效果