1. 程式人生 > 實用技巧 >RabbitMQ訊息積壓的幾種解決思路

RabbitMQ訊息積壓的幾種解決思路

在日常工作中使用RabbitMQ偶爾會遇不可預料的情況導致的訊息積壓,一般出現訊息積壓基本上分為幾種情況:

  1. 消費者消費訊息的速度趕不上生產速度,這總問題主要是業務邏輯沒設計好消費者和生產者之間的平衡,需要改業務流程或邏輯已保證消費度跟上生產訊息的速,譬如增加消費者的數量等。

  2. 消費者出現異常,導致一直無法接收新的訊息,這種問題需要排查消費的邏輯是不是又問題,需要優化程式。

除了上面的者兩種問題,還有一些其他情況會導致訊息積壓,譬如一些系統是無法預計成產訊息的速度和頻率,又或者消費者的速度已經被限制,不能通過加新的消費者來解決,譬如不同的系統間的API對接,對接那一方就做了請求頻率的限制,或者對方系統承受不了太大的併發,還有一些系統如果是面對企業客戶,譬如電商,物流,倉儲等類似平臺系統的客戶的下單是沒有規律的或者集中某一個時間段下單的,這種就不能簡單的通過加消費者來解決,就需要分析具體業務來避免訊息積壓。

針對這種情況,我想到了4中解決思路:

  1. 拆分MQ,生產者一個MQ,消費者一個MQ,寫一個程式監聽生產者的MQ模擬消費速度(譬如執行緒休眠),然後傳送到消費者的MQ,如果訊息積壓則只需要處理生產者的MQ的積壓訊息,不影響消費者MQ

  2. 拆分MQ,生產者一個MQ,消費者一個MQ,寫一個程式監聽生產者的MQ,定義一個全域性靜態變數記錄上一次消費的時間,如果上一次時間和當前時間只差小於消費者的處理時間,則傳送到一個延遲佇列(可以使用死信佇列實現)傳送到消費者的MQ,如果訊息積壓則只需要處理生產者的MQ的積壓訊息,不影響消費者MQ

  3. 使用Redis的List或ZSET做接收訊息快取,寫一個程式按照消費者處理時間定時從Redis取訊息傳送到MQ

  4. 設定訊息過期時間,過期後轉入死信佇列,寫一個程式處理死信訊息(重新如佇列或者即使處理或記錄到資料庫延後處理)

其中使用延時佇列會相對來說邏輯簡單,業務邏輯變更也不大,在RabbitMQ中,可使用死信來及延時佇列外掛rabbitmq_delayed_message_exchange兩種方式實現延時佇列。
使用外掛可以在官網找到:https://www.rabbitmq.com/community-plugins.html

外掛的安裝及使用方式就不做介紹了,主要介紹下使用死信來實現延時佇列,原理就是將訊息傳送到一個死信佇列,並設定過期時間,過期後將死信轉發到要處理的訊息佇列。
生產者相關程式碼:

          /// <summary>
        /// 傳送延時佇列訊息
        /// </summary>
        /// <param name="message"></param>
        /// <param name="queueName"></param>
        /// <param name="prefetchCount">預設20</param>
        public void SendDelayQueues(string message, string queueName,double delayMilliseconds,string beDeadLetterPrefix="beDeadLetter_")
        {
            #region 死信到期後轉入的交換機及佇列
            //死信轉入新的佇列的路由鍵(消費者使用的路由鍵)
            var routingKey = queueName;
            var exchangeName = queueName;
            //定義佇列
            Channel.QueueDeclare(queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);
            //定義交換機
            Channel.ExchangeDeclare(exchange: exchangeName,
                type: "direct");
            //佇列繫結到交換機
            Channel.QueueBind(queue: queueName,
                exchange: exchangeName,
                routingKey: routingKey);
            #endregion

            //將變成死信的佇列名
            var beDeadLetterQueueName = beDeadLetterPrefix + queueName;
            //將變成死信的交換機名
            var beDeadLetterExchangeName = beDeadLetterPrefix + queueName;

            //定義一個有延遲的交換機來做死信(該訊息不能有消費者,不然無法變成死信)
            Channel.ExchangeDeclare(exchange:beDeadLetterExchangeName ,
                type: "direct");
            
            //定義該延遲訊息過期變成死信後轉入的交換機(消費者需要繫結的交換機)
            //Channel.ExchangeDeclare(exchange: queueName,type: "direct");

            var dic = new Dictionary<string, object>();
            //dic.Add("x-expires", 30000);
            //dic.Add("x-message-ttl", 12000);//佇列上訊息過期時間,應小於佇列過期時間  
            dic.Add("x-dead-letter-exchange", queueName);//變成死信後轉向的交換機
            dic.Add("x-dead-letter-routing-key",routingKey);//變成死信後轉向的路由鍵
            //定義將變成死信的佇列
            Channel.QueueDeclare(queue: beDeadLetterQueueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: dic);

            //佇列繫結到交換機
            Channel.QueueBind(queue: beDeadLetterQueueName,
                exchange: beDeadLetterExchangeName,
                routingKey: routingKey);

            //不要同時給一個消費者推送多於prefetchCount個訊息, ushort prefetchCount = 20
            //Channel.BasicQos(prefetchSize: 0, prefetchCount: prefetchCount, global: false);
            var body = Encoding.UTF8.GetBytes(message);
            var properties = Channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.DeliveryMode = 2;//持久化訊息
            //過期時間
            properties.Expiration = delayMilliseconds.ToString();
            Channel.BasicPublish(exchange: beDeadLetterExchangeName,
                routingKey: routingKey,
                basicProperties: properties,
                body: body);
        }

消費者相關程式碼:

        /// <summary>
        /// 設定延遲佇列接收的事件
        /// </summary>
        /// <param name="action"></param>
        /// <param name="queueName"></param>
        /// <param name="prefetchCount">預設1</param>
        /// <param name="autoAck"></param>
        /// <param name="consumerCount"></param>
        public void SetDelayQueuesReceivedAction(Action<string> action, string queueName, ushort prefetchCount = 1,
            bool autoAck = false, int consumerCount = 1)
        {
            if (prefetchCount < 1)
            {
                throw new Exception("consumerCount must be greater than 1 !");
            }

            var exchangeName = queueName;
            var routingKey = queueName;
            for (int i = 0; i < consumerCount; i++)
            {
                var Channel = Connection.CreateModel();
                //定義佇列
                Channel.QueueDeclare(queue: queueName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);
                //定義交換機
                Channel.ExchangeDeclare(exchange: exchangeName,
                    type: "direct");
                //佇列繫結到交換機
                Channel.QueueBind(queue: queueName,
                    exchange: exchangeName,
                    routingKey: routingKey);
                //不要同時給一個消費者推送多於prefetchCount個訊息
                Channel.BasicQos(prefetchSize: 0, prefetchCount: prefetchCount, global: false);
                ChannelList.Add(Channel);
                var consumer = new EventingBasicConsumer(Channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    //Console.WriteLine("處理消費者ConsumerTag:" + ea.ConsumerTag);
                    action(message);
                    //手動確認訊息應答
                    Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                //autoACK自動訊息應答設定為false
                Channel.BasicConsume(queue: queueName, autoAck: autoAck, consumer: consumer);
            }
        }

完整程式碼實現放到了Github:https://github.com/tanyongzheng/TZ.RabbitMQ