面試官再問我如何保證 RocketMQ 不丟失訊息,這回我笑了!
最近看了 @JavaGuide 釋出的一篇『面試官問我如何保證Kafka不丟失訊息?我哭了!』,這篇文章承接這個主題,來聊聊如何保證 RocketMQ 不丟失訊息。
0x00. 訊息的傳送流程
一條訊息從生產到被消費,將會經歷三個階段:
- 生產階段,Producer 新建訊息,然後通過網路將訊息投遞給 MQ Broker
- 儲存階段,訊息將會儲存在 Broker 端磁碟中
- 訊息階段, Consumer 將會從 Broker 拉取訊息
以上任一階段都可能會丟失訊息,我們只要找到這三個階段丟失訊息原因,採用合理的辦法避免丟失,就可以徹底解決訊息丟失的問題。
0x01. 生產階段
生產者(Producer) 通過網路傳送訊息給 Broker,當 Broker 收到之後,將會返回確認響應資訊給 Producer。所以生產者只要接收到返回的確認響應,就代表訊息在生產階段未丟失。
RocketMQ 傳送訊息示例程式碼如下:
DefaultMQProducer mqProducer=new DefaultMQProducer("test"); // 設定 nameSpace 地址 mqProducer.setNamesrvAddr("namesrvAddr"); mqProducer.start(); Message msg = new Message("test_topic" /* Topic */, "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 傳送訊息到一個Broker try { SendResult sendResult = mqProducer.send(msg); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
send
方法是一個同步操作,只要這個方法不丟擲任何異常,就代表訊息已經發送成功。
訊息傳送成功僅代表訊息已經到了 Broker 端,Broker 在不同配置下,可能會返回不同響應狀態:
SendStatus.SEND_OK
SendStatus.FLUSH_DISK_TIMEOUT
SendStatus.FLUSH_SLAVE_TIMEOUT
SendStatus.SLAVE_NOT_AVAILABLE
引用官方狀態說明:
上圖中不同 broker 端配置將會在下文詳細解釋
另外 RocketMQ 還提供非同步的傳送的方式,適合於鏈路耗時較長,對響應時間較為敏感的業務場景。
DefaultMQProducer mqProducer = new DefaultMQProducer("test"); // 設定 nameSpace 地址 mqProducer.setNamesrvAddr("127.0.0.1:9876"); mqProducer.setRetryTimesWhenSendFailed(5); mqProducer.start(); Message msg = new Message("test_topic" /* Topic */, "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); try { // 非同步傳送訊息到,主執行緒不會被阻塞,立刻會返回 mqProducer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 訊息傳送成功, } @Override public void onException(Throwable e) { // 訊息傳送失敗,可以持久化這條資料,後續進行補償處理 } }); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
非同步傳送訊息一定要注意重寫回調方法,在回撥方法中檢查傳送結果。
不管是同步還是非同步的方式,都會碰到網路問題導致傳送失敗的情況。針對這種情況,我們可以設定合理的重試次數,當出現網路問題,可以自動重試。設定方式如下:
// 同步傳送訊息重試次數,預設為 2
mqProducer.setRetryTimesWhenSendFailed(3);
// 非同步傳送訊息重試次數,預設為 2
mqProducer.setRetryTimesWhenSendAsyncFailed(3);
0x02. Broker 儲存階段
預設情況下,訊息只要到了 Broker 端,將會優先儲存到記憶體中,然後立刻返回確認響應給生產者。隨後 Broker 定期批量的將一組訊息從記憶體非同步刷入磁碟。
這種方式減少 I/O 次數,可以取得更好的效能,但是如果發生機器掉電,異常宕機等情況,訊息還未及時刷入磁碟,就會出現丟失訊息的情況。
若想保證 Broker 端不丟訊息,保證訊息的可靠性,我們需要將訊息儲存機制修改為同步刷盤方式,即訊息儲存磁碟成功,才會返回響應。
修改 Broker 端配置如下:
## 預設情況為 ASYNC_FLUSH
flushDiskType = SYNC_FLUSH
若 Broker 未在同步刷盤時間內(預設為 5s)完成刷盤,將會返回 SendStatus.FLUSH_DISK_TIMEOUT
狀態給生產者。
叢集部署
為了保證可用性,Broker 通常採用一主(master)多從(slave)部署方式。為了保證訊息不丟失,訊息還需要複製到 slave 節點。
預設方式下,訊息寫入 master 成功,就可以返回確認響應給生產者,接著訊息將會非同步複製到 slave 節點。
注:master 配置:flushDiskType = SYNC_FLUSH
此時若 master 突然宕機且不可恢復,那麼還未複製到 slave 的訊息將會丟失。
為了進一步提高訊息的可靠性,我們可以採用同步的複製方式,master 節點將會同步等待 slave 節點複製完成,才會返回確認響應。
非同步複製與同步複製區別如下圖:
注: 大家不要被上圖誤導,broker master 只能配置一種複製方式,上圖只為解釋同步複製的與非同步複製的概念。
Broker master 節點 同步複製配置如下:
## 預設為 ASYNC_MASTER
brokerRole=SYNC_MASTER
如果 slave 節點未在指定時間內同步返回響應,生產者將會收到 SendStatus.FLUSH_SLAVE_TIMEOUT
返回狀態。
小結
結合生產階段與儲存階段,若需要嚴格保證訊息不丟失,broker 需要採用如下配置:
## master 節點配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER
## slave 節點配置
brokerRole=slave
flushDiskType = SYNC_FLUSH
同時這個過程我們還需要生產者配合,判斷返回狀態是否是 SendStatus.SEND_OK
。若是其他狀態,就需要考慮補償重試。
雖然上述配置提高訊息的高可靠性,但是會降低效能,生產實踐中需要綜合選擇。
0x03. 消費階段
消費者從 broker 拉取訊息,然後執行相應的業務邏輯。一旦執行成功,將會返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
狀態給 Broker。
如果 Broker 未收到消費確認響應或收到其他狀態,消費者下次還會再次拉取到該條訊息,進行重試。這樣的方式有效避免了消費者消費過程發生異常,或者訊息在網路傳輸中丟失的情況。
訊息消費的程式碼如下:
// 例項化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
// 設定NameServer的地址
consumer.setNamesrvAddr("namesrvAddr");
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的訊息
consumer.subscribe("test_topic", "*");
// 註冊回撥實現類來處理從broker拉取回來的訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 執行業務邏輯
// 標記該訊息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者例項
consumer.start();
以上消費訊息過程的,我們需要注意返回訊息狀態。只有當業務邏輯真正執行成功,我們才能返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
。否則我們需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
,稍後再重試。
0x04. 總結
看完 RocketMQ 不丟訊息處理辦法,回頭再看這篇 kafka,有沒有發現,兩者解決思路是一樣的,區別就是引數配置不一樣而已。
所以下一次,面試官再問你 XX 訊息佇列如何保證不丟訊息?如果你沒用過這個訊息佇列,也不要哭,微笑面對他,從容給他分析那幾步會丟失,然後大致解決思路。
最後我們還可以說出我們的思考,雖然提高訊息可靠性,但是可能導致訊息重發,重複消費。所以對於消費客戶端,需要注意保證冪等性。
但是要注意了,這時面試官可能就會跟你的話題,讓你來聊聊如何保證冪等性,一定先想好再說哦。
什麼?你還不知道如何實現冪等?那就趕緊關注@程式通事,後面文章我們就來聊聊冪等這個話題。
0x05. Reference
- 極客時間-訊息佇列高手課
- https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
最後說一句(求關注)
才疏學淺,難免會有紕漏,如果你發現了錯誤的地方,還請你留言給我指出來,我對其加以修改。
再次感謝您的閱讀,我是樓下小黑哥,一位還未禿頭的工具猿,下篇文章我們再見~
歡迎關注我的公眾號:程式通事,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的部落格:studyidea.cn