使用Spring Cloud Stream 驅動 RabbitMQ 程式碼示例
阿新 • • 發佈:2022-12-07
1、Spring Cloud Stream 官方文件
官方配置文件參考:
Spring Cloud Stream Reference Documentation
Spring Cloud Stream RabbitMQ Binder Reference Guide
說明:
在網上查找了許多關於 SpringCloudStream 的中文配置文件,但多數文件更新不及時。而且 SpringCLoudStream 最近的版本在使用上變化較大,老版本的資料已經失去了參考價值。
官方文件雖然是英文的,對多數國內的開發人員閱讀會有些吃力,但好在資料都是最新的,而且現在的翻譯軟體這麼方便,國內的多數開發人員也是有一些英文的閱讀基礎的,建議還是參考官方文件來測試。
特別說明:
文件中的配置說明中,基本上都是 駝峰命名,但我們在實際的使用過程中,在 yaml 配置檔案中,都是使用 - 來分割使用。
比如:文件中的 配置名為 acknowledgeMode
但在 yaml 中 acknowledge-mode
2、示例程式碼
2.1 application.yml
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin virtual-host: / cloud: stream: binders: # 配置 Binder defaultRabbit: type: rabbit environment: spring: rabbitmq: host: 127.0.0.1 default: binder: defaultRabbit consumer: max-attempts: 1 # 重試次數, 設定為 1 即不重試 bindings: flowEndConsumer-in-0: group: queue # queue destination: dap-oa.flowEnd # exchange flowEndProducer-out-0: destination: dap-oa.flowEnd rabbit: # default: # acknowledge-mode: manual # manual 手動確認 # dlq-ttl: 5000 bindings: flowEndConsumer-in-0: consumer: binding-routing-key: flowEnd # 路由鍵 exchange-type: topic # 交換機型別 # dlq-ttl: 5000 # 訊息過期時間 # acknowledge-mode: manual # manual 手動確認 預設為 auto 自動確認 flowEndProducer-out-0: producer: exchange-type: topic routing-key-expression: headers.routingKey # 根據 headers 中的 routingKey 引數,來確認路由到哪個 queue , "routingKey" 可自己命名 function: definition: flowEndProducer;flowEndConsumer # 生產者和消費者必須在此配置,是能被自動識別
此配置內容,會在 rabbitMQ 中生成:
- exchange : dap-oa.flowEnd
- queue : dap-oa.flowEnd.queue
- dap-oa.flowEnd.queue 與 dap-oa.flowEnd 繫結的路由鍵:flowEnd
交換機 exchange:
佇列 queue:
2.2 生產者
方式一:
@Slf4j @Configuration public class ProducerDemo { @Bean public Supplier<Message<String>> flowEndProducer() { return new Supplier<Message<String>>() { @Override public Message<MsgData> get() { return MessageBuilder.withPayload("測試內容") .setHeader("routingKey", "flowEnd") // 路由 .build(); } }; } }
方式二:
// 直接通過 StreamBridge 呼叫
private final StreamBridge streamBridge;
public void send() {
streamBridge.send(RabbitConstant.FLOW_END_PRODUCER,
MessageBuilder.withPayload("測試內容")
.setHeader("routingKey", "flowEnd")
.build());
}
2.2 消費者
@Slf4j
@Configuration
public class ConsumerDemo {
@Bean
public Consumer<Message<String>> flowEndConsumer() {
return message -> {
System.out.println("******************");
System.out.println("At flowEndConsumer");
System.out.println("******************");
System.out.println("Received message " + message.getPayload());
Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
log.info("******************");
log.info("channel:{}", channel);
log.info("deliveryTag:{}", deliveryTag);
};
}
}