1. 程式人生 > 實用技巧 >RabbitMQ (十二) 訊息確認機制 - 釋出者確認

RabbitMQ (十二) 訊息確認機制 - 釋出者確認

消費者確認解決的問題是確認訊息是否被消費者"成功消費".

它有個前提條件,那就是生產者釋出的訊息已經"成功"傳送出去了.

因此還需要一個機制來告訴生產者,你傳送的訊息真的"成功"傳送了.

在標準的AMQP 0-9-1,保證訊息不會丟失的唯一方法是使用事務:在通道上開啟事務,釋出訊息,提交事務.但是事務是非常重量級的,它使得RabbitMQ的吞吐量降低250倍.為了解決這個問題,RabbitMQ 引入了 釋出者確認(Publisher Confirms) 機制,它是模仿AMQP協議中的消費者訊息確認機制.

事務機制

生產者部分程式碼:

         try
            {               
                channel.TxSelect();//開啟事務機制               
                channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes("hello world"));              
                channel.TxCommit();//提交
                Console.WriteLine($"send {msg}");
            }
            catch (Exception e)
            {                
                channel.TxRollback();//回滾
                Console.WriteLine(e);
            }

釋出者確認

一旦在通道上使用 confirm.select 方法,就認為該通道處於Publisher Confirms模式.事務通道不能進入Publisher Confirms模式,一旦通道處於Publisher Confirms模式,不能開啟事務.即事務和Publisher Confirms模式只能二選一.

釋出的訊息什麼時候會被broker確認?

對於不可路由的訊息,broker 將在 exchange 驗證訊息不會路由到任何佇列(發回一個空的佇列列表)後發出確認;如果訊息被設定為"必需訊息"釋出,即 BasicPublish() 方法的 "mandatory" 入參為true,那麼 BasicReturn 事件將在 BasicAcks事件之前觸發.否定確認 BasicNacks 事件也是如此.

對於可路由訊息,當所有佇列都接受訊息時才觸發BasicAcks 事件,對於路由到持久話佇列的永續性訊息,這意味著持久化到磁碟後才會觸發BasicAcks 事件;對於訊息的映象佇列,這意味著所有映象都已接受該訊息後才會觸發BasicAcks 事件.

釋出者確認分為同步和非同步兩種.

一.同步

生產者部分程式碼

            //開啟confirm機制
            channel.ConfirmSelect();
            string msg = "hello world ";           
            for (int i = 0; i < 10; i++)
            {
                channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg + i));
            }
          
        //可以傳送一批訊息後,呼叫該方法;也可以每發一條呼叫一次.       
            if (channel.WaitForConfirms())
            {
                Console.WriteLine("send is success");
            }
            else
            {
                Console.WriteLine("send is failed");
         //實際應用中,這裡需要添加發送訊息失敗的處理邏輯.
         //如果是傳送一批訊息,那麼只要有一條失敗,則所有的訊息傳送都會失敗. }

二.非同步

生產者部分程式碼

          channel.ConfirmSelect();  

//肯定確認 channel.BasicAcks += (s, e) => { //多條 if (e.Multiple) { Console.WriteLine("最後成功的一條是 : " + e.DeliveryTag); } //單條 else { Console.WriteLine(e.DeliveryTag + " 成功傳送 "); } }; //否定確認 channel.BasicNacks += (s, e) => { //多條 if (e.Multiple) { Console.WriteLine("最後失敗的一條是 : " + e.DeliveryTag); } //單條 else { Console.WriteLine(e.DeliveryTag + " 傳送失敗 "); } };

釋出者的否定確認(BasicNacks)

  • 在特殊情況下,當 broker 無法成功處理訊息而不是 BasicAck 時,broker將傳送 BasicNack.在這種情況下,BasicNack 的欄位與 BasicAck 相對應的欄位意義相同,並且 requeue 欄位是沒有意義的.是否重發訊息由傳送者自己決定;
  • 將channel設定為釋出者確認模式後,所有後續釋出的訊息都只會被 confirm 一次或者 nack 一次;
  • 沒有機制保證訊息需要多久被 confirmed;
  • 訊息不會同時被confirmed和nack`d;
  • BasicNacks 事件只在負責佇列的Erlang程序中發生內部錯誤時才會觸發;

持久化訊息的延遲肯定確認

前面說到,

如果是持久化的訊息,要等到訊息持久化到磁碟後才會觸發BasicAcks 事件;對於訊息的映象佇列,要等到所有映象都已接受該訊息後才會觸發BasicAcks 事件.

而為了保證持久化效率,RabbitMQ不是來一條存一條,而是定時批量地持久化訊息到磁碟.RabbitMQ 訊息儲存一段時間(幾百毫秒)之後或者當佇列空閒時,才會批量寫到磁碟.

這意味著在恆定負載下,BasicAck 的延遲可以達到幾百毫秒.如果佇列支援映象佇列,則延遲時間更大.

所以,為了提高吞吐量,強烈建議應用程式採用非同步確認方式,或者釋出批量訊息後等待確認.

釋出者確認的注意事項

在大多數情況下,RabbitMQ將以與釋出時相同的順序向釋出者確認訊息(這適用於在單個頻道上釋出的訊息).但是,釋出者確認是非同步發出的,可以確認單個訊息或一組訊息.發出確認的確切時刻取決於訊息的傳遞模式(永續性與瞬態)以及訊息路由到的佇列的屬性.也就是說,RabbitMQ可能不以訊息釋出的順序向釋出者傳送確認訊息.生產者端儘量不要依賴訊息確認的順序處理業務邏輯.