RocketMQ (三)RocketMQ 怎麼保證的訊息不丟失?
一、訊息傳送過程
我們將訊息流程分為如下三大部分,每一部分都有可能會丟失資料。
-
生產階段:Producer通過網路將訊息傳送給Broker,這個傳送可能會發生丟失,比如網路延遲不可達等。
-
儲存階段:Broker肯定是先把訊息放到記憶體的,然後根據刷盤策略持久化到硬碟中,剛收到Producer的訊息,再記憶體中了,但是異常宕機了,導致訊息丟失。
-
消費階段:消費失敗了其實也是訊息丟失的一種變體吧。
二、Producer生產階段
Producer通過網路將訊息傳送給Broker,這個傳送可能會發生丟失,比如網路延遲不可達等。
1、解決方案一
1.1、說明
有三種send方法,同步傳送、非同步傳送、單向傳送。我們可以採取同步傳送的方式進行傳送訊息,發訊息的時候會同步阻塞等待broker返回的結果,如果沒成功,則不會收到SendResult,這種是最可靠的。其次是非同步傳送,再回調方法裡可以得知是否傳送成功。單向傳送(OneWay)是最不靠譜的一種傳送方式,我們無法保證訊息真正可達。
1.2、原始碼
/** * {@link org.apache.rocketmq.client.producer.DefaultMQProducer} */ // 同步傳送 public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {} // 非同步傳送,sendCallback作為回撥 public void send(Message msg,SendCallback sendCallback) throwsMQClientException, RemotingException, InterruptedException {} // 單向傳送,不關心傳送結果,最不靠譜 public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {}
2、解決方案二
2.1、說明
傳送訊息如果失敗或者超時了,則會自動重試。預設是重試三次,可以根據api進行更改,比如改為10次:
producer.setRetryTimesWhenSendFailed(10);
2.2、原始碼
/** * {@link org.apache.rocketmq.client.producer.DefaultMQProducer#sendDefaultImpl(Message, CommunicationMode, SendCallback, long)} */ // 自動重試次數,this.defaultMQProducer.getRetryTimesWhenSendFailed()預設為2,如果是同步傳送,預設重試3次,否則重試1次 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; for (; times < timesTotal; times++) { // 選擇傳送的訊息queue MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { try { // 真正的傳送邏輯,sendKernelImpl。 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: // 如果傳送失敗了,則continue,意味著還會再次進入for,繼續重試傳送 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } // 傳送成功的話,將傳送結果返回給呼叫者 return sendResult; default: break; } } catch (RemotingException e) { continue; } catch (...) { continue; } } }
說明:
這裡只是總結出核心的傳送邏輯,並不是全程式碼。可以看出如下:
重試次數同步是1 +
this.defaultMQProducer.getRetryTimesWhenSendFailed()
,其他方式預設1次。this.defaultMQProducer.getRetryTimesWhenSendFailed()預設是2,我們可以手動設定
producer.setRetryTimesWhenSendFailed(10);
呼叫sendKernelImpl真正的去傳送訊息
如果是sync同步傳送,且傳送失敗了,則continue,意味著還會再次進入for,繼續重試傳送
傳送成功的話,將傳送結果返回給呼叫者
如果傳送異常進入catch了,則continue繼續下次重試。
3、解決方案三
3.1、說明
假設Broker宕機了,但是生產環境一般都是多M多S的,所以還會有其他master節點繼續提供服務,這也不會影響到我們傳送訊息,我們訊息依然可達。因為比如恰巧傳送到broker的時候,broker宕機了,producer收到broker的響應傳送失敗了,這時候producer會自動重試,這時候宕機的broker就被踢下線了, 所以producer會換一臺broker傳送訊息。
4、總結
Producer怎麼保證傳送階段訊息可達?
失敗會自動重試,即使重試N次也不行後,那客戶端也會知道訊息沒成功,這也可以自己補償等,不會盲目影響到主業務邏輯。再比如即使Broker掛了,那還有其他Broker再提供服務了,高可用,不影響。
總結為幾個字就是:同步傳送+自動重試機制+多個Master節點
三、Broker儲存階段
Broker肯定是先把訊息放到記憶體的,然後根據刷盤策略持久化到硬碟中,剛收到Producer的訊息,再記憶體中了,但是異常宕機了,導致訊息丟失。
1、解決方案一
MQ持久化訊息分為兩種:同步刷盤和非同步刷盤。預設情況是非同步刷盤,Broker收到訊息後會先存到cache裡然後立馬通知Producer說訊息我收到且儲存成功了,你可以繼續你的業務邏輯了,然後Broker起個執行緒非同步的去持久化到磁碟中,但是Broker還沒持久化到磁碟就宕機的話,訊息就丟失了。同步刷盤的話是收到訊息存到cache後並不會通知Producer說訊息已經ok了,而是會等到持久化到磁碟中後才會通知Producer說訊息完事了。這也保障了訊息不會丟失,但是效能不如非同步高。看業務場景取捨。
修改刷盤策略為同步刷盤。預設情況下是非同步刷盤的,如下配置
## 預設情況為 ASYNC_FLUSH,修改為同步刷盤:SYNC_FLUSH,實際場景看業務,同步刷盤效率肯定不如非同步刷盤高。
flushDiskType = SYNC_FLUSH
對應的Java配置類如下:
package org.apache.rocketmq.store.config; public enum FlushDiskType { // 同步刷盤 SYNC_FLUSH, // 非同步刷盤(預設) ASYNC_FLUSH }
非同步刷盤預設10s執行一次,原始碼如下:
/* * {@link org.apache.rocketmq.store.CommitLog#run()} */ while (!this.isStopped()) { try { // 等待10s this.waitForRunning(10); // 刷盤 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } }
2、解決方案二
叢集部署,主從模式,高可用。
即使Broker設定了同步刷盤策略,但是Broker刷完盤後磁碟壞了,這會導致盤上的訊息全TM丟了。但是如果即使是1主1從了,但是Master刷完盤後還沒來得及同步給Slave就磁碟壞了,不也是GG嗎?沒錯!
所以我們還可以配置不僅是等Master刷完盤就通知Producer,而是等Master和Slave都刷完盤後才去通知Producer說訊息ok了。
## 預設為 ASYNC_MASTER
brokerRole=SYNC_MASTER
3、總結
若想很嚴格的保證Broker儲存訊息階段訊息不丟失,則需要如下配置,但是效能肯定遠差於預設配置。
# master 節點配置 flushDiskType = SYNC_FLUSH brokerRole=SYNC_MASTER # slave 節點配置 brokerRole=slave flushDiskType = SYNC_FLUSH
上面這個配置含義是:
Producer發訊息到Broker後,Broker的Master節點先持久化到磁碟中,然後同步資料給Slave節點,Slave節點同步完且落盤完成後才會返回給Producer說訊息ok了。
四、Consumer消費階段
消費失敗了其實也是訊息丟失的一種變體。
1、解決方案一
消費者會先把訊息拉取到本地,然後進行業務邏輯,業務邏輯完成後手動進行ack確認,這時候才會真正的代表消費完成。而不是說pull到本地後訊息就算消費完了。舉個例子
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : msgs) { String str = new String(msg.getBody()); System.out.println(str); } // ack,只有等上面一系列邏輯都處理完後,到這步CONSUME_SUCCESS才會通知broker說訊息消費完成,如果上面發生異常沒有走到這步ack,則訊息還是未消費狀態。而不是像比如redis的blpop,彈出一個數據後資料就從redis裡消失了,並沒有等我們業務邏輯執行完才彈出。 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
2、解決方案二
訊息消費失敗自動重試。如果消費訊息失敗了,沒有進行ack確認,則會自動重試,重試策略和次數(預設15次)如下配置
/** * Broker可以配置的所有選項 */ public class org.apache.rocketmq.store.config.MessageStoreConfig { private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; }