1. 程式人生 > 實用技巧 >RabbitMQ (三) 工作佇列之輪詢分發

RabbitMQ (三) 工作佇列之輪詢分發

上一篇講了簡單佇列,實際工作中,這種佇列應該很少用到,因為生產者傳送訊息的耗時一般都很短,但是消費者收到訊息後,往往伴隨著對高訊息的業務邏輯處理,是個耗時的過程,這勢必會導致大量的訊息積壓在一個消費者手中,從而導致業務的積壓.

所以我們需要多個消費者一起消費佇列中的訊息,模型如下:(為了方便講解,暫時隱藏掉"交換機")

生產者

    public class Producer
    {
        private const string QueueName = "test_work_queue";
        public static void Send()
        {
            //獲取一個連線
            using (IConnection connection = ConnectionHelper.GetConnection())
            {
                //從連線中獲取一個通道
                using (IModel channel = connection.CreateModel())
                {
                    //宣告佇列
                    channel.QueueDeclare(QueueName, false, false, false, null);

                    for (int i = 0; i < 50; i++)
                    {
                        //建立訊息
                        string msg = "hello world " + i;
                        //傳送訊息
                        channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
                        Console.WriteLine($"{DateTime.Now} : send {msg}");
                    }
                }
            }
        }
    }

消費者1

    public class Consumer1
    {
        private const string QueueName = "test_work_queue";
        public static void Receive()
        {
            //獲取一個連線
            IConnection connection = ConnectionHelper.GetConnection();

            //從連線中獲取一個通道
            IModel channel = connection.CreateModel();

            //宣告佇列
            channel.QueueDeclare(QueueName, false, false, false, null);

            //新增消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //註冊消費者收訊息事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer1 receive : " + str);
                Thread.Sleep(500);//休息0.5秒
            };

            //開啟消費者監聽
            channel.BasicConsume(QueueName, true, "", false, false, null, consumer);
        }
    }

消費者2

只有一點點區別:

                Console.WriteLine("consumer2 receive : " + str);
                Thread.Sleep(1000);//休息1秒

我們這裡故意讓兩個消費者處理訊息的耗時不一樣,一個0.5秒,一個1秒.

我們來看看結果:

可以非常清楚的看到,儘管兩個消費者處理訊息的"耗時"不一樣,但是處理的"數量"是一樣的.

這裡有幾個細節要說明一下:

1.在生產者和兩個消費者中都聲明瞭同一個佇列.其實,如果這個佇列之前已經存在了,那麼生產者和消費者都可以不用再聲明瞭;

2.一定要先啟動兩個消費者,再啟動生產者.原因是,我們上面的程式碼中,消費者的BasicConsume 方法的第2個引數傳入的是 true,

這個引數就是autoAck :是否自動確認(上面文章有講過).

所以如果先開啟生產者,那麼會瞬間傳送完50條訊息,這時候啟動消費者1,那麼會立刻"消費"掉這50條訊息.有朋友肯定要問,不是"睡"了0.5秒麼?

這裡"睡"0.5秒,是對訊息的業務邏輯處理耗時,而不是"消費"訊息,訊息已經在消費者啟動的那一刻從佇列中"拿"過來了;

同時,由於採用的是"自動確認",所以佇列看到50條都被"確認"了,就會將這些訊息從佇列中移除.

這時候再啟動消費者2,則不會收到任何訊息.