1. 程式人生 > >RabbitMQ的輪詢模式和公平分發

RabbitMQ的輪詢模式和公平分發

### 一、常用的訊息模式 我們在工作的使用中,經常會遇到多個消費者監聽同一個佇列的情況,模型如下圖所示: ![](https://img2020.cnblogs.com/blog/653404/202004/653404-20200407191754556-7846247.png) 當有多個消費者時,我們的訊息會被哪個消費者消費呢,我們又該如何均衡消費者消費資訊的多少呢; 主要有兩種模式: 1、輪詢模式的分發:一個消費者一條,按均分配; 2、公平分發:根據消費者的消費能力進行公平分發,處理快的處理的多,處理慢的處理的少;按勞分配; ### 二、輪詢模式(Round-Robin) 該模式接收訊息是當有多個消費者接入時,訊息的分配模式是一個消費者分配一條,直至訊息消費完成; #### 2.1 生產者發訊息到佇列 ``` public static void SendRoundRobinMessage() { try { var conn = GetConnection(); var channel = conn.CreateModel(); channel.QueueDeclare(QUEUE_NAME, false, false, false, null); for(var i = 0; i < 50; i++) { var body = Encoding.UTF8.GetBytes(i.ToString()); channel.BasicPublish("", QUEUE_NAME, null, body); } Console.WriteLine("訊息傳送完成!"); channel.Close(); conn.Close(); } catch (Exception ex) { throw ex; } } ``` #### 2.2 消費者1程式碼 消費者1每處理完一次訊息,執行緒休息1秒; ``` /// /// 輪詢分發消費者1 /// static void SimpleConsumer1() { //new rabbitMqTest.RabbitMQ.MQUtils().GetMessage(); //建立連線工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "admin",//使用者名稱 Password = "admin",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //建立連線 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //事件基本消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到訊息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"Simple Consumer1 收到訊息: {message},時間{DateTime.Now}"); Thread.Sleep(1000); //確認該訊息已被消費 //channel.BasicAck(ea.DeliveryTag, false); }; //啟動消費者 設定為手動應答訊息 channel.BasicConsume("queue_test", true, consumer); Console.WriteLine("Simple Consumer1 消費者已啟動"); Console.ReadKey(); channel.Dispose(); connection.Close(); } ``` 消費者接收訊息如圖: ![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200509150127506-1996157550.png) #### 2.3 消費者2程式碼 消費者2每處理完一次訊息,執行緒休息3秒; ``` /// /// 輪詢分發消費者2 ///
static void SimpleConsumer2() { //建立連線工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//使用者名稱 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //建立連線 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //事件基本消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到訊息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"Simple Consumer2 收到訊息: {message},時間{DateTime.Now}"); Thread.Sleep(3000); //確認該訊息已被消費 //channel.BasicAck(ea.DeliveryTag, false); }; //啟動消費者 設定為手動應答訊息 channel.BasicConsume("queue_test", true, consumer); Console.WriteLine("Simple 2 消費者已啟動"); Console.ReadKey(); channel.Dispose(); connection.Close(); } ``` 消費者接收訊息如圖: ![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200509150203120-1711536790.png) #### 2.4 輪詢分發小結 消費者1和2的訊息處理能力不同,但是最後處理的訊息條數相同,是“按均分配”。 ### 三、公平分發(Fair Dispatch) 由於訊息接收者處理訊息的能力不同,存在處理快慢的問題,我們就需要能者多勞,處理快的多處理,處理慢的少處理; #### 3.1 生產者發訊息到佇列 程式碼如下: ``` public static void SendQosMessage() { try { var conn = GetConnection(); var channel = conn.CreateModel(); channel.QueueDeclare(QUEUE_NAME, false, false, false, null); channel.BasicQos(0,1,false); for (var i = 0; i < 50; i++) { var body = Encoding.UTF8.GetBytes(i.ToString()); channel.BasicPublish("", QUEUE_NAME, null, body); } Console.WriteLine("訊息傳送完成!"); channel.Close(); conn.Close(); } catch (Exception ex) { throw ex; } } ``` #### 3.2 消費者1程式碼如下 為了模擬處理訊息的時長,每處理完一條訊息讓執行緒休息1s ``` static void SimpleConsumer1() { //new rabbitMqTest.RabbitMQ.MQUtils().GetMessage(); //建立連線工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "admin",//使用者名稱 Password = "admin",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //建立連線 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); channel.BasicQos(0, 1, false); //事件基本消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到訊息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"Simple Consumer1 收到訊息: {message},時間{DateTime.Now}"); Thread.Sleep(1000); //確認該訊息已被消費 channel.BasicAck(ea.DeliveryTag, false); }; //啟動消費者 設定為手動應答訊息 channel.BasicConsume("queue_test", false, consumer); Console.WriteLine("Simple 1 消費者已啟動"); Console.ReadKey(); channel.Dispose(); connection.Close(); } ``` 處理的訊息結果如圖: ![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200509121430000-1976859256.png) #### 3.3 消費者2處理訊息較消費者1慢,程式碼如下 為了模擬處理訊息的時長,每處理完一條訊息讓執行緒休息3s ``` static void SimpleConsumer2() { //new rabbitMqTest.RabbitMQ.MQUtils().GetMessage(); //建立連線工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "admin",//使用者名稱 Password = "admin",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //建立連線 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); channel.BasicQos(0, 1, false); //事件基本消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到訊息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"Simple Consumer2 收到訊息: {message},時間{DateTime.Now}"); Thread.Sleep(3000); //確認該訊息已被消費 channel.BasicAck(ea.DeliveryTag, false); }; //啟動消費者 設定為手動應答訊息 channel.BasicConsume("queue_test", false, consumer); Console.WriteLine("Simple 2 消費者已啟動"); Console.ReadKey(); channel.Dispose(); connection.Close(); } ``` 處理訊息的結果如圖: ![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200509121317011-293289691.png) #### 3.4 處理訊息的結果 從結果可以看到,消費者1在相同時間內,處理了更多的訊息;以上程式碼我們實現了公平分發模式; #### 3.5 注意點 (1)消費者一次接收一條訊息,程式碼`channel.BasicQos(0, 1, false);` (2) 公平分發需要消費者開啟手動應答,關閉自動應答 關閉自動應答程式碼`channel.BasicConsume("queue_test", false, consumer);` 消費者開啟手動應答程式碼:`channel.BasicAck(ea.DeliveryTag, false);` ### 四、小結 (1)當佇列裡訊息較多時,我們通常會開啟多個消費者處理訊息;公平分發和輪詢分發都是我們經常使用的模式。 (2)輪詢分發的主要思想是“按均分配”,不考慮消費者的處理能力,所有消費者均分;這種情況下,處理能力弱的伺服器,一直都在處理訊息,而處理能力強的伺服器,在處理完訊息後,處於空閒狀態; (3) 公平分發的主要思想是"能者多勞",按需分配,能力強的乾的多。 參考文件: https://www.rabbitmq.com/tutorials/tutorial-one-dotn