RabbitMQ 之訊息的可靠性投遞
預設情況下,訊息傳送端傳送訊息給 RabbitMQ 後,RabbitMQ 是不會返回任何資訊的。 那麼我們怎麼知道訊息是中途丟失了還是到達了 broker 呢?
RabbitMQ 提供了兩種確認訊息是否投遞成功的方法
- 設定 channel 為 transaction 模式,通過 AMQP 事務機制實現,這也是 AMQP 協議層面提供的解決方案
- 設定 channel 為 confirm 模式,這是 RabbitMQ 提供的解決方案
兩種模式不能共存
channel 的 transaction 模式
RabbitMQ 中與事務機制有關的方法有三個
- txSelect:用於將當前 channel 設定成 transaction 模式
- txCommit:用於提交事務
- txRollback:用於回滾事務
在通過 txSelect 開啟事務之後,我們便可以釋出訊息給broker代理伺服器了,如果txCommit提交成功了,則訊息一定到達了broker了,如果在 txCommit 執行之前 broker 異常崩潰或者由於其他原因丟擲異常,這個時候我們便可以捕獲異常通過 txRollback 回滾事務了
事務確實能夠解決 producer 與 broker 之間訊息確認的問題,只有訊息成功被 broker 接收,事務提交才能成功,否則我們便可以在捕獲異常進行事務回滾操作,同時進行訊息重發,但是使用事務機制的話會降低RabbitMQ的效能。
RabbitMQ 提供了一個更好的方案,使用 channel 通道的 confirm 模式。
channel 的 confirm 模式
生產者通過呼叫 channel 的 confirmSelect 方法將 channel 設定為 confirm 模式. 該模式下,所有在該通道上釋出的訊息都會被分派一個唯一的ID(從1開始),當訊息被投遞到所有匹配的佇列後,broker 就會傳送一個(包含訊息的唯一 ID 的)確認給傳送端, 如果 RabbitMQ 因為自身內部錯誤導致訊息丟失,就會傳送一條nack訊息,傳送端的 Confirm Listener 會去監聽應答
broker回傳給傳送端的確認訊息中 deliver-tag 域包含了確認訊息的ID,此外 broker 也可以設定 basic.ack 的 multiple 域,表示到這個ID之前的所有訊息都已經得到了處理
confirm模式最大的好處在於他是非同步的,生產者可以在等通道返回的同時繼續傳送下一條訊息。
延伸
上面說到“當訊息被投遞到所有匹配的佇列後,broker 就會傳送一個(包含訊息的唯一 ID 的)確認給傳送端”,萬一傳送確認後, rabbitMq 崩潰了,訊息佇列中的訊息就都沒了,這時候傳送端還以為訊息還在佇列中。
為了防止這種情況的傳送,rabbitMq 需要對佇列和訊息進行持久化。
當訊息和佇列開啟持久化之後,確認資訊會等到訊息寫入磁碟之後再發出
Confirm 的三種使用方式
- 普通確認:每傳送一條訊息後,呼叫channel.waitForConfirms()方法,同步等待伺服器端confirm。實際上是一種序列confirm了。
- 批量確認:每傳送一批訊息後,呼叫channel.waitForConfirms()方法,同步等待伺服器端confirm
- 非同步確認:為channel新增一個監聽器,rabbitmq 會回撥這個方法,示例程式碼如下
// 新增一個確認監聽
channel.addConfirmListener(new ConfirmListener() {
//訊息失敗處理
@Override
public void handleNack(long deliveryTag,boolean multiple) throws IOException {
//deliveryTag;唯一訊息標籤
//multiple:是否批量
System.err.println("-------no ack!-----------");
}
//訊息成功處理
@Override
public void handleAck(long deliveryTag,boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
複製程式碼
實際開發中很少會直接使用 RabbitMQ 包,更多時候是使用 spring 提供的 spring-rabbit ,它封裝了 RabbitMQ 並將它整合到了 spring 框架中。
這裡給出 springBoot 版的非同步確認的詳細實現,供大家參考。
批量 confirm 模式 和 非同步confirm模式效率更高些,但是批量confirm模式下,如果 rabbitMq 返回失敗,那麼需要重新傳送這一批訊息,建議最好還是選擇非同步confirm模式
超時
RabbitMQ 的響應可能會超時,超時可能是訊息沒有到達 mq ,也有可能是網路延遲導致的。對於響應超時的訊息,通常被認定為投遞失敗。
對於投遞失敗的訊息,需要進行訊息補償
訊息補償
訊息補償由傳送端自己設計,常見的設計方案為
資料落庫,訊息狀態打標
如圖
- step1: 傳送業務訊息前,先將業務資料和訊息狀態入庫,並將訊息狀態初始化為傳送中
- step2: 傳送業務訊息,設定超時時間,同時非同步監聽 RabbitMQ 響應
- step3: RabbitMQ 返回響應
- step4: 根據響應結果,更新訊息狀態,投遞成功則將訊息狀態設定成成功
- step5: 定時任務找出狀態為傳送中,且時間超時的
- stem6: 重新投遞
- stem7: 經過上述步驟多次(通常是3次)迴圈後,依然失敗的,設定訊息狀態為失敗
- stem8: 人工去解決狀態為失敗的訊息
重複投遞
訊息補償機制可能會導致重複投遞,重複投遞可能導致消費端重複消費。但重複投遞又無法完全避免,因此消費端需要防止重複消費。
總結
要保證 RabbitMQ 的訊息可靠性投遞,需要做到以下幾點
- 訊息傳送端開啟 channel 的 confirm 模式
- 訊息傳送端非同步接收 RabbitMQ 響應
- RabbitMQ 對佇列和訊息進行持久化
- 訊息傳送端建立訊息投遞失敗的補償機制