1. 程式人生 > 實用技巧 >SpringBoot快速整合RocketMQ

SpringBoot快速整合RocketMQ

RocketMQ總結整理

https://blog.csdn.net/asdf08442a/article/details/54882769

SpringBoot整合RocketMQ

依賴

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>

yml

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: my-group

produce

@RunWith(SpringRunner.class)
@SpringBootTest
public class TestSpringRocketMQ {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void send() {
        /**
         * 同步傳送 sync
         * 傳送訊息採用同步模式,這種方式只有在訊息完全傳送完成之後才返回結果,此方式存在需要同步等待發送結果的時間代價。
         * 這種方式具有內部重試機制,即在主動宣告本次訊息傳送失敗之前,內部實現將重試一定次數,預設為2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。
         * 傳送的結果存在同一個訊息可能被多次傳送給給broker,這裡需要應用的開發者自己在消費端處理冪等性問題。
         */
        SendResult syncSend = rocketMQTemplate.syncSend("my-topic", "syncSend");
        System.out.println(syncSend);
        /**
         * 非同步傳送 async
         * 傳送訊息採用非同步傳送模式,訊息傳送後立刻返回,當訊息完全完成傳送後,會呼叫回撥函式sendCallback來告知傳送者本次傳送是成功或者失敗。
         * 非同步模式通常用於響應時間敏感業務場景,即承受不了同步傳送訊息時等待返回的耗時代價。同同步傳送一樣,非同步模式也在內部實現了重試機制,
         * 預設次數為2次(DefaultMQProducer#getRetryTimesWhenSendAsyncFailed})。傳送的結果同樣存在同一個訊息可能被多次傳送給給broker,需要應用的開發者自己在消費端處理冪等性問題。
         */
        rocketMQTemplate.asyncSend("my-topic", "syncSend", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("send successful");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("send fail; {}" + throwable.getMessage());
            }
        });

        /**
         * 直接傳送 one-way
         * 採用one-way傳送模式傳送訊息的時候,傳送端傳送完訊息後會立即返回,不會等待來自broker的ack來告知本次訊息傳送是否完全完成傳送。
         * 這種方式吞吐量很大,但是存在訊息丟失的風險,所以其適用於不重要的訊息傳送,比如日誌收集
         */
        rocketMQTemplate.sendOneWay("my-topic", "sendOneWay");

        rocketMQTemplate.convertAndSend("my-topic", "convertAndSend");
    }
}

consume

@Component
@RocketMQMessageListener(
        topic = "my-topic",
        consumerGroup = "my-group",
        selectorExpression = "*"
)
public class SpringConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("接收到訊息 -> " + msg);
    }
}