1. 程式人生 > 其它 >RabbitMQ死性佇列,延時佇列

RabbitMQ死性佇列,延時佇列

1.前言

之前一直聽說過RabbitMQ的死性佇列,延時佇列,但是沒有深入的瞭解,結合自己實際工作中使用到的RabbitMQ,現在整理出來當做個人學習筆記。

2.死性佇列介紹

  死性佇列:DLX(dead-letter-exchange),利用DLX,當訊息在一個佇列中變成死性(dead message)之後他能被重新push到另外一個Exchange,那麼這個Exchange就是DLX。

  訊息變成死性一般有一下三種情況:

    (1)訊息被拒(basic.reject or basic.nack)並且沒有重新入隊(requeue=false);

    (2)當前佇列中的訊息數量已經超過最大長度(建立佇列時指定" x-max-length引數設定佇列最大訊息數量)。

    (3)訊息在佇列中過期,即當前訊息在佇列中的存活時間已經超過了預先設定的TTL(Time To Live)時間;

3.死性處理過程

    (1)DLX也是一個正常的交換機(Exchange),和一般的Exchange沒有區別,他能在任何的佇列上指定,實際上就是設定某個佇列的屬性。

    (2)當這個佇列中有死性時,RabbitMQ就會自動的將這個訊息重新發送到設定的Exchange中,進而被路由到另一個佇列。

    (3)可以監聽到這個佇列(DLX)做特殊處理。

4.demo演示

  設定場景1:Q1中佇列資料不完整,就算從新處理也會報錯,那就可以不ack,把這個訊息轉到死信佇列另外處理。

生產者:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMqClient
{
    class Program
    {
        static IConnectionFactory factory = new ConnectionFactory()
        {
            HostName = "127.0.0.1",
            UserName = "admin",
            Password 
= "admin", VirtualHost = "/" }; static void Main(string[] args) { Console.WriteLine("Hello Producer!"); SendMessage(); Console.ReadLine(); } public static void SendMessage() { //死信交換機 string dlxexChange = "dlx.exchange"; //死信佇列 string dlxQueueName = "dlx.queue"; //訊息交換機 string exchange = "direct-exchange"; //訊息佇列 string queueName = "queue_a"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //建立死信交換機 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //建立死信佇列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信佇列繫結死信交換機 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 建立訊息交換機 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //建立訊息佇列,並指定死信佇列 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-dead-letter-exchange",dlxexChange}, //設定當前佇列的DLX(死信交換機) { "x-dead-letter-routing-key",dlxQueueName}, //設定DLX的路由key,DLX會根據該值去找到死信訊息存放的佇列 }); //訊息佇列繫結訊息交換機 channel.QueueBind(queueName, exchange, routingKey: queueName); string message = "hello rabbitmq message"; var properties = channel.CreateBasicProperties(); properties.Persistent = true; //釋出訊息 channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"向佇列:{queueName}傳送訊息:{message}"); } } } } }

其中在佇列queue_a上設定屬性值x-dead-letter-exchangex-dead-letter-routing-key

消費者:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMqServer
{
    class Program
    {
        static IConnectionFactory factory = new ConnectionFactory()
        {
            HostName = "127.0.0.1",
            UserName = "admin",
            Password = "admin",
            VirtualHost = "/"
        };
        static void Main(string[] args)
        {
            Console.WriteLine("Hello Consumer!");
            Consumer();
            Console.ReadLine();
        }
        public static void Consumer()
        {
            //死信交換機
            string dlxexChange = "dlx.exchange";
            //死信佇列
            string dlxQueueName = "dlx.queue";

            //訊息交換機
            string exchange = "direct-exchange";
            //訊息佇列
            string queueName = "queue_a";
            var connection = factory.CreateConnection();
            {
                //建立通道
                var channel = connection.CreateModel();
                {

                    //建立死信交換機
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //建立死信佇列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信佇列繫結死信交換機
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    // 建立訊息交換機
                    channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //建立訊息佇列,並指定死信佇列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",dlxexChange}, //設定當前佇列的DLX
                                             { "x-dead-letter-routing-key",dlxQueueName}, //設定DLX的路由key,DLX會根據該值去找到死信訊息存放的佇列
                                         });
                    //訊息佇列繫結訊息交換機
                    channel.QueueBind(queueName, exchange, routingKey: queueName);

                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    consumer.Received += (model, ea) =>
                    {
                        //處理業務
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"佇列{queueName}消費訊息:{message},不做ack確認");
                        //channel.BasicAck(ea.DeliveryTag, false);
                        //不ack(BasicNack),且不把訊息放回佇列(requeue:false)
                        channel.BasicNack(ea.DeliveryTag, false, requeue: false);
                    };
                    channel.BasicConsume(queueName, autoAck: false, consumer);
                }
            }
        }
    }
}

消費者加上channel.BasickNack()模擬訊息處理不了,不ack確認。

然後我們到RabbitMQ的web端看下

看到訊息佇列為queue_a,特性有DLX(死信交換機),DLK(死信路由)。因為消費端不nack,觸發了死信,被轉發到了死信佇列dlx.queue。

5.延時佇列

延時佇列是配合死性佇列一起使用,給佇列新增訊息過時時間(TTL)變成延時佇列。

簡單的描述就是:P(生產者)傳送訊息到Q1(延時佇列),Q1的訊息有過期時間,比如10s,那10s後訊息過期就會觸發死信,從而把訊息轉發到Q2(死信佇列)。

解決問題場景:像商城下單,未支付時取消訂單場景。下單時寫一條記錄入Q1,延時30分鐘後轉到Q2,消費Q2,檢查訂單,支付則不做操作,沒支付則取消訂單,恢復庫存。

生產者:

public static void SendMessage_delay()
        {
            //死信交換機
            string dlxexChange = "dlx.exchange";
            //死信佇列
            string dlxQueueName = "dlx.queue";

            //訊息交換機
            string exchange = "direct-exchange";
            //訊息佇列
            string queueName = "queue_a";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {

                    //建立死信交換機
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //建立死信佇列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信佇列繫結死信交換機
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    // 建立訊息交換機
                    channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //建立訊息佇列,並指定死信佇列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",dlxexChange}, //設定當前佇列的DLX(死信交換機)
                                             { "x-dead-letter-routing-key",dlxQueueName}, //設定DLX的路由key,DLX會根據該值去找到死信訊息存放的佇列
                                            { "x-message-ttl",10000}
                                        });
                    //訊息佇列繫結訊息交換機
                    channel.QueueBind(queueName, exchange, routingKey: queueName);

                    string message = "hello rabbitmq message";
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //釋出訊息
                    for (int i = 0; i < 10; i++)
                    {
                        message += i;
                        channel.BasicPublish(exchange: exchange,
                                         routingKey: queueName,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes(message));
                        Console.WriteLine($"{DateTime.Now}向佇列:{queueName}傳送訊息:{message}");
                        System.Threading.Thread.Sleep(1000);
                    }

                }
            }
        }

消費者:

public static void Consumer_delay()
        {
            //死信交換機
            string dlxexChange = "dlx.exchange";
            //死信佇列
            string dlxQueueName = "dlx.queue";

            var connection = factory.CreateConnection();
            {
                //建立通道
                var channel = connection.CreateModel();
                {

                    //建立死信交換機
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //建立死信佇列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信佇列繫結死信交換機
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    consumer.Received += (model, ea) =>
                    {
                        //處理業務
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{DateTime.Now}佇列{dlxQueueName}消費訊息:{message}");
                        channel.BasicAck(ea.DeliveryTag, false);
                        //不ack(BasicNack),且不把訊息放回佇列(requeue:false)
                        //channel.BasicNack(ea.DeliveryTag, false, requeue: false);
                    };
                    channel.BasicConsume(dlxQueueName, autoAck: false, consumer);
                }
            }
        }

向延時佇列傳送訊息,監聽死信佇列,傳送和收到訊息時間剛好是設定的10s。

6.延時訊息設定不同過期時間

上面的延時佇列能解決訊息過期時間都是相同的場景,能不能解決訊息的過期時間是不一樣的呢?

例如場景:機器人客服,為了更像人為操作,收到訊息後要隨機3-10秒回覆客戶。

1)佇列不設定TTL(訊息過期時間),把過期時間設定在訊息上。

生產者:

public static void SendMessage_delay()
        {
            //死信交換機
            string dlxexChange = "dlx.exchange";
            //死信佇列
            string dlxQueueName = "dlx.queue";

            //訊息交換機
            string exchange = "direct-exchange";
            //訊息佇列
            string queueName = "queue_a";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {

                    //建立死信交換機
                    channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //建立死信佇列
                    channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false);
                    //死信佇列繫結死信交換機
                    channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName);

                    // 建立訊息交換機
                    channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false);
                    //建立訊息佇列,並指定死信佇列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments:
                                        new Dictionary<string, object> {
                                             { "x-dead-letter-exchange",dlxexChange}, //設定當前佇列的DLX(死信交換機)
                                             { "x-dead-letter-routing-key",dlxQueueName}, //設定DLX的路由key,DLX會根據該值去找到死信訊息存放的佇列
                                        });
                    //訊息佇列繫結訊息交換機
                    channel.QueueBind(queueName, exchange, routingKey: queueName);

                    string message = "hello rabbitmq message 10s後處理";
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    properties.Expiration = "10000";
                    //釋出訊息
                    channel.BasicPublish(exchange: exchange,
                                     routingKey: queueName,
                                     basicProperties: properties,
                                     body: Encoding.UTF8.GetBytes(message));
                    Console.WriteLine($"{DateTime.Now}向佇列:{queueName}傳送訊息:{message}");


                    message = "hello rabbitmq message 5s後處理";
                    properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    properties.Expiration = "5000";
                    channel.BasicPublish(exchange: exchange,
                                    routingKey: queueName,
                                    basicProperties: properties,
                                    body: Encoding.UTF8.GetBytes(message));
                    Console.WriteLine($"{DateTime.Now}向佇列:{queueName}傳送訊息:{message}");

                }
            }
        }

消費者程式碼和上面一致。

結論:

生產者向佇列中傳送一條延時10s的訊息再發一條延時5秒的訊息,但消費者卻先拿到延時10s的,再拿到延時5秒的,我想要的結果是先拿到延時5s的再拿到延時10s的,是什麼原因呢?

原因是:佇列是先進先出的,而RabbitMQ只會對首位第一條訊息做檢測,第一條沒過期,那麼後面的訊息就會阻塞住等待前面的過期。

解決辦法:增加一個消費者對延時佇列消費,不ack,把第一條訊息放到佇列尾部。一直讓訊息在流動,這樣就能檢測到了。

新增消費者程式碼:

  

public static void Consumer_NormalConsumer()
        {
            //死信佇列
            string dlxQueueName = "queue_a";

            var connection = factory.CreateConnection();
            {
                //建立通道
                var channel = connection.CreateModel();
                {
                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    consumer.Received += (model, ea) =>
                    {
                        //處理業務
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        //Console.WriteLine($"{DateTime.Now}佇列{dlxQueueName}消費訊息:{message}");
                        //不ack(BasicNack),且不把訊息放回佇列(requeue:false)
                        System.Threading.Thread.Sleep(20);
                        channel.BasicNack(ea.DeliveryTag, false, requeue: true);
                    };
                    channel.BasicConsume(dlxQueueName, autoAck: false, consumer);
                }
            }
        }

本文來自部落格園,作者:可樂加冰-Mr-Wang,轉載請註明原文連結:https://www.cnblogs.com/helloworld-wang/p/15213854.html