1. 程式人生 > >RabbitMQ公平佇列原理實現

RabbitMQ公平佇列原理實現

 目前訊息轉發機制是平均分配,這樣就會出現倆個消費者,奇數的任務很耗時,偶數的任何工作量很小,造成的原因就是近當訊息到達佇列進行轉發訊息。並不在乎有多少任務消費者並未傳遞一個應答給RabbitMQ。僅僅盲目轉發所有的奇數給一個消費者,偶數給另一個消費者。

為了解決這樣的問題,我們可以使用basicQos方法,傳遞引數為prefetchCount= 1。這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條訊息。
換句話說,只有在消費者空閒的時候會發送下一條資訊。排程分發訊息的方式,也就是告訴RabbitMQ每次只給消費者處理一條訊息,也就是等待消費者處理完畢並自己對剛剛處理的訊息進行確認之後,才傳送下一條訊息,防止消費者太過於忙碌,也防止它太過去清閒。
通過 設定channel.basicQos(1);

  伺服器能力不同,能者多勞。 均攤模式的話,都處理相同數量的

 訊息佇列 發出去的訊息被消費完了 然後收到 ack包 才可以繼續發給他

 公平佇列原理:佇列伺服器向消費者傳送訊息的時候,消費者採用手動應答模式,佇列伺服器必須要收到消費者傳送ack結果通知,才會傳送下一個訊息。(快的處理的多,消費的多)

producer:

package com.toov5.Producer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; import com.toov5.utils.MQConnectionUtils; public class Producer { // 佇列名稱 private static final String UEUE_NAME = "test_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 建立新的連線 Connection connection = MQConnectionUtils.newConnection();
// 建立Channel Channel channel = connection.createChannel(); // 建立佇列 channel.queueDeclare(UEUE_NAME, false, false, false, null); channel.basicQos(1); // 保證 取一個消費 for (int i = 0; i < 10; i++) { // 建立message String msg = "toov5_message"; System.out.println("生產者投遞訊息" + msg + i); // 生產者傳送訊息 channel.basicPublish("", UEUE_NAME, null, msg.getBytes()); } // 關閉通道和連線 channel.close(); connection.close(); } }

 

Consumer1

package com.toov5.Consumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.toov5.utils.MQConnectionUtils;

public class Consumer1 {
  
     //佇列名稱
        private static final String QUEUE_NAME = "test_queue";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("消費者啟動..........1");
            //建立新的連線
        Connection connection = MQConnectionUtils.newConnection();
           //建立Channel
            final Channel channel = connection.createChannel();
            // 消費者關聯佇列
             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
             channel.basicQos(1);
              DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) {
                  //監聽獲取訊息
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        String msg =new String(body,"UTF-8");
                        System.out.println("消費者獲取生產者訊息:"+msg);
                        try {
                            //模擬應答等待時間
                            Thread.sleep(1000);
                        } catch (Exception e) {
                            
                        }finally {
                           channel.basicAck(envelope.getDeliveryTag(), false);  //手動應答 告訴訊息佇列伺服器 消費成功
                        }
                    }
              };
            //牽手模式設定  預設自動應答模式  true:自動應答模式  
              channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);//    fanse手動應答          

        }
}

 

Consumer2

package com.toov5.Consumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.toov5.utils.MQConnectionUtils;

public class Consumer2 {
  
     //佇列名稱
        private static final String QUEUE_NAME = "test_queue";
        
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("消費者啟動..........2");
            //建立新的連線
        Connection connection = MQConnectionUtils.newConnection();
           //建立Channel
            final Channel channel = connection.createChannel();
            // 消費者關聯佇列
             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
              channel.basicQos(1);
              DefaultConsumer defaultConsumerr = new DefaultConsumer(channel) {
                  //監聽獲取訊息
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        String msg =new String(body,"UTF-8");
                        System.out.println("消費者獲取生產者訊息:"+msg);
                        try {
                            //模擬應答等待時間
                            Thread.sleep(300);
                        } catch (Exception e) {
                            
                        }finally {
                        channel.basicAck(envelope.getDeliveryTag(), false);  //手動應答 告訴訊息佇列伺服器 消費成功
                        }
                    }
              };
            //牽手模式設定  預設自動應答模式  true:自動應答模式  
              channel.basicConsume(QUEUE_NAME, false, defaultConsumerr);//    fanse手動應答          
        }
}

執行結果:

睡眠少的(執行快的) 指定的多

 注意 每個消費者 必須要應答 一下! 佇列伺服器沒有收到應答 就不會發送下一個給消費者~