1. 程式人生 > 程式設計 >RabbitMQ 之訊息的可靠性投遞

RabbitMQ 之訊息的可靠性投遞

預設情況下,訊息傳送端傳送訊息給 RabbitMQ 後,RabbitMQ 是不會返回任何資訊的。 那麼我們怎麼知道訊息是中途丟失了還是到達了 broker 呢?

RabbitMQ 提供了兩種確認訊息是否投遞成功的方法

  • 設定 channel 為 transaction 模式,通過 AMQP 事務機制實現,這也是 AMQP 協議層面提供的解決方案
  • 設定 channel 為 confirm 模式,這是 RabbitMQ 提供的解決方案

兩種模式不能共存

channel 的 transaction 模式

RabbitMQ 中與事務機制有關的方法有三個

  • txSelect:用於將當前 channel 設定成 transaction 模式
  • txCommit:用於提交事務
  • txRollback:用於回滾事務

在通過 txSelect 開啟事務之後,我們便可以釋出訊息給broker代理伺服器了,如果txCommit提交成功了,則訊息一定到達了broker了,如果在 txCommit 執行之前 broker 異常崩潰或者由於其他原因丟擲異常,這個時候我們便可以捕獲異常通過 txRollback 回滾事務了

事務確實能夠解決 producer 與 broker 之間訊息確認的問題,只有訊息成功被 broker 接收,事務提交才能成功,否則我們便可以在捕獲異常進行事務回滾操作,同時進行訊息重發,但是使用事務機制的話會降低RabbitMQ的效能。

RabbitMQ 提供了一個更好的方案,使用 channel 通道的 confirm 模式。

channel 的 confirm 模式

生產者通過呼叫 channel 的 confirmSelect 方法將 channel 設定為 confirm 模式. 該模式下,所有在該通道上釋出的訊息都會被分派一個唯一的ID(從1開始),當訊息被投遞到所有匹配的佇列後,broker 就會傳送一個(包含訊息的唯一 ID 的)確認給傳送端, 如果 RabbitMQ 因為自身內部錯誤導致訊息丟失,就會傳送一條nack訊息,傳送端的 Confirm Listener 會去監聽應答

broker回傳給傳送端的確認訊息中 deliver-tag 域包含了確認訊息的ID,此外 broker 也可以設定 basic.ack 的 multiple 域,表示到這個ID之前的所有訊息都已經得到了處理

confirm模式最大的好處在於他是非同步的,生產者可以在等通道返回的同時繼續傳送下一條訊息。

延伸

上面說到“當訊息被投遞到所有匹配的佇列後,broker 就會傳送一個(包含訊息的唯一 ID 的)確認給傳送端”,萬一傳送確認後, rabbitMq 崩潰了,訊息佇列中的訊息就都沒了,這時候傳送端還以為訊息還在佇列中。

為了防止這種情況的傳送,rabbitMq 需要對佇列和訊息進行持久化。

當訊息和佇列開啟持久化之後,確認資訊會等到訊息寫入磁碟之後再發出

Confirm 的三種使用方式

  • 普通確認:每傳送一條訊息後,呼叫channel.waitForConfirms()方法,同步等待伺服器端confirm。實際上是一種序列confirm了。
  • 批量確認:每傳送一批訊息後,呼叫channel.waitForConfirms()方法,同步等待伺服器端confirm
  • 非同步確認:為channel新增一個監聽器,rabbitmq 會回撥這個方法,示例程式碼如下
// 新增一個確認監聽
channel.addConfirmListener(new ConfirmListener() {
    //訊息失敗處理
    @Override
    public void handleNack(long deliveryTag,boolean multiple) throws IOException {
        //deliveryTag;唯一訊息標籤
        //multiple:是否批量
        System.err.println("-------no ack!-----------");
    }
    //訊息成功處理
    @Override
    public void handleAck(long deliveryTag,boolean multiple) throws IOException {
        System.err.println("-------ack!-----------");
    }
});
複製程式碼

實際開發中很少會直接使用 RabbitMQ 包,更多時候是使用 spring 提供的 spring-rabbit ,它封裝了 RabbitMQ 並將它整合到了 spring 框架中。

這裡給出 springBoot 版的非同步確認的詳細實現,供大家參考。

RabbitMQ confirm 模式之非同步確認

批量 confirm 模式 和 非同步confirm模式效率更高些,但是批量confirm模式下,如果 rabbitMq 返回失敗,那麼需要重新傳送這一批訊息,建議最好還是選擇非同步confirm模式

超時

RabbitMQ 的響應可能會超時,超時可能是訊息沒有到達 mq ,也有可能是網路延遲導致的。對於響應超時的訊息,通常被認定為投遞失敗。

對於投遞失敗的訊息,需要進行訊息補償

訊息補償

訊息補償由傳送端自己設計,常見的設計方案為

資料落庫,訊息狀態打標

如圖

  • step1: 傳送業務訊息前,先將業務資料和訊息狀態入庫,並將訊息狀態初始化為傳送中
  • step2: 傳送業務訊息,設定超時時間,同時非同步監聽 RabbitMQ 響應
  • step3: RabbitMQ 返回響應
  • step4: 根據響應結果,更新訊息狀態,投遞成功則將訊息狀態設定成成功
  • step5: 定時任務找出狀態為傳送中,且時間超時的
  • stem6: 重新投遞
  • stem7: 經過上述步驟多次(通常是3次)迴圈後,依然失敗的,設定訊息狀態為失敗
  • stem8: 人工去解決狀態為失敗的訊息

重複投遞

訊息補償機制可能會導致重複投遞,重複投遞可能導致消費端重複消費。但重複投遞又無法完全避免,因此消費端需要防止重複消費

總結

要保證 RabbitMQ 的訊息可靠性投遞,需要做到以下幾點

  • 訊息傳送端開啟 channel 的 confirm 模式
  • 訊息傳送端非同步接收 RabbitMQ 響應
  • RabbitMQ 對佇列和訊息進行持久化
  • 訊息傳送端建立訊息投遞失敗的補償機制

相關文章

RabbitMQ 之訊息的可靠性消費

RabbitMQ 持久化