1. 程式人生 > >RabbitMQ訊息釋出和消費的確認機制

RabbitMQ訊息釋出和消費的確認機制

前言

新公司專案使用的訊息佇列是RabbitMQ,之前其實沒有在實際專案上用過RabbitMQ,所以對它的瞭解都談不上入門。趁著週末休息的時間也猛補習了一波,寫了兩個窗體應用,一個訊息釋出端和訊息消費端。園子裡解釋RabbitMQ基礎的很多了,這裡就不對RabbitMQ的基礎再做敘述了,來點實際工作中一定會碰到的問題和解決的方案。

RabbitMQ 訊息釋出確認機制

預設情況下訊息釋出端執行BasicPublish方法後,訊息是否到達指定的佇列的結果釋出端是未知的。BasicPublish方法的返回值是void。假設我們想對訊息進行監控,針對訊息傳送失敗後進行補發則需要一個訊息確認機制來幫我們實現。

  • 事務機制
  • Confirm確認機制

上面是已知可通過RabbitMQ自帶的特性實現訊息確認機制的兩種方式。

事務機制

事務機制依賴三個RabbitMQ提供的方法

  • txSelect()
  • txCommit()
  • txRollback()
    看名字大概知道意思了,特別是Commit和Rollback,使用方式和資料庫的事務使用幾乎一樣,txSelect()宣告事務的開始,txCommit()提交事務,txRollBack()執行提交失敗後的回滾。
    使用程式碼如下:
        // 採取RabbitMQ事務方式傳輸訊息
        private void SendMessageByTransaction(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName,
         string routingKey, EnumRabbitExchangeType exchangeType, string message)
        {
            try
            {
                ConnectionFactory rabbitMqFactory = new ConnectionFactory()
                {
                    HostName = mqConnection.HostName,
                    UserName = mqConnection.UserName,
                    Password = mqConnection.Password,
                    Port = mqConnection.Port
                };
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: true, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                        // 必須執行QueueBind 需要將routingKey與佇列和交換機進行繫結 否則就算事務提交了佇列也不會有資料~
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        byte[] messagebuffer = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            channel.TxSelect();
                            channel.BasicPublish(exchangeName, routingKey, null, messagebuffer);
                            //if (1 == 1) throw new Exception("沒錯!我是故意丟擲異常的!看看最終佇列是否寫入了訊息~");
                            channel.TxCommit();
                        }
                        catch (Exception ex)
                        {
                            Rtx_Receive.Text = Rtx_Receive.Text + $"\r 異常產生時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")},異常資訊:{ex.Message}";
                            channel.TxRollback();
                            // TODO 進行補發OR其他邏輯處理
                        }
                        Rtx_Receive.Text = Rtx_Receive.Text + $"\r 傳送時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
                    }
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show($"傳送訊息失敗!{ex.Message}");
            }
        }

  

需要注意的是 這裡的事務其實也只能保證在執行BasicPublish方法後且TxCommit方法執行前但凡出現異常則回滾!
上面是什麼意思呢?意思就是我只管訊息傳送到佇列裡,且在我定義的事務內沒有出現異常,出現了異常則將釋出的資料給撤銷!
但是,如果事務也提交了,但是訊息還是有可能不會送達佇列裡去!

比如,我將上面的程式碼改下

        // 採取RabbitMQ事務方式傳輸訊息
        private void SendMessageByTransaction(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName,
         string routingKey, EnumRabbitExchangeType exchangeType, string message)
        {
            try
            {
                ConnectionFactory rabbitMqFactory = new ConnectionFactory()
                {
                    HostName = mqConnection.HostName,
                    UserName = mqConnection.UserName,
                    Password = mqConnection.Password,
                    Port = mqConnection.Port
                };
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: true, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                        // 必須執行QueueBind 需要將routingKey與佇列和交換機進行繫結 否則就算事務提交了佇列也不會有資料~
                       //  channel.QueueBind(queueName, exchangeName, routingKey);
                        byte[] messagebuffer = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            channel.TxSelect();
                            channel.BasicPublish(exchangeName, routingKey, null, messagebuffer);
                            //if (1 == 1) throw new Exception("沒錯!我是故意丟擲異常的!看看最終佇列是否寫入了訊息~");
                            channel.TxCommit();
                        }
                        catch (Exception ex)
                        {
                            Rtx_Receive.Text = Rtx_Receive.Text + $"\r 異常產生時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")},異常資訊:{ex.Message}";
                            channel.TxRollback();
                            // TODO 進行補發OR其他邏輯處理
                        }
                        Rtx_Receive.Text = Rtx_Receive.Text + $"\r 傳送時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
                    }
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show($"傳送訊息失敗!{ex.Message}");
            }
        }

  上面程式碼我將`channel.QueueBind(queueName, exchangeName, routingKey);` 這一行程式碼註釋掉,不將routingKey進行繫結,然後在RabbitMQ管理頁面將佇列、交換機刪除。如下圖

 

 

 

再執行程式碼,發現佇列是建立了,交換器也建立了,但是佇列裡沒有資料!

 

當然,問題出在沒有將佇列和交換器以及routingKey進行繫結,我們的訊息沒有進入到佇列的路由,最終導致了訊息進入了所謂的“黑洞”。

所以上面的佇列不是說能完全保證只要執行了TxRollback()我們的訊息佇列就一定會有資料!!!

Confirm確認機制

Confirm確認機制也很容易理解,它要求訊息生產端(Producer)對訊息傳送後RabbitMQ服務端返回一個已接收的指令,Producer收到該指令則認為該訊息已經發送成功。同時消費端(Consumer)也有同樣的機制,在從RabbitMQ服務端接收到訊息後,需要返回一個已處理的指令給服務端,服務端收到後則會認為該訊息已被消費。


下面是採取Confirm確認機制後的釋出訊息程式碼

        // 採取確認機制方式傳輸訊息
        private void SendMessageByAck(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName,
         string routingKey, EnumRabbitExchangeType exchangeType, string message)
        {
            try
            {
                ConnectionFactory rabbitMqFactory = new ConnectionFactory()
                {
                    HostName = mqConnection.HostName,
                    UserName = mqConnection.UserName,
                    Password = mqConnection.Password,
                    Port = mqConnection.Port
                };
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                        // 必須執行QueueBind 需要將routingKey與佇列和交換機進行繫結 否則就算事務提交了佇列也不會有資料~
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        byte[] messagebuffer = Encoding.UTF8.GetBytes(message);
                        channel.ConfirmSelect(); // 啟用伺服器確認機制方式
                        channel.BasicPublish(exchangeName, routingKey, mandatory: true, null, messagebuffer);
                        if (channel.WaitForConfirms())
                        {
                            Rtx_Receive.Text = $"\r 訊息傳送成功! 傳送時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show($"傳送訊息失敗!{ex.Message}");
            }
        }

  

關鍵程式碼在於執行BasicPublish之前呼叫channel.ConfirmSelect()啟用伺服器確認,然後在釋出後通過呼叫 WaitForConfirms()得到訊息釋出結果,
true則表示訊息已釋出到了佇列裡。 OK,現在試下,到伺服器上刪除掉佇列、交換機資訊。然後程式碼去掉繫結交換機和路由鍵後試下,看看是否和事務方式一樣無法確認訊息是否真正抵達佇列。

刪除佇列和交換機,接下來更改程式碼,直接把上面的 channel.QueueBind(queueName, exchangeName, routingKey);註釋掉,然後執行下,看看channel.WaitForConfirms()返回true還是false~
哈哈哈~執行結果是true 但是我們佇列是不會有訊息進來的,所以確認機制和事務機制對訊息的釋出是否抵達佇列監控是一樣的,沒有說哪一種方式能絕對保證訊息抵達了佇列

針對訊息提交到了指定交換機但是最終沒有寫入到佇列的訊息如何追蹤

我們有一種方式可以捕獲釋出了訊息但是該訊息最終沒有寫入到佇列的情況,我們需要註冊IModel的BasicReturn事件,更新後的程式碼如下:

       // 採取確認機制方式傳輸訊息
        private void SendMessageByAck(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName,
         string routingKey, EnumRabbitExchangeType exchangeType, string message)
        {
            try
            {
                ConnectionFactory rabbitMqFactory = new ConnectionFactory()
                {
                    HostName = mqConnection.HostName,
                    UserName = mqConnection.UserName,
                    Password = mqConnection.Password,
                    Port = mqConnection.Port
                };
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                        // 必須執行QueueBind 需要將routingKey與佇列和交換機進行繫結 否則就算事務提交了佇列也不會有資料~
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        byte[] messagebuffer = Encoding.UTF8.GetBytes(message);
                        channel.ConfirmSelect(); // 啟用伺服器確認機制方式
                        channel.BasicReturn += Channel_BasicReturn;
                        //mandatory為true表示交換器無法根據自身的型別和路由鍵找到一個符合條件的佇列,那麼RabbitMQ 會呼叫Basic.Return 命令將訊息返回給生產者
                        channel.BasicPublish(exchangeName, routingKey, mandatory: true, null, messagebuffer);
                        if (channel.WaitForConfirms())
                        {
                            Rtx_Receive.Text = $"\r 訊息傳送成功! 傳送時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show($"傳送訊息失敗!{ex.Message}");
            }
        }

        /// <summary>
        /// 當訊息傳送不到佇列時候觸發
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
        {
            BeginInvoke(new Action(() => { Rtx_Receive.Text = $"\r 訊息傳送失敗!"; }));
        }  

訊息釋出確認機制結論

1、事務機制與確認機制都無法百分之百確認訊息是否寫入到了快取,可以理解為兩者都只能確認釋出的動作是否成功~但是訊息有無進入佇列是無法給予客戶端準確結果;
2、兩者效能比較而言事務的效能損耗更大;
3、註冊IModel的BasicReturn事件可以追蹤到沒有寫入到佇列的訊息

RabbitMQ 訊息消費確認機制

上面已經知道了如何對訊息的釋出進行確認,那麼消費資料時候我們肯定也想在消費完成後確認該訊息已經處理,希望佇列對其進行刪除。
而不是在我們的消費端程式未將訊息處理後,佇列就將其刪除了。

在此之前說下RabbitMQ消費者物件的兩種實現方式

  • 繼承DefaultBasicConsumer類
  • 例項化EventingBasicConsumer物件

繼承DefaultBasicConsumer方式

DefaultBasicConsumer是RabbitMQ.Client提供的一個消費者基類,該類實現了IBasicConsumer介面。
繼承DefaultBasicConsumer類後可重寫基類的部分方法來實現訊息獲取以及當前消費者各個狀態變更的事件,本文的示例程式碼即採用這種方式實現消費者物件。

例項化EventingBasicConsumer物件

這種方式採取註冊事件的方式接受訊息釋出者推送到佇列的訊息,程式碼如下:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {
     string message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine($"接受到訊息為:{message}");
 };  

消費端確認訊息已處理有兩種方式

  • 自動確認
  • 手動確認

自動確認

自動確認我理解的就是服務端認為你接受到訊息後即確認了,但是如果當你拿到訊息後,依賴此訊息的業務邏輯未處理完畢,但是卻中途異常了的話,此訊息也會消失掉!所以建議消費端採取手動確認!

手動確認

手動確認可以完美解決上面自動確認出現的問題,但是它也意味著我們開發者需要對確認的流程進行一個完整的閉環。即所有的訊息在消費端獲取到後必須有一個明確的結果返回給服務端(Broker)。 我們對訊息的處理結果要麼是確認處理,要麼是拒絕該訊息(返回給Broker,再分發給其他消費者)。如果我們沒有對訊息接收後進行任何反饋的話該條訊息在佇列的狀態會變成Unacked 直到我們消費端AMQP連線中斷後該訊息狀態又會變成Ready。狀態為Unacked的訊息會導致所有消費者都無法對該訊息進行二次消費(包含當前消費者),所以此類訊息越多則佔用的記憶體資源也會越多。當訊息變回Ready也會很煩人,因為我們已經對該訊息進行過一次處理了,如若我們沒有對訊息進行校驗則又會執行一遍。 所以手動確認必須執行回執!!!!!!

下面是手動確認訊息的程式碼:

        private void ReceiveMessage(RabbitMQConnectionDTO connectionDTO, string exchangeName, string queueName, string routtingKey)
        {
            try
            {
                var factory = new ConnectionFactory
                {
                    HostName = connectionDTO.HostName,
                    Password = connectionDTO.Password,
                    UserName = connectionDTO.UserName,
                    Port = connectionDTO.Port,
                };

                UseDefaultBasicConsumerType(factory, queueName);
                //DirectAcceptExchangeEvent(factory, exchangeName, queueName, routtingKey);
            }
            catch (Exception ex)
            {
                Rtx_SendContext.Text = $"出現異常:{ex.Message}";
            }
        }

        private void UseDefaultBasicConsumerType(ConnectionFactory factory, string queueName)
        {
            var connection = factory.CreateConnection();
            _channel = connection.CreateModel();
            // accept only one unack-ed message at a time
            // uint prefetchSize, ushort prefetchCount, bool global
            _channel.BasicQos(0, 1, false);

            //定義一個繼承了DefaultBasicConsumer類的消費類(DefaultBasicConsumer是繼承了IBasicConsumer介面的一個基類,裡面存在許多可重寫的方法)
            MessageReceiver messageReceiver = new MessageReceiver(_channel, (string msg, ulong deliveryTag) =>
            {
                string key = Txt_Key.Text.Trim();
                string keyNoReturn = Txt_KeyNoReturn.Text.Trim();
                bool isExecFlag = false;
                if (!string.IsNullOrWhiteSpace(key) && msg.StartsWith(key)) // 這裡要小心 如果只有當前1個消費者那你懂的~~~~~~
                    _channel.BasicReject(deliveryTag, requeue: true); //requeue表示訊息被拒絕後是否重新放回queue中
                else if (!string.IsNullOrWhiteSpace(keyNoReturn) && msg.StartsWith(keyNoReturn))
                    _channel.BasicReject(deliveryTag, requeue: false); //requeue表示訊息被拒絕後是否重新放回queue中
                else
                {
                    _channel.BasicAck(deliveryTag, multiple: false); //確認已處理訊息 multiple表示是否確認多條
                    isExecFlag = true;
                }
                BeginInvoke(new Action(() => { Rtx_SendContext.Text = Rtx_SendContext.Text + "\r" + $"處理標識{isExecFlag.ToString()} " + string.Format("***接收時間:{0},訊息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msg); }));
            });
            _channel.BasicConsume(queueName, false, messageReceiver); //不開啟自動確認回執
        }

這裡的消費者物件通過繼承DefaultBasicConsumer物件而實現

程式碼如下:

    public class MessageReceiver : DefaultBasicConsumer
    { 
        private readonly Logger _logger;
        private readonly Action<string, ulong> _action;
        public MessageReceiver(IModel channel, Action<string, ulong> action)
        {
            _action = action; 
            _logger = LogManager.GetCurrentClassLogger();
        }

        public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
        {
            string msg = Encoding.UTF8.GetString(body);
            _logger.Debug($"***************************Consuming Topic Message  時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}*********************************");
            _logger.Debug(string.Concat("Message received from the exchange ", exchange));
            _logger.Debug(string.Concat("Consumer tag: ", consumerTag));
            _logger.Debug(string.Concat("Delivery tag: ", deliveryTag));
            _logger.Debug(string.Concat("Routing tag: ", routingKey));
            _logger.Debug(string.Concat("Message: ", msg));
            _action?.Invoke(msg, deliveryTag);
        }

        /// <summary>
        /// 捕獲通道連線的關閉事件
        /// </summary>
        /// <param name="model"></param>
        /// <param name="reason"></param>
        public override void HandleModelShutdown(object model, ShutdownEventArgs reason)
        {
            _logger.Debug($"進入MessageReceiver.HandleModelShutdown方法");
            base.HandleModelShutdown(model, reason);
        }

        public override void HandleBasicConsumeOk(string consumerTag)
        {
            _logger.Debug($"進入MessageReceiver.HandleBasicConsumeOk方法 consumerTag:{consumerTag}");
            base.HandleBasicConsumeOk(consumerTag);
        }

        /// <summary>
        ///  刪除佇列 會進入
        /// </summary>
        /// <param name="consumerTag"></param>
        public override void HandleBasicCancel(string consumerTag)
        {
            _logger.Debug($"進入MessageReceiver.HandleBasicCancel方法 consumerTag:{consumerTag}");
            base.HandleBasicCancel(consumerTag);
        }
    }

上面有引用到的_logger和_action先不用管,重點是_channel.BasicReject方法和_channel.BasicAck方法。他們分別是代表拒絕消費和確認消費。

上面的程式碼是消費者消費資料後給予了Broker明確的回執,我們試下將回執程式碼註釋掉後看下佇列裡的訊息變成什麼樣子了。 先刪除掉交換器和佇列,然後再發布資料,看看消費資料後不回執的訊息狀態~
這是釋出訊息到佇列後,Ready狀態的訊息為1條

 接下來,我們去消費資料但是不進行回執確認,看看結果如何

 如上圖,還是那條資料,狀態從Ready變成了Unacked,這時候是因為我的消費端應用還沒關閉,AMQP的連結也還在。我們到工作管理員內將消費應用關閉~

關閉後又變成了Ready, 意味著我們再次開啟消費端程式又可以從佇列獲取到之前的訊息了~
我們將上面的回執程式碼部分註釋取消,看看回執成功後佇列內的訊息狀態是什麼樣?

 可以看到回執確認後,我們的訊息就從佇列裡移除了~

訊息消費確認機制結論

1、自動確認雖然省程式碼但是可能會出現訊息丟失業務未處理完畢的情況;
2、手動確認訊息則是在獲取到訊息後,在沒有返回回執前,訊息會一致儲存在佇列

 

本文對應的程式碼已上傳至Github,地址:https://github.com/QQ897878763/RabbitMQ_Sample

 程式的執行截圖如下:

&n