1. 程式人生 > 實用技巧 >RocketMQ (三)RocketMQ 怎麼保證的訊息不丟失?

RocketMQ (三)RocketMQ 怎麼保證的訊息不丟失?

一、訊息傳送過程

我們將訊息流程分為如下三大部分,每一部分都有可能會丟失資料。

  • 生產階段:Producer通過網路將訊息傳送給Broker,這個傳送可能會發生丟失,比如網路延遲不可達等。

  • 儲存階段:Broker肯定是先把訊息放到記憶體的,然後根據刷盤策略持久化到硬碟中,剛收到Producer的訊息,再記憶體中了,但是異常宕機了,導致訊息丟失。

  • 消費階段:消費失敗了其實也是訊息丟失的一種變體吧。

二、Producer生產階段

Producer通過網路將訊息傳送給Broker,這個傳送可能會發生丟失,比如網路延遲不可達等。

1、解決方案一

1.1、說明

有三種send方法,同步傳送、非同步傳送、單向傳送。我們可以採取同步傳送的方式進行傳送訊息,發訊息的時候會同步阻塞等待broker返回的結果,如果沒成功,則不會收到SendResult,這種是最可靠的。其次是非同步傳送,再回調方法裡可以得知是否傳送成功。單向傳送(OneWay)是最不靠譜的一種傳送方式,我們無法保證訊息真正可達。

1.2、原始碼

/**
 * {@link org.apache.rocketmq.client.producer.DefaultMQProducer}
 */

// 同步傳送
public SendResult send(Message msg) throws MQClientException, RemotingException,      MQBrokerException, InterruptedException {}

// 非同步傳送,sendCallback作為回撥
public void send(Message msg,SendCallback sendCallback) throws
MQClientException, RemotingException, InterruptedException {} // 單向傳送,不關心傳送結果,最不靠譜 public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {}

2、解決方案二

2.1、說明

傳送訊息如果失敗或者超時了,則會自動重試。預設是重試三次,可以根據api進行更改,比如改為10次:

producer.setRetryTimesWhenSendFailed(10);

2.2、原始碼

/**
 * {@link org.apache.rocketmq.client.producer.DefaultMQProducer#sendDefaultImpl(Message, CommunicationMode, SendCallback, long)}
 */

// 自動重試次數,this.defaultMQProducer.getRetryTimesWhenSendFailed()預設為2,如果是同步傳送,預設重試3次,否則重試1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
for (; times < timesTotal; times++) {
      // 選擇傳送的訊息queue
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
        try {
            // 真正的傳送邏輯,sendKernelImpl。
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
            switch (communicationMode) {
                case ASYNC:
                    return null;
                case ONEWAY:
                    return null;
                case SYNC:
                    // 如果傳送失敗了,則continue,意味著還會再次進入for,繼續重試傳送
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                            continue;
                        }
                    }
                    // 傳送成功的話,將傳送結果返回給呼叫者
                    return sendResult;
                default:
                    break;
            }
        } catch (RemotingException e) {
            continue;
        } catch (...) {
            continue;
        }
    }
}

說明:

這裡只是總結出核心的傳送邏輯,並不是全程式碼。可以看出如下:

  • 重試次數同步是1 +this.defaultMQProducer.getRetryTimesWhenSendFailed(),其他方式預設1次。

  • this.defaultMQProducer.getRetryTimesWhenSendFailed()預設是2,我們可以手動設定producer.setRetryTimesWhenSendFailed(10);

  • 呼叫sendKernelImpl真正的去傳送訊息

  • 如果是sync同步傳送,且傳送失敗了,則continue,意味著還會再次進入for,繼續重試傳送

  • 傳送成功的話,將傳送結果返回給呼叫者

  • 如果傳送異常進入catch了,則continue繼續下次重試。

3、解決方案三

3.1、說明

假設Broker宕機了,但是生產環境一般都是多M多S的,所以還會有其他master節點繼續提供服務,這也不會影響到我們傳送訊息,我們訊息依然可達。因為比如恰巧傳送到broker的時候,broker宕機了,producer收到broker的響應傳送失敗了,這時候producer會自動重試,這時候宕機的broker就被踢下線了, 所以producer會換一臺broker傳送訊息。

4、總結

Producer怎麼保證傳送階段訊息可達?

失敗會自動重試,即使重試N次也不行後,那客戶端也會知道訊息沒成功,這也可以自己補償等,不會盲目影響到主業務邏輯。再比如即使Broker掛了,那還有其他Broker再提供服務了,高可用,不影響。

總結為幾個字就是:同步傳送+自動重試機制+多個Master節點

三、Broker儲存階段

Broker肯定是先把訊息放到記憶體的,然後根據刷盤策略持久化到硬碟中,剛收到Producer的訊息,再記憶體中了,但是異常宕機了,導致訊息丟失。

1、解決方案一

MQ持久化訊息分為兩種:同步刷盤和非同步刷盤。預設情況是非同步刷盤,Broker收到訊息後會先存到cache裡然後立馬通知Producer說訊息我收到且儲存成功了,你可以繼續你的業務邏輯了,然後Broker起個執行緒非同步的去持久化到磁碟中,但是Broker還沒持久化到磁碟就宕機的話,訊息就丟失了。同步刷盤的話是收到訊息存到cache後並不會通知Producer說訊息已經ok了,而是會等到持久化到磁碟中後才會通知Producer說訊息完事了。這也保障了訊息不會丟失,但是效能不如非同步高。看業務場景取捨。

修改刷盤策略為同步刷盤。預設情況下是非同步刷盤的,如下配置

## 預設情況為 ASYNC_FLUSH,修改為同步刷盤:SYNC_FLUSH,實際場景看業務,同步刷盤效率肯定不如非同步刷盤高。
flushDiskType = SYNC_FLUSH 

對應的Java配置類如下:

package org.apache.rocketmq.store.config;

public enum FlushDiskType {
    // 同步刷盤
    SYNC_FLUSH,
    // 非同步刷盤(預設)
    ASYNC_FLUSH
}

非同步刷盤預設10s執行一次,原始碼如下:

/*
 * {@link org.apache.rocketmq.store.CommitLog#run()}
 */

while (!this.isStopped()) {
    try {
        // 等待10s
        this.waitForRunning(10);
        // 刷盤
        this.doCommit();
    } catch (Exception e) {
        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

2、解決方案二

叢集部署,主從模式,高可用。

即使Broker設定了同步刷盤策略,但是Broker刷完盤後磁碟壞了,這會導致盤上的訊息全TM丟了。但是如果即使是1主1從了,但是Master刷完盤後還沒來得及同步給Slave就磁碟壞了,不也是GG嗎?沒錯!

所以我們還可以配置不僅是等Master刷完盤就通知Producer,而是等Master和Slave都刷完盤後才去通知Producer說訊息ok了。

## 預設為 ASYNC_MASTER
brokerRole=SYNC_MASTER

3、總結

若想很嚴格的保證Broker儲存訊息階段訊息不丟失,則需要如下配置,但是效能肯定遠差於預設配置。

# master 節點配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER

# slave 節點配置
brokerRole=slave
flushDiskType = SYNC_FLUSH

上面這個配置含義是:

Producer發訊息到Broker後,Broker的Master節點先持久化到磁碟中,然後同步資料給Slave節點,Slave節點同步完且落盤完成後才會返回給Producer說訊息ok了。

四、Consumer消費階段

消費失敗了其實也是訊息丟失的一種變體。

1、解決方案一

消費者會先把訊息拉取到本地,然後進行業務邏輯,業務邏輯完成後手動進行ack確認,這時候才會真正的代表消費完成。而不是說pull到本地後訊息就算消費完了。舉個例子

 consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         for (MessageExt msg : msgs) {
             String str = new String(msg.getBody());
             System.out.println(str);
         }
         // ack,只有等上面一系列邏輯都處理完後,到這步CONSUME_SUCCESS才會通知broker說訊息消費完成,如果上面發生異常沒有走到這步ack,則訊息還是未消費狀態。而不是像比如redis的blpop,彈出一個數據後資料就從redis裡消失了,並沒有等我們業務邏輯執行完才彈出。
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 });

2、解決方案二

訊息消費失敗自動重試。如果消費訊息失敗了,沒有進行ack確認,則會自動重試,重試策略和次數(預設15次)如下配置

/**
 * Broker可以配置的所有選項
 */
public class org.apache.rocketmq.store.config.MessageStoreConfig {
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}