1. 程式人生 > 其它 >深入理解RocketMQ延遲訊息

深入理解RocketMQ延遲訊息

本文轉載自:https://cloud.tencent.com/developer/article/1581368

延遲訊息是實際開發中一個非常有用的功能,本文第一部分從整體上介紹秒級精度延遲訊息的實現思路,在第二部分結合RocketMQ的延遲訊息實現,進行細緻的講解,點出關鍵部分的原始碼。第三步介紹延遲訊息與訊息重試的關係。

延遲訊息介紹

基本概念:延遲訊息是指生產者傳送訊息傳送訊息後,不能立刻被消費者消費,需要等待指定的時間後才可以被消費。
場景案例:使用者下了一個訂單之後,需要在指定時間內(例如30分鐘)進行支付,在到期之前可以傳送一個訊息提醒使用者進行支付。
一些訊息中介軟體的Broker端內建了延遲訊息

支援的能力,如:

  • NSQ:這是一個go語言的訊息中介軟體,其通過記憶體中的優先順序佇列來儲存延遲訊息,支援秒級精度,最多2個小時延遲。Java中也有對應的實現,如ScheduledThreadPoolExecutor內部實際上也是使用了優先順序佇列。
  • QMQ:採用雙重時間輪實現。可參考:任意時間延時訊息原理講解:設計與實現
  • RabbitMQ:需要安裝一個rabbitmq_delayed_message_exchange外掛。
  • RocketMQ:RocketMQ 開源版本延遲訊息臨時儲存在一個內部主題中,不支援任意時間精度,支援特定的 level,例如定時 5s,10s,1m 等。

Broker端內建延遲訊息處理能力,核心實現思路都是一樣:將延遲訊息通過一個臨時儲存進行暫存,到期後才投遞到目標Topic中。如下圖所示:

步驟說明如下:

  1. producer要將一個延遲訊息傳送到某個Topic中
  2. Broker判斷這是一個延遲訊息後,將其通過臨時儲存進行暫存。
  3. Broker內部通過一個延遲服務(delay service)檢查訊息是否到期,將到期的訊息投遞到目標Topic中。這個的延遲服務名字為delay service,不同訊息中介軟體的延遲服務模組名稱可能不同。
  4. 消費者消費目標topic中的延遲投遞的訊息

顯然,臨時儲存模組和延遲服務模組,是延遲訊息實現的關鍵。上圖中,臨時儲存和延遲服務都是在Broker內部實現,對業務透明。

此外, 還有一些訊息中介軟體原生並不支援延遲訊息,如Kafka。在這種情況下,可以選擇對Kafka進行改造,但是成本較大。另外一種方式是使用第三方臨時儲存,並加一層代理。

第三方儲存選型要求:
對於第三方臨時儲存,其需要滿足以下幾個特點:

  • 高效能:寫入延遲要低,MQ的一個重要作用是削峰填谷,在選擇臨時儲存時,寫入效能必須要高,關係型資料庫(如Mysql)通常不滿足需求。
  • 高可靠:延遲訊息寫入後,不能丟失,需要進行持久化,並進行備份
  • 支援排序:支援按照某個欄位對訊息進行排序,對於延遲訊息需要按照時間進行排序。普通訊息通常先發送的會被先消費,延遲訊息與普通訊息不同,需要進行排序。例如先發一條延遲10s的訊息,再發一條延遲5s的訊息,那麼後傳送的訊息需要被先消費。
  • 支援長時間儲存:一些業務的延遲訊息,需要延遲幾個月,甚至更長,所以延遲訊息必須能長時間保留。不過通常不建議延遲太長時間,儲存成本比較大,且業務邏輯可能已經發生變化,已經不需要消費這些訊息。

例如,滴滴開源的訊息中介軟體DDMQ,底層訊息中介軟體的基礎上加了一層代理,獨立部署延遲服務模組,使用rocksdb進行臨時儲存。rocksdb是一個高效能的KV儲存,並支援排序。

此時對於延遲訊息的流轉如下圖所示:

說明如下:

  1. 生產者將傳送給producer proxy,proxy判斷是延遲訊息,將其投遞到一個緩衝Topic中;
  2. delay service啟動消費者,用於從緩衝topic中消費延遲訊息,以時間為key,儲存到rocksdb中;
  3. delay service判斷訊息到期後,將其投遞到目標Topic中。
  4. 消費者消費目標topic中的資料

這種方式的好處是,因為delay service的延遲投遞能力是獨立於broker實現的,不需要對broker做任何改造,對於任意MQ型別都可以提供支援延遲訊息的能力。例如DDMQ對RocketMQ、Kafka都提供了秒級精度的延遲訊息投遞能力,但是Kafka本身並不支援延遲訊息,而開源版本的 RocketMQ 只支援幾個指定的延遲級別,並不支援秒級精度的定時訊息。

事實上,DDMQ還提供了很多其他功能,僅僅從延遲訊息的角度,完全沒有必要使用這個proxy,直接將訊息投遞到緩衝Topic中,之後通過delay service完成延遲投遞邏輯即可。

具體到delay service模組的實現上,也有一些重要的細節:

  1. 為了保證服務的高可用,delay service也是需要部署多個節點。
  2. 為了保證資料不丟失,每個delay service節點都需要消費緩衝Topic中的全量資料,儲存到各自的持久化儲存中,這樣就有了多個備份,並需要以時間為key。不過因為是各自拉取,並不能保證強一致。如果一定要強一致,那麼delay service就不需要內建儲存實現,可以藉助於其他支援強一致的儲存。
  3. 為了避免重複投遞,delay service需要進行選主,可以藉助於zookeeper、etcd等實現。只有master可以通過生產者投遞到目標Topic中,其他節點處於備用狀態。否則,如果每個節點進行都投遞,那麼延遲訊息就會被投遞多次,造成消費重複。
  4. master要記錄自己當前投遞到的時間到一個共享儲存中,如果master掛了,從slave節點中選出一個新的master節點,從之前記錄時間繼續開始投遞。
  5. 延遲訊息的取消:一些延遲訊息在未到期之前,可能希望進行取消。通常取消邏輯實現較為複雜,且不夠精確。對於那些已經快要到期的訊息,可能還未取消之前,已經發送出去了,因此需要在消費者端做檢查,才能萬無一失。

RocketMQ中的延遲訊息

開源RocketMQ支援延遲訊息,但是不支援秒級精度。預設支援18個level的延遲訊息,這是通過broker端的messageDelayLevel配置項確定的,如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker在啟動時,內部會建立一個內部主題:SCHEDULE_TOPIC_XXXX,根據延遲level的個數,建立對應數量的佇列,也就是說18個level對應了18個佇列。注意,這並不是說這個內部主題只會有18個佇列,因為Broker通常是叢集模式部署的,因此每個節點都有18個佇列。

延遲級別的值可以進行修改,以滿足自己的業務需求,可以修改/新增新的level。例如:你想支援2天的延遲,修改最後一個level的值為2d,這個時候依然是18個level;也可以增加一個2d,這個時候總共就有19個level。

生產者傳送延遲訊息

生產者在傳送延遲訊息非常簡單,只需要設定一個延遲級別即可,注意不是具體的延遲時間,如:

Message msg=new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//設定延遲level為5,對應延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);

如果設定的延遲level超過最大值,那麼將會重置最最大值。

Broker端儲存延遲訊息

延遲訊息在RocketMQ Broker端的流轉如下圖所示:

可以看到,總共有6個步驟,下面會對這6個步驟進行詳細的講解:

  1. 修改訊息Topic名稱和佇列資訊
  2. 轉發訊息到延遲主題的CosumeQueue中
  3. 延遲服務消費SCHEDULE_TOPIC_XXXX訊息
  4. 將資訊重新儲存到CommitLog中
  5. 將訊息投遞到目標Topic中
  6. 消費者消費目標topic中的資料

第一步:修改訊息Topic名稱和佇列資訊

RocketMQ Broker端在儲存生產者寫入的訊息時,首先都會將其寫入到CommitLog中。之後根據訊息中的Topic資訊和佇列資訊,將其轉發到目標Topic的指定佇列(ConsumeQueue)中。

由於訊息一旦儲存到ConsumeQueue中,消費者就能消費到,而延遲訊息不能被立即消費,所以這裡將Topic的名稱修改為SCHEDULE_TOPIC_XXXX,並根據延遲級別確定要投遞到哪個佇列下。

同時,還會將訊息原來要傳送到的目標Topic和佇列資訊儲存到訊息的屬性中。相關原始碼如下所示:
org.apache.rocketmq.store.CommitLog#asyncPutMessage

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {

//如果是延遲訊息
            if (msg.getDelayTimeLevel() > 0) {
                //如果設定的級別超過了最大級別 重置延遲級別
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                //修改TOPIC的投遞目標為內部主題SCHEDULE_TOPIC_XXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                //根據delayLevel 確定訊息投遞到SCHEDULE_TOPIC_XXX內部的哪個佇列中
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                //記錄原始topic、queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                //更新訊息的topic、queueId
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }

}

第二步:轉發訊息到延遲主題的CosumeQueue中

CommitLog中的訊息轉發到CosumeQueue中是非同步進行的。在轉發過程中,會對延遲訊息進行特殊處理,主要是計算這條延遲訊息需要在什麼時候進行投遞。

投遞時間=訊息儲存時間(storeTimestamp) + 延遲級別對應的時間

需要注意的是,會將計算出的投遞時間當做訊息Tag的雜湊值儲存到CosumeQueue中,CosumeQueue單個儲存單元組成結構如下圖所示:
其中:

  • Commit Log Offset:記錄在CommitLog中的位置。
  • Size:記錄訊息的大小
  • Message Tag HashCode:記錄訊息Tag的雜湊值,用於訊息過濾。特別的,對於延遲訊息,這個欄位記錄的是訊息的投遞時間戳。這也是為什麼java中hashCode方法返回一個int型,只佔用4個位元組,而這裡Message Tag HashCode欄位卻設計成8個位元組的原因。

相關原始碼參見:CommitLog#checkMessageAndReturnSize

public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
        final boolean readBody) {

		// Timing message processing
                {
                    //如果訊息需要投遞到延遲主題SCHEDULE_TOPIC_XXX中
                    String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                    if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
                        int delayLevel = Integer.parseInt(t);

                        if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                            delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                        }

                        //如果延遲級別大於0 計算目標投遞時間 並將其當作tag雜湊值
                        if (delayLevel > 0) {
                            tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                                storeTimestamp);
                        }
                    }
                }

}

第三步:延遲服務消費SCHEDULE_TOPIC_XXXX訊息

Broker內部有一個ScheduleMessageService類,其充當延遲服務,消費SCHEDULE_TOPIC_XXXX中的訊息,並投遞到目標Topic中。

ScheduleMessageService在啟動時,其會建立一個定時器Timer,並根據延遲級別的個數,啟動對應數量的TimerTask,每個TimerTask負責一個延遲級別的消費與投遞。

相關原始碼如下所示:ScheduleMessageService#start

public void start() {
        if (started.compareAndSet(false, true)) {
            super.load();
            //1.建立定時器Timer
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            //2.針對每個延遲級別 建立一個 TimerTask
            //2.1: 迭代每個延遲級別,delayLevelTable是一個Map 記錄了每個延遲級別對應的延遲時間
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }
                //2.2 針對每個延遲級別 建立一個 TimerTask
                if (timeDelay != null) {
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
}

需要注意的是,每個TimeTask在檢查訊息是否到期時,首先檢查對應佇列中尚未投遞第一條訊息,如果這條訊息沒到期,那麼之後的訊息都不會檢查。如果到期了,則進行投遞,並檢查之後的訊息是否到期。

第四步:將資訊重新儲存到CommitLog中

在將訊息到期後,需要投遞到目標Topic。由於在第一步已經記錄了原來的Topic和佇列資訊,因此這裡重新設定,再儲存到CommitLog即可。此外,由於之前Message Tag HashCode欄位儲存的是訊息的投遞時間,這裡需要重新計算tag的雜湊值後再儲存。

原始碼參見:DeliverDelayedMessageTimerTask的messageTimeup方法。

第五步:將訊息投遞到目標Topic中

這一步與第二步類似,不過由於訊息的Topic名稱已經改為了目標Topic。因此訊息會直接投遞到目標Topic的ConsumeQueue中,之後消費者即消費到這條訊息。

延遲訊息與消費重試的關係

RocketMQ提供了訊息重試的能力,在併發模式消費消費失敗的情況下,可以返回一個列舉值RECONSUME_LATER,那麼訊息之後將會進行重試。如:

consumer.registerMessageListener(new MessageListenerConcurrently() {
       @Override
       public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                       ConsumeConcurrentlyContext context) {
           //處理訊息,失敗,返回RECONSUME_LATER,進行重試
           return ConsumeConcurrentlyStatus.RECONSUME_LATER;
       }
   });

重試預設會進行重試16次。使用過RocketMQ訊息重試功能的使用者,可能看到過以下這張圖:

第幾次重試 與上次重試的間隔時間 第幾次重試 與上次重試的間隔時間
1 10 秒 9 7 分鐘
2 30 秒 10 8 分鐘
3 1 分鐘 11 9 分鐘
4 2 分鐘 12 10 分鐘
5 3 分鐘 13 20 分鐘
6 4 分鐘 14 30 分鐘
7 5 分鐘 15 1 小時
8 6 分鐘 16 2 小時

細心地的讀者發現了,訊息重試的16個級別,實際上是把延遲訊息18個級別的前兩個level去掉了。事實上,RocketMQ的訊息重試也是基於延遲訊息來完成的。在訊息消費失敗的情況下,將其重新當做延遲訊息投遞迴Broker。

在投遞回去時,會跳過前兩個level,因此只重試16次。當然,訊息重試還有一些其他的設計邏輯,在之後的文章將會進行分析。