1. 程式人生 > 其它 >使用Spring Cloud Stream 驅動 RabbitMQ 程式碼示例

使用Spring Cloud Stream 驅動 RabbitMQ 程式碼示例

1、Spring Cloud Stream 官方文件

官方配置文件參考:

Spring Cloud Stream Reference Documentation

Spring Cloud Stream RabbitMQ Binder Reference Guide

說明:

在網上查找了許多關於 SpringCloudStream 的中文配置文件,但多數文件更新不及時。而且 SpringCLoudStream 最近的版本在使用上變化較大,老版本的資料已經失去了參考價值。

官方文件雖然是英文的,對多數國內的開發人員閱讀會有些吃力,但好在資料都是最新的,而且現在的翻譯軟體這麼方便,國內的多數開發人員也是有一些英文的閱讀基礎的,建議還是參考官方文件來測試。

特別說明:

文件中的配置說明中,基本上都是 駝峰命名,但我們在實際的使用過程中,在 yaml 配置檔案中,都是使用 - 來分割使用。

比如:文件中的 配置名為 acknowledgeMode

但在 yaml 中 acknowledge-mode

2、示例程式碼

2.1 application.yml

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: /
  cloud:
    stream:
      binders:
        # 配置 Binder
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
      default:
        binder: defaultRabbit
        consumer:
          max-attempts: 1  # 重試次數, 設定為 1 即不重試
      bindings:
        flowEndConsumer-in-0:
          group: queue  # queue
          destination: dap-oa.flowEnd  # exchange
        flowEndProducer-out-0:
          destination: dap-oa.flowEnd
      rabbit:
        #        default:
        #          acknowledge-mode: manual  # manual 手動確認
        #          dlq-ttl: 5000
        bindings:
          flowEndConsumer-in-0:
            consumer:
              binding-routing-key: flowEnd # 路由鍵
              exchange-type: topic  # 交換機型別
              # dlq-ttl: 5000 # 訊息過期時間
              # acknowledge-mode: manual  # manual 手動確認  預設為 auto 自動確認
          flowEndProducer-out-0:
            producer:
              exchange-type: topic
              routing-key-expression: headers.routingKey # 根據 headers 中的 routingKey 引數,來確認路由到哪個 queue , "routingKey" 可自己命名
    function:
      definition: flowEndProducer;flowEndConsumer # 生產者和消費者必須在此配置,是能被自動識別

此配置內容,會在 rabbitMQ 中生成:

  • exchange : dap-oa.flowEnd
  • queue : dap-oa.flowEnd.queue
  • dap-oa.flowEnd.queue 與 dap-oa.flowEnd 繫結的路由鍵:flowEnd

交換機 exchange:

佇列 queue:

2.2 生產者

方式一:

@Slf4j
@Configuration
public class ProducerDemo {

    @Bean
    public Supplier<Message<String>> flowEndProducer() {
        return new Supplier<Message<String>>() {
            @Override
            public Message<MsgData> get() {
                return MessageBuilder.withPayload("測試內容")
                .setHeader("routingKey", "flowEnd")     // 路由
                .build();
            }
        };
    }
}

方式二:

// 直接通過 StreamBridge 呼叫

private final StreamBridge streamBridge;

public void send() {
    streamBridge.send(RabbitConstant.FLOW_END_PRODUCER,
    MessageBuilder.withPayload("測試內容")
        .setHeader("routingKey", "flowEnd")
        .build());
}

2.2 消費者

@Slf4j
@Configuration
public class ConsumerDemo {

    @Bean
    public Consumer<Message<String>> flowEndConsumer() {
        return message -> {
            System.out.println("******************");
            System.out.println("At flowEndConsumer");
            System.out.println("******************");
            System.out.println("Received message " + message.getPayload());

            Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
            Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);

            log.info("******************");
            log.info("channel:{}", channel);
            log.info("deliveryTag:{}", deliveryTag);
        };
    }
}