1. 程式人生 > 實用技巧 >RabbitMQ (四) 工作佇列之公平分發

RabbitMQ (四) 工作佇列之公平分發

上篇文章講的輪詢分發 : 1個佇列,無論多少個消費者,無論消費者處理訊息的耗時長短,大家消費的數量都一樣.

而公平分發,又叫 : 能者多勞,顧名思義,處理得越快,消費得越多.

生產者

    public class Producer
    {
        private const string QueueName = "test_work2_queue";

        public static void Send()
        {
            //獲取一個連線
            IConnection connection = ConnectionHelper.GetConnection();

            //從連線中獲取一個通道
            IModel channel = connection.CreateModel();

            //宣告佇列
            channel.QueueDeclare(QueueName, false, false, false, null);

            //每次只向消費者傳送一條訊息,消費者使用後,手動確認後,才會傳送另外一條
            channel.BasicQos(0, 1, false);

            for (int i = 0; i < 50; i++)
            {
                string msg = "hello world " + i;

                //傳送訊息
                channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
                Console.WriteLine($"send {msg}");
            }

            channel.Close();
            connection.Close();
        }
    }

消費者1

    public class Consumer1
    {
        private const string QueueName = "test_work2_queue";
        public static void Receive()
        {
            //獲取連線
            IConnection connection = ConnectionHelper.GetConnection();

            //建立通道
            IModel channel = connection.CreateModel();

            //宣告佇列
            channel.QueueDeclare(QueueName, false, false, false, null);

            channel.BasicQos(0, 1, false);

            //新增消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //註冊事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string msg = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer1 : " + msg);
                Thread.Sleep(2000);//休息2秒
                channel.BasicAck(e.DeliveryTag, false);//手動確認,false表示只確認當前這條訊息已收到,ture表示在當前這條訊息及之前(小於 DelivertTag )的所有未確認的訊息都已收到.
            };

            //監聽佇列,第2個引數設定為手動確認.true 則為自動確認.           
            channel.BasicConsume(QueueName, false, "", false, false, null, consumer);
        }
    }

消費者2

                Console.WriteLine("consumer2 : " + msg);
                Thread.Sleep(1000);//休息1秒

執行效果:

由於 消費者1處理一條訊息要2秒,而消費者2只要1秒,所以消費者2處理得多一些.

方法解釋:

channel.BasicQos(0, 1, false)

引數1: prefetchSize:0

引數2: prefetchCount:1 ,告訴RabbitMQ,不要同時給一個消費者推送多於1條訊息,即一旦有1個訊息還沒有ack(確認),則該消費者將block掉,直到有訊息確認

global:true\false 是否將上面設定應用於channel,簡單點說,就是上面限制是channel級別的還是consumer級別

備註:據說prefetchSize 和global這兩項,rabbitmq沒有實現,暫且不研究.

channel.BasicAck(e.DeliveryTag, false)

引數1 : deliveryTag : e.DeliveryTag,該訊息的標記 ,ulong 型別.
引數2 : multiple:是否批量.true:將一次性確認所有小於 deliveryTag 的訊息.