1. 程式人生 > 實用技巧 >RabbitMQ (十一) 訊息確認機制 - 消費者確認

RabbitMQ (十一) 訊息確認機制 - 消費者確認

由於生產者和消費者不直接通訊,生產者只負責把訊息傳送到佇列,消費者只負責從佇列獲取訊息(不管是push還是pull).

訊息被"消費"後,是需要從佇列中刪除的.那怎麼確認訊息被"成功消費"了呢?

是消費者從佇列獲取到訊息後,broker 就從佇列中刪除該訊息?

那如果消費者收到訊息後,還沒來得及"消費"它,或者說還沒來得及進行業務邏輯處理時,消費者所在的通道或者連線因某種原因斷開了,

那這條訊息豈不是就被無情的拋棄了...

我們更期望的是,消費者從佇列獲取到訊息後,broker 暫時不刪除該條訊息,

等到消費者"成功消費"掉該訊息後,再刪除它.

所以需要一個機制來確認生產者傳送的訊息被消費者"成功消費".

RabbitMQ 提供了一種叫做"消費者確認"的機制.

消費者確認

消費者確認分兩種:自動確認手動確認.

在自動確認模式中,訊息在傳送到消費者後即被認為"成功消費".這種模式可以降低吞吐量(只要消費者可以跟上),以降低交付和消費者處理的安全性.這種模式通常被稱為“即發即忘”.與手動確認模型不同,如果消費者的TCP連線或通道在真正的"成功消費"之前關閉,則伺服器傳送的訊息將丟失.因此,自動訊息確認應被視為不安全,並不適用於所有工作負載.

使用自動確認模式時需要考慮的另一件事是消費者過載.手動確認模式通常與有界通道預取(BasicQos方法)一起使用,該預取限制了通道上未完成(“進行中”)的訊息的數量.但是,自動確認沒有這種限制.因此,消費者可能會被訊息的傳送速度所淹沒,可能會導致訊息積壓並耗盡堆或使作業系統終止其程序.某些客戶端庫將應用TCP反壓(停止從套接字讀取,直到未處理的交付積壓超過某個限制).因此,僅建議能夠以穩定的速度有效處理訊息的消費者使用自動確認模式.

1.自動確認 autoAck : true

下面是消費者的部分程式碼,我們故意每次只推送一條訊息,並且讓每條訊息的處理都超過10秒.

            channel.BasicQos(0, 1, false);//將Qos預取值設定為1,這表示設定broker每次只推送佇列裡面的一條訊息到消費者,只有在確認這條訊息"成功消費"後,才會繼續推送
            consumer.Received += (s, e) =>
            {
                string str = Encoding.Default.GetString(e.Body);
                Thread.Sleep(10000);
                Console.WriteLine("consumer1 receive : " + str);
            };
            channel.BasicConsume(queue: QueueName, autoAck: true, consumer: consumer);

下面是生產者的部分程式碼

                for (byte i = 0; i < 5; i++)
                {
                    string msg = "hello world " + i;
                    channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
                    Console.WriteLine($"send {msg}");
                }

執行結果:

從管理後臺可以看到,消費者還沒列印"receive"那句話,該佇列中就已經沒有任何訊息了.

2.手動確認autoAck : false

手動確認又分兩種:肯定確認否定確認.

1)肯定確認 BasicAck

消費者部分程式碼:

            channel.BasicQos(0, 1, false);//設定broker每次只推送佇列裡面的一條訊息到消費者,只有在確認這條訊息"成功消費"後,才會繼續推送
            consumer.Received += (s, e) =>
            {
                string str = Encoding.Default.GetString(e.Body);
                Console.WriteLine("consumer1 receive : " + str);
                Thread.Sleep(30000);
                //deliveryTag 傳遞標籤,ulong 型別.它的範圍隸屬於每個通道.因此必須在收到訊息的相同通道上確認.不同的通道將導致“未知的傳遞標籤”協議異常並關閉通道.
                //multiple 確認一條訊息還是多條.false 表示只確認 e.DelivertTag 這條訊息,true表示確認 小於等於 e.DelivertTag 的所有訊息 
                channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
                Console.WriteLine("consumer1 Ack : " + str);
            };
            channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

生產者程式碼不變.

當消費者收到一條訊息,但是還沒有肯定確認時,從管理後臺可以清晰的看到,佇列中一共有5條訊息,其中4條尚未推送,1條已經推送但尚未確認.

當消費者確認後(立馬又接收了一條),這時候,佇列中一共只有4條了,"成功消費"的那條已經被broker從佇列中刪掉了.剩餘3條尚未推送,1條已推送但尚未確認.

2)否定確認BasicNack , BasicReject

否定確認的場景不多,但有時候某個消費者因為某種原因無法立即處理某條訊息時,就需要否定確認了.

否定確認時,需要指定是丟棄掉這條訊息,還是讓這條訊息重新排隊,過一會再來,又或者是讓這條訊息重新排隊,並儘快讓另一個消費者接收並處理它.

i.丟棄requeue: false

消費者部分程式碼:

            channel.BasicQos(0, 1, false);
            consumer.Received += (s, e) =>
            {
                string str = Encoding.Default.GetString(e.Body);
                Thread.Sleep(10000);
                channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: false);
                Console.WriteLine("consumer1 Nack : " + str);
            };
            channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

ii.重新排隊requeue: true

消費者部分程式碼:

            channel.BasicQos(0, 1, false);//設定broker每次只從推送佇列裡面的一條訊息到消費者,只有在確認這條訊息"成功消費"後,才會繼續推送
            consumer.Received += (s, e) =>
            {
                string str = Encoding.Default.GetString(e.Body);
                Thread.Sleep(5000);
                channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true);
                Console.WriteLine("consumer1 Nack : " + str);
            };
            channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

執行結果:

可以看到,消費者收到的一直是"hello world 0"這條訊息,而管理後臺一直顯示 4,1,5.這是為什麼呢?

首先,我們設定的是每次只推送一條訊息給消費者,否定確認中我們選擇的是重新排隊,所以"hello world 0"這條訊息被否定確認後,被broker安排去重新排隊了.當訊息被重新排隊時,如果可能的話,它將被放置在其佇列中的原始位置.也就是說"hello world 0"這條訊息又被放到了佇列頭,因為它的原始位置就是佇列頭.所以結果就變成了消費之一直在消費"hello world 0",並且一直在否定確認.

感覺這種方式的代價是不是有點大...訊息重新排隊,還要回到之前的位置,還要重新發送一次....感覺代價有點小貴啊...而且其他訊息貌似永遠只有ready...

但,如果多個消費者共享佇列時,該訊息將被重新排隊到更靠近佇列頭的位置,並且會被聰明的broker從佇列中推送到其他佇列.

測試:

我們重新建立兩個消費者:consumer1 否定確認,3秒一次;consumer2 肯定確認,1秒一次.兩個消費共享一個佇列(公平分發)

            channel.BasicQos(0, 1, false);
            consumer.Received += (s, e) =>
            {
                string str = Encoding.Default.GetString(e.Body);
                Thread.Sleep(3000);
                channel.BasicNack(e.DeliveryTag, false, true);
                Console.WriteLine($"{DateTime.Now} consumer1 Nack : " + str);
            };
            channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

            channel.BasicQos(0, 1, false);
            consumer.Received += (s, e) =>
            {
                string str = Encoding.Default.GetString(e.Body);
                Thread.Sleep(1000);
                channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
                Console.WriteLine($"{DateTime.Now} consumer2 Ack : " + str);
            };
            channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

執行結果:

一切盡在圖中.

BasicReject 方法和BasicNack 方法基本一樣,唯一的區別是沒有multiple 這個入參.

消費者確認模式,預取和吞吐量

確認模式和QoS預取值對消費者吞吐量具有顯著影響。通常,增加預取將提高向消費者傳遞訊息的速率。自動確認模式可以產生最佳的交付率。但是,在這兩種情況下,已傳送但尚未處理的訊息的數量也將增加,從而增加了消費者的RAM消耗。

應謹慎使用具有無限預取功能的自動確認模式或手動確認模式。在沒有確認的情況下消耗大量訊息的消費者將導致他們所連線的節點上的記憶體消耗增長。找到合適的預取值需要不斷試驗,並且會因工作負載而異。100到300範圍內的值通常可提供最佳吞吐量,並且不會面臨壓倒性消費者的重大風險。較高的價值往往會影響收益遞減規律。

預取值1是最保守的。它將顯著降低吞吐量,特別是在消費者連線延遲較高的環境中。對於許多應用來說,更高的值是合適的和最佳的。

當消費者失敗或失去連線時:自動重新排隊

使用手動確認時,除了我們主動讓訊息重新排隊外,任何未確認的訊息都將在關閉發生傳遞的通道(或連線)時自動重新排隊。這包括客戶端的TCP連線丟失,消費者應用程式(程序)故障和通道級協議異常.請注意,檢測不可用的客戶端需要一段時間。

由於這種行為,消費者必須準備好處理重新發送,否則就要考慮到冪等性。BasicDeliverEventArgs有一個特殊的布林屬性 : Redelivered,如果該訊息是第一次交付,它將被設定為false.否則為 true.

測試:

還是借用上一個測試的程式碼,只是分別加了一句話:

Console.WriteLine($"{str} 是否是重複傳送 : " + e.Redelivered);

執行結果:

這裡要特別注意,consumer2 收到 "hello world 0"的時候,Redelivered 的值依然是 true . 因為Redelivered 屬性的維度是訊息,不是消費者.