RabbitMQ公平佇列原理實現
阿新 • • 發佈:2018-11-10
目前訊息轉發機制是平均分配,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工作量很小,造成的原因就是近當訊息到達佇列進行轉發訊息。並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。
為了解決這樣的問題,我們可以使用basicQos方法,傳遞引數為prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條訊息。
換句話說,只有在消費者空閒的時候會發送下一條資訊。排程分發訊息的方式,也就是告訴RabbitMQ每次只給消費者處理一條訊息,也就是等待消費者處理完畢並自己對剛剛處理的訊息進行確認之後,才傳送下一條訊息,防止消費者太過於忙碌,也防止它太過去清閒。
通過 設定channel.basicQos(1);
伺服器能力不同,能者多勞。 均攤模式的話,都處理相同數量的
訊息佇列 發出去的訊息被消費完了 然後收到 ack包 才可以繼續發給他
公平佇列原理:佇列伺服器向消費者傳送訊息的時候,消費者採用手動應答模式,佇列伺服器必須要收到消費者傳送ack結果通知,才會傳送下一個訊息。(快的處理的多,消費的多)
producer:
package com.toov5.Producer; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection; import com.toov5.utils.MQConnectionUtils; public class Producer { // 佇列名稱 private static final String UEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 建立新的連線 Connection connection = MQConnectionUtils.newConnection();// 建立Channel Channel channel = connection.createChannel(); // 建立佇列 channel.queueDeclare(UEUE_NAME, false, false, false, null); channel.basicQos(1); // 保證 取一個消費 for (int i = 0; i < 10; i++) { // 建立message String msg = "toov5_message"; System.out.println("生產者投遞訊息" + msg + i); // 生產者傳送訊息 channel.basicPublish("", UEUE_NAME, null, msg.getBytes()); } // 關閉通道和連線 channel.close(); connection.close(); } }
Consumer1
package com.toov5.Consumer; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.toov5.utils.MQConnectionUtils; public class Consumer1 { //佇列名稱 private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("消費者啟動..........1"); //建立新的連線 Connection connection = MQConnectionUtils.newConnection(); //建立Channel final Channel channel = connection.createChannel(); // 消費者關聯佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) { //監聽獲取訊息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg =new String(body,"UTF-8"); System.out.println("消費者獲取生產者訊息:"+msg); try { //模擬應答等待時間 Thread.sleep(1000); } catch (Exception e) { }finally { channel.basicAck(envelope.getDeliveryTag(), false); //手動應答 告訴訊息佇列伺服器 消費成功 } } }; //牽手模式設定 預設自動應答模式 true:自動應答模式 channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);// fanse手動應答 } }
Consumer2
package com.toov5.Consumer; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.toov5.utils.MQConnectionUtils; public class Consumer2 { //佇列名稱 private static final String QUEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("消費者啟動..........2"); //建立新的連線 Connection connection = MQConnectionUtils.newConnection(); //建立Channel final Channel channel = connection.createChannel(); // 消費者關聯佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) { //監聽獲取訊息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg =new String(body,"UTF-8"); System.out.println("消費者獲取生產者訊息:"+msg); try { //模擬應答等待時間 Thread.sleep(300); } catch (Exception e) { }finally { channel.basicAck(envelope.getDeliveryTag(), false); //手動應答 告訴訊息佇列伺服器 消費成功 } } }; //牽手模式設定 預設自動應答模式 true:自動應答模式 channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);// fanse手動應答 } }
執行結果:
睡眠少的(執行快的) 指定的多
注意 每個消費者 必須要應答 一下! 佇列伺服器沒有收到應答 就不會發送下一個給消費者~