1. 程式人生 > 其它 >RocketMQ如何保證訊息的可靠性投遞?

RocketMQ如何保證訊息的可靠性投遞?

要想保證訊息的可靠型投遞,無非保證如下3個階段的正常執行即可:

  1. 生產者將訊息成功投遞到broker
  2. broker將投遞過程的訊息持久化下來
  3. 消費者能從broker消費到訊息

傳送端訊息重試

roducer向broker傳送訊息後,沒有收到broker的ack時,rocketmq會自動重試。重試的次數可以設定,預設為2次

DefaultMQProducer producer = new DefaultMQProducer(RPODUCER_GROUP_NAME);
// 同步傳送設定重試次數為5次
producer.setRetryTimesWhenSendFailed(5);
// 非同步傳送設定重試次數為5次
producer.setRetryTimesWhenSendAsyncFailed(5);

訊息持久化

我們先來了解一下訊息的儲存流程,這個知識對後面分析消費端訊息重試非常重要。

和訊息相關的檔案有如下幾種:

  • CommitLog:儲存訊息的元資料
  • ConsumerQueue:儲存訊息在CommitLog的索引
  • IndexFile:可以通過Message Key,時間區間快速查詢到訊息

整個訊息的儲存流程如下:

  1. Producer將訊息順序寫到CommitLog中
  2. 有一個執行緒根據訊息的佇列資訊,寫入到相關的ConsumerQueue中(minOffset為寫入的初始位置,consumerOffset為當前消費到的位置,maxOffset為ConsumerQueue最新寫入的位置)和IndexFile
  3. Consumer從ConsumerQueue的consumerOffset讀取到當前應該消費的訊息在CommitLog中的偏移量,到CommitLog中找到對應的訊息,消費成功後移動consumerOffset

刷盤機制

  • 「非同步刷盤」:訊息被寫入記憶體的PAGECACHE,返回寫成功狀態,當記憶體裡的訊息量積累到一定程度時,統一觸發寫磁碟操作,快速寫入 。吞吐量高,當磁碟損壞時,會丟失訊息
  • 「同步刷盤」:訊息寫入記憶體的PAGECACHE後,立刻通知刷盤執行緒刷盤,然後等待刷盤完成,刷盤執行緒執行完成後喚醒等待的執行緒,給應用返回訊息寫成功的狀態。吞吐量低,但不會造成訊息丟失

主從複製

如果一個broker有master和slave時,就需要將master上的訊息複製到slave上,複製的方式有兩種:

  • 「同步複製」:master和slave均寫成功,才返回客戶端成功。maste掛了以後可以保證資料不丟失,但是同步複製會增加資料寫入延遲,降低吞吐量
  • 「非同步複製」:master寫成功,返回客戶端成功。擁有較低的延遲和較高的吞吐量,但是當master出現故障後,有可能造成資料丟失

消費端訊息重試

順序訊息的重試

對於順序訊息,當消費者消費訊息失敗後,訊息佇列RocketMQ版會自動不斷地進行訊息重試(每次間隔時間為1秒),這時,應用會出現訊息消費被阻塞的情況。所以一定要做好監控,避免阻塞現象的發生。

順序訊息消費失敗後不會消費下一條訊息而是不斷重試這條訊息,應該是考慮到如果跨過這條訊息消費後面的訊息會對業務邏輯產生影響
順序訊息暫時僅支援叢集消費模式,不支援廣播消費模式

無序訊息的重試

對於無序訊息(普通、定時、延時、事務訊息),當消費者消費訊息失敗時,您可以通過設定返回狀態達到訊息重試的結果。

無序訊息的重試只針對叢集消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗訊息不再重試,繼續消費新的訊息

消費時候後,重試的配置方式有如下三種:

  • 返回Action.ReconsumeLater(推薦)
  • 返回Null
  • 丟擲異常
public class MessageListenerImpl implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext context) {
        //訊息處理邏輯丟擲異常,訊息將重試。
        doConsumeMessage(message);
        //方式1:返回Action.ReconsumeLater,訊息將重試。
        return Action.ReconsumeLater;
        //方式2:返回null,訊息將重試。
        return null;
        //方式3:直接丟擲異常,訊息將重試。
        throw new RuntimeException("Consumer Message exception");
    }
}

消費失敗後,無需重試的配置方式

叢集消費方式下,訊息失敗後期望訊息不重試,需要捕獲消費邏輯中可能丟擲的異常,最終返回Action.CommitMessage,此後這條訊息將不會再重試。

public class MessageListenerImpl implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            doConsumeMessage(message);
        } catch (Throwable e) {
            //捕獲消費邏輯中的所有異常,並返回Action.CommitMessage;
            return Action.CommitMessage;
        }
        //訊息處理正常,直接返回Action.CommitMessage;
        return Action.CommitMessage;
    }
}

訊息重試次數

RocketMQ預設允許每條訊息最多重試16次,每次消費失敗傳送一條延時訊息到重試佇列,同一條訊息失敗一次將延時等級提高一次,然後再放到重試佇列。重試16次後如果還沒有消費成功,則將訊息放到死信佇列中。

注意:重試佇列和死信佇列都是按照Consumer Group劃分的

  • 重試佇列topic名字:%RETRY% + consumerGroup
  • 死信佇列topic名字:%DLQ% + consumerGroup

為什麼重試佇列和死信佇列要按照Consumer Group來進行劃分?

  • 因為在RocketMQ的時候使用一定要保持訂閱關係一致。即一個Consumer Group訂閱的topic和tag要完全一致,不然可能會導致消費邏輯混亂,訊息丟失。

如下任意一種情況都表現為訂閱關係不一致:

  • 相同ConsumerGroup下的Consumer例項訂閱了不同的Topic。
  • 相同ConsumerGroup下的Consumer例項訂閱了相同的Topic,但訂閱的Tag不一致。

我們可以通過控制檯檢視各種型別的主題:

訊息每次重試的間隔時間如下:

第幾次重試

與上次重試的間隔時間

第幾次重試

與上次重試的間隔時間

1

10 秒

9

7 分鐘

2

30 秒

10

8 分鐘

3

1 分鐘

11

9 分鐘

4

2 分鐘

12

10 分鐘

5

3 分鐘

13

20 分鐘

6

4 分鐘

14

30 分鐘

7

5 分鐘

15

1 小時

8

6 分鐘

16

2 小時

前面說到RocketMQ的訊息重試是通過往重試佇列傳送定時訊息來實現的。

RocketMQ支援18個級別的定時延時,每個級別定時訊息的延時時間如下:

// MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

訊息重試只是把定時訊息的前2個級別去掉,每次傳送下一個級別的定時訊息。

我們可以設定消費端訊息重試次數:

  • 最大重試次數小於等於16次,則重試時間間隔同上表描述。
  • 最大重試次數大於16次,超過16次的重試時間間隔均為每次2小時。
Properties properties = new Properties();
// 配置對應Group ID的最大訊息重試次數為20次,最大重試次數為字串型別。
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);

那麼重試佇列中的訊息是如何被消費的?

訊息消費者在啟動的時候,會訂閱正常的topic和重試佇列的topic

定時訊息的實現邏輯也比較簡單,可以歸納為如下幾步:

  1. 傳送延時訊息
    • 1.1 替換topic為SCHEDULE_TOPIC_XXXX,queueId為訊息延遲等級(如果不替換topic直接發到對應的consumeQueue中,則訊息會被立馬消費)
    • 1.2 將訊息原來的topic,queueId放到訊息擴充套件屬性中
    • 1.3 將訊息應該執行的時間放到tagsCode中
  2. 將訊息順序寫到CommitLog中
  3. 將訊息對應的資訊分發到對應的ConsumerQueue中(topic為SCHEDULE_TOPIC_XXXX總共有18個queue,對應18個延遲級別)
  4. 定時任務不斷判斷訊息是否到達投遞時間,沒有到達則後續執行投遞
  5. 如果到達投遞時間,則從commitLog中拉取訊息的內容,重新設定訊息topic,queueId為原來的(原來的topic,queueId在訊息擴充套件屬性中),然後將訊息投遞到commitLog中,此時訊息就會被分發到對應的佇列中,然後被消費

吃水不忘挖井人: