1. 程式人生 > >RocketMQ(6)---傳送普通訊息(三種方式)

RocketMQ(6)---傳送普通訊息(三種方式)

傳送普通訊息(三種方式)

RocketMQ 傳送普通訊息有三種實現方式:可靠同步傳送可靠非同步傳送單向(Oneway)傳送

注意 :順序訊息只支援可靠同步傳送。

GitHub地址: https://github.com/yudiandemingzi/SpringBootBlog

一、概念

1、可靠同步傳送

原理:同步傳送是指訊息傳送方發出資料後,會在收到接收方發回響應之後才發下一個數據包的通訊方式。

應用場景:此種方式應用場景非常廣泛,例如重要通知郵件、報名簡訊通知、營銷簡訊系統等。

2、可靠非同步傳送

原理:非同步傳送是指傳送方發出資料後,不等接收方發回響應,接著傳送下個數據包的通訊方式。 訊息佇列 RocketMQ 的非同步傳送,需要使用者實現非同步傳送回撥介面(SendCallback)。

應用場景:非同步傳送一般用於鏈路耗時較長,對 RT 響應時間較為敏感的業務場景,例如批量發貨等操作。

3、單向(Oneway)傳送

原理:單向(Oneway)傳送特點為傳送方只負責傳送訊息,不等待伺服器迴應且沒有回撥函式觸發,即只發送請求不等待應答。 此方式傳送訊息的過程耗時非常短,一般在微秒級別。

應用場景:適用於某些耗時非常短,但對可靠性要求並不高的場景,例如日誌收集。

4、三種對比

下表概括了三者的特點和主要區別。

傳送方式 傳送 TPS 傳送結果反饋 可靠性
同步傳送 不丟失
非同步傳送 不丟失
單向傳送 最快 可能丟失


二、程式碼示例

1、三種方式程式碼示例

@Slf4j
@RestController
public class Controller {
    /**
     * 生產者組
     */
    private static String PRODUCE_RGROUP = "test_producer";
    /**
     * 建立生產者物件
     */
    private static DefaultMQProducer producer = null;

    static {
        producer = new DefaultMQProducer(PRODUCE_RGROUP);
        //不開啟vip通道 開通口埠會減2
        producer.setVipChannelEnabled(false);
        //繫結name server
        producer.setNamesrvAddr("47.99.03.25:9876");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }

    @GetMapping("/message")
    public  void  message() throws Exception {
        //1、同步
        sync();
        //2、非同步
        async();
        //3、單項傳送
        oneWay();
    }
    /**
     * 1、同步傳送訊息
     */
    private  void sync() throws Exception {
        //建立訊息
        Message message = new Message("topic_family", ("  同步傳送  ").getBytes());
        //同步傳送訊息
        SendResult sendResult = producer.send(message);
        log.info("Product-同步傳送-Product資訊={}", sendResult);
    }
    /**
     * 2、非同步傳送訊息
     */
    private  void async() throws Exception {
        //建立訊息
        Message message = new Message("topic_family", ("  非同步傳送  ").getBytes());
        //非同步傳送訊息
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Product-非同步傳送-輸出資訊={}", sendResult);
            }
            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                //補償機制,根據業務情況進行使用,看是否進行重試
            }
        });
    }
    /**
     * 3、單項傳送訊息
     */
    private  void oneWay() throws Exception {
        //建立訊息
        Message message = new Message("topic_family", (" 單項傳送 ").getBytes());
        //同步傳送訊息
        producer.sendOneway(message);
    }
}

2、測試結果

這裡消費者程式碼就不貼出來了。

通過這個很明顯可以看出三種方式都被 Consumer 消費了。只不過對於 Product 同步和非同步傳送是有返回資訊的,單項傳送是沒有返回資訊的。


參考

1、RocketMQ 阿里雲官網文件



只要自己變優秀了,其他的事情才會跟著好起來(上將3)