1. 程式人生 > 其它 >RabbitMq-Work queues工作佇列模式(二)

RabbitMq-Work queues工作佇列模式(二)

1、Work queues 工作佇列模式概念:

Work Queues 與入門程式的 簡單模式 相比,多了一個或一些消費端,多個消費端共同消費同一個佇列中的訊息。

工作佇列模式:在同一個佇列中可以有多個消費者,消費者之間對於訊息的接收是競爭關係。

Work Queues 與入門程式的 簡單模式 的程式碼是幾乎一樣的;可以完全複製,並複製多一個消費者進行多個消費者同時消費訊息的測試。

2、Work queues 工作佇列模式 程式碼步驟:

1)生產者:傳送30個訊息

package com.study.rabbitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

//work模式 傳送訊息

public class Producer {
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
// 1. 建立連線工廠(設定RabbitMQ的連線引數);
        ConnectionFactory connectionFactory= new ConnectionFactory();
        //主機;預設localhost
        connectionFactory.setHost("localhost");
        //連線埠;預設5672
        connectionFactory.setPort(5672);
        //虛擬主機;預設/
        connectionFactory.setVirtualHost("/");
        //使用者名稱;預設guest
        connectionFactory.setUsername("guest");
        //密碼;預設guest
        connectionFactory.setPassword("guest");
//   2. 建立連線
        Connection connection = connectionFactory.newConnection();
//   3. 建立頻道;
        Channel channel = connection.createChannel();
//   4. 宣告佇列;
        /**
         * 引數1:佇列名稱
         * 引數2:是否定義持久化佇列(訊息會持久化儲存在伺服器上)
         * 引數3:是否獨佔本連線
         * 引數4:是否在不使用的時候佇列自動刪除
         * 引數5:其它引數
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        for(int i=1;i<=30;i++){
            //5. 傳送訊息(改動);
            String message = "你好!work 模式---"+i;
            /**
             * 引數1:交換機名稱;如果沒有則指定空字串(表示使用預設的交換機)
             * 引數2:路由key,簡單模式中可以使用佇列名稱
             * 引數3:訊息其它屬性
             * 引數4:訊息內容
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("已傳送訊息:" + message);

        }
//        6. 關閉資源
        channel.close();
        connection.close();
    }

}

2)消費者:建立兩個消費者監聽同一個佇列,檢視兩個消費者的接收訊息是否存在重複。

package com.study.rabbitmq.work;
import com.rabbitmq.client.*;
import com.study.rabbitmq.util.ConnectionUtil;

import java.io.IOException;


/**
 * work模式;消費者接收訊息
 */
public class Consumer1 {
    public static void main(String[] args) throws Exception {
//        1. 建立連線工廠;
//        2. 建立連線;(抽取一個獲取連線的工具類)
        Connection connection = ConnectionUtil.getConnection();
//        3. 建立頻道;
        final Channel channel = connection.createChannel();
//        4. 宣告佇列;
        /**
         * 引數1:佇列名稱
         * 引數2:是否定義持久化佇列(訊息會持久化儲存在伺服器上)
         * 引數3:是否獨佔本連線
         * 引數4:是否在不使用的時候佇列自動刪除
         * 引數5:其它引數
         */
        channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
        //每次可以預取多少個訊息
        channel.basicQos(1);
//        5. 建立消費者(接收訊息並處理訊息);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("路由key為:" + envelope.getRoutingKey());
                //交換機
                System.out.println("交換機為:" + envelope.getExchange());
                //訊息id
                System.out.println("訊息id為:" + envelope.getDeliveryTag());
                //接收到的訊息
                System.out.println("消費者1-----接收到的訊息為:" + new String(body, "utf-8"));
                try {
                    Thread.sleep(1000);
                    //確認訊息
                    /*
                    引數1:訊息id
                    引數2:是否確認,false表示只有當前這條訊息被處理
                     */
                    channel.basicAck(envelope.getDeliveryTag(),false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
//        6. 監聽佇列   (需要持續監聽佇列訊息,所以不要關閉資源)
        /**
         * 引數1:佇列名
         * 引數2:是否要自動確認;設定為true表示訊息接收到自動向MQ回覆接收到了,MQ則會將訊息從佇列中刪除;
         * 如果設定為false則需要手動確認
         * 引數3:消費者
         */
        channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer);
        //不關閉資源,應該一直監聽訊息
        // channel.close();
        // connection.close();


    }

}
  1. 先啟動消費者,再啟動生產者 結果:發現一個訊息只能被一個消費者接收,其它消費者是不能接收到同一條訊息的

3、工作佇列模式小結

工作佇列模式:一個訊息只能被一個消費者接收,其它消費者是不能接收到同一條訊息的。

應用場景:對於 任務過重或任務較多情況使用工作佇列,可以提高任務處理的速度

新增對同一個佇列的消費者來提高任務處理能力。