RabbitMQ (十二) 訊息確認機制 - 釋出者確認
消費者確認解決的問題是確認訊息是否被消費者"成功消費".
它有個前提條件,那就是生產者釋出的訊息已經"成功"傳送出去了.
因此還需要一個機制來告訴生產者,你傳送的訊息真的"成功"傳送了.
在標準的AMQP 0-9-1,保證訊息不會丟失的唯一方法是使用事務:在通道上開啟事務,釋出訊息,提交事務.但是事務是非常重量級的,它使得RabbitMQ的吞吐量降低250倍.為了解決這個問題,RabbitMQ 引入了 釋出者確認(Publisher Confirms) 機制,它是模仿AMQP協議中的消費者訊息確認機制.
事務機制
生產者部分程式碼:
try { channel.TxSelect();//開啟事務機制 channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes("hello world")); channel.TxCommit();//提交 Console.WriteLine($"send {msg}"); } catch (Exception e) { channel.TxRollback();//回滾 Console.WriteLine(e); }
釋出者確認
一旦在通道上使用 confirm.select 方法,就認為該通道處於Publisher Confirms模式.事務通道不能進入Publisher Confirms模式,一旦通道處於Publisher Confirms模式,不能開啟事務.即事務和Publisher Confirms模式只能二選一.
釋出的訊息什麼時候會被broker確認?
對於不可路由的訊息,broker 將在 exchange 驗證訊息不會路由到任何佇列(發回一個空的佇列列表)後發出確認;如果訊息被設定為"必需訊息"釋出,即 BasicPublish() 方法的 "mandatory" 入參為true,那麼 BasicReturn 事件將在 BasicAcks事件之前觸發.否定確認 BasicNacks 事件也是如此.
對於可路由訊息,當所有佇列都接受訊息時才觸發BasicAcks 事件,對於路由到持久話佇列的永續性訊息,這意味著持久化到磁碟後才會觸發BasicAcks 事件;對於訊息的映象佇列,這意味著所有映象都已接受該訊息後才會觸發BasicAcks 事件.
釋出者確認分為同步和非同步兩種.
一.同步
生產者部分程式碼
//開啟confirm機制 channel.ConfirmSelect(); string msg = "hello world "; for (int i = 0; i < 10; i++) { channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg + i)); } //可以傳送一批訊息後,呼叫該方法;也可以每發一條呼叫一次. if (channel.WaitForConfirms()) { Console.WriteLine("send is success"); } else { Console.WriteLine("send is failed");
//實際應用中,這裡需要添加發送訊息失敗的處理邏輯.
//如果是傳送一批訊息,那麼只要有一條失敗,則所有的訊息傳送都會失敗. }
二.非同步
生產者部分程式碼
channel.ConfirmSelect();
//肯定確認 channel.BasicAcks += (s, e) => { //多條 if (e.Multiple) { Console.WriteLine("最後成功的一條是 : " + e.DeliveryTag); } //單條 else { Console.WriteLine(e.DeliveryTag + " 成功傳送 "); } }; //否定確認 channel.BasicNacks += (s, e) => { //多條 if (e.Multiple) { Console.WriteLine("最後失敗的一條是 : " + e.DeliveryTag); } //單條 else { Console.WriteLine(e.DeliveryTag + " 傳送失敗 "); } };
釋出者的否定確認(BasicNacks)
- 在特殊情況下,當 broker 無法成功處理訊息而不是 BasicAck 時,broker將傳送 BasicNack.在這種情況下,BasicNack 的欄位與 BasicAck 相對應的欄位意義相同,並且 requeue 欄位是沒有意義的.是否重發訊息由傳送者自己決定;
- 將channel設定為釋出者確認模式後,所有後續釋出的訊息都只會被 confirm 一次或者 nack 一次;
- 沒有機制保證訊息需要多久被 confirmed;
- 訊息不會同時被confirmed和nack`d;
- BasicNacks 事件只在負責佇列的Erlang程序中發生內部錯誤時才會觸發;
持久化訊息的延遲肯定確認
前面說到,
如果是持久化的訊息,要等到訊息持久化到磁碟後才會觸發BasicAcks 事件;對於訊息的映象佇列,要等到所有映象都已接受該訊息後才會觸發BasicAcks 事件.
而為了保證持久化效率,RabbitMQ不是來一條存一條,而是定時批量地持久化訊息到磁碟.RabbitMQ 訊息儲存一段時間(幾百毫秒)之後或者當佇列空閒時,才會批量寫到磁碟.
這意味著在恆定負載下,BasicAck 的延遲可以達到幾百毫秒.如果佇列支援映象佇列,則延遲時間更大.
所以,為了提高吞吐量,強烈建議應用程式採用非同步確認方式,或者釋出批量訊息後等待確認.
釋出者確認的注意事項
在大多數情況下,RabbitMQ將以與釋出時相同的順序向釋出者確認訊息(這適用於在單個頻道上釋出的訊息).但是,釋出者確認是非同步發出的,可以確認單個訊息或一組訊息.發出確認的確切時刻取決於訊息的傳遞模式(永續性與瞬態)以及訊息路由到的佇列的屬性.也就是說,RabbitMQ可能不以訊息釋出的順序向釋出者傳送確認訊息.生產者端儘量不要依賴訊息確認的順序處理業務邏輯.