1. 程式人生 > 其它 >04 Work Queues

04 Work Queues

Work Queues

工作佇列(又稱任務佇列)的主要思想是避免立即執行資源密集型任務,而不得不等待它完成。相反我們安排任務在之後執行。我們把任務封裝為訊息並將其傳送到佇列。在後臺執行的工作程序將彈出任務並最終執行作業。當有多個工作執行緒時,這些工作執行緒將一起處理這些任務。

輪訓分發訊息

在這個案例中我們會啟動兩個工作執行緒,一個訊息傳送執行緒,我們來看看他們兩個工作執行緒是如何工作的。

抽取工具類

/**
 * 連線工廠建立通道的工具類
 */
public class RabbitMQUtils {
    //得到一個連線的channel
    public static Channel getChannel() throws Exception, TimeoutException {
        //建立連線工廠
        ConnectionFactory factory = new ConnectionFactory();
        //工廠IP 連線RabbitMQ的佇列
        factory.setHost("112.124.22.24");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        return channel;
    }

}

啟動兩個執行緒

//這是一個工作執行緒  相當於消費者
public class Worker01 {

    //佇列名稱
    public static final String QUEUE_NAME = "hello";

    //接收訊息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        //訊息的接受
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println("接收到的訊息"+new String(message.getBody()));
        };

        //訊息接受被取消時  執行下面的內容
        CancelCallback cancelCallback =consumerTag ->{
            System.out.println(consumerTag+"訊息者取消消費介面回撥邏輯");
        };
        //訊息的接受
        /**
         * 消費者消費資訊
         *1.消費哪個佇列
         * 2.消費成功之後是否要自動應答  true:自動應答 false  代表手動應答
         * 3.消費者未成功消費的回撥
         * 4.消費者取消 消費的 回撥
         */
        System.out.println("C2等待接受訊息.......");
        /**
         * 傳送一個訊息
         * 1.傳送到哪個交換機
         * 2.路由的key值是哪個  本次是佇列的名稱
         * 3.其他引數資訊
         *4.傳送訊息的訊息體
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

    }
}


開啟兩個執行緒 C1 C2

啟動一個傳送執行緒

//生產者傳送大量的訊息
public class Task01 {

    //佇列名稱
    public static final String QUEUE_NAME = "hello";

    //傳送大量訊息
    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();
        //佇列的宣告
        /**
         * 生成一個佇列
         * 1.佇列名稱
         * 2.佇列裡面的訊息是否 持久化(磁碟)  預設情況訊息儲存在記憶體中
         * 3.該佇列是否只供一個消費者進行消費 是否進行消費  是否進行訊息共享,true:只能一個消費者消費   false:可以多個消費者消費
         * 4.是否自動刪除  最後一個消費者斷開連線以後 該佇列是否自動刪除 true 自動刪除  false 不自動刪除
         * 5.其他引數
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //從控制檯接受資訊
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            /**
             * 傳送一個訊息
             * 1.傳送到哪個交換機
             * 2.路由的key值是哪個  本次是佇列的名稱
             * 3.其他引數資訊
             *4.傳送訊息的訊息體
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("訊息傳送完畢"+message);

        }
    }
}

結果展示

通過程式執行發現生產者總共傳送 4 個訊息,消費者 1 和消費者 2 分別分得兩個訊息,並且是按照有序的一個接收一次訊息

不公平分發

RabbitMQ 分發訊息採用的輪訓分發,但是在某種場景下這種策略並不是很好

比方說有兩個消費者在處理任務,其中有個消費者 1 處理任務的速度非常快,而另外一個消費者 2處理速度卻很慢,這個時候我們還是採用輪訓分發的化就會到這處理速度快的這個消費者很大一部分時間處於空閒狀態,而處理慢的那個消費者一直在幹活,這種分配方式在這種情況下其實就不太好,但是RabbitMQ 並不知道這種情況它依然很公平的進行分發。

為了避免這種情況,我們可以設定引數 channel.basicQos(1);

在消費者中設定通道

 //設定不公平分發
int prefetchCount = 1;
channel.basicQos(prefetchCount);

意思就是如果這個任務我還沒有處理完或者我還沒有應答你,你先別分配給我,我目前只能處理一個任務,然後 rabbitmq 就會把該任務分配給沒有那麼忙的那個空閒消費者,當然如果所有的消費者都沒有完成手上任務,佇列還在不停的新增新任務,佇列有可能就會遇到佇列被撐滿的情況,這個時候就只能新增新的 worker 或者改變其他儲存任務的策略。

預取值

該值定義通道上允許的未確認訊息的最大數量。一旦數量達到配置的數量,RabbitMQ 將停止在通道上傳遞更多訊息,除非至少有一個未處理的訊息被確認

例如,假設在通道上有未確認的訊息 5、6、7,8,並且通道的預取計數設定為 4,此時 RabbitMQ 將不會在該通道上再傳遞任何訊息,除非至少有一個未應答的訊息被 ack。

設定了預取值之後,分兩條給C1 5條給C2不會給C1處理

 //預取值是5
int prefetchCount = 5;
channel.basicQos(prefetchCount);