RabbitMq-Work queues工作佇列模式(二)
阿新 • • 發佈:2021-10-18
1、Work queues 工作佇列模式概念:
Work Queues 與入門程式的 簡單模式 相比,多了一個或一些消費端,多個消費端共同消費同一個佇列中的訊息。
工作佇列模式:在同一個佇列中可以有多個消費者,消費者之間對於訊息的接收是競爭關係。
Work Queues 與入門程式的 簡單模式 的程式碼是幾乎一樣的;可以完全複製,並複製多一個消費者進行多個消費者同時消費訊息的測試。
2、Work queues 工作佇列模式 程式碼步驟:
1)生產者:傳送30個訊息
package com.study.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; //work模式 傳送訊息 public class Producer { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { // 1. 建立連線工廠(設定RabbitMQ的連線引數); ConnectionFactory connectionFactory= new ConnectionFactory(); //主機;預設localhost connectionFactory.setHost("localhost"); //連線埠;預設5672 connectionFactory.setPort(5672); //虛擬主機;預設/ connectionFactory.setVirtualHost("/"); //使用者名稱;預設guest connectionFactory.setUsername("guest"); //密碼;預設guest connectionFactory.setPassword("guest"); // 2. 建立連線 Connection connection = connectionFactory.newConnection(); // 3. 建立頻道; Channel channel = connection.createChannel(); // 4. 宣告佇列; /** * 引數1:佇列名稱 * 引數2:是否定義持久化佇列(訊息會持久化儲存在伺服器上) * 引數3:是否獨佔本連線 * 引數4:是否在不使用的時候佇列自動刪除 * 引數5:其它引數 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); for(int i=1;i<=30;i++){ //5. 傳送訊息(改動); String message = "你好!work 模式---"+i; /** * 引數1:交換機名稱;如果沒有則指定空字串(表示使用預設的交換機) * 引數2:路由key,簡單模式中可以使用佇列名稱 * 引數3:訊息其它屬性 * 引數4:訊息內容 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("已傳送訊息:" + message); } // 6. 關閉資源 channel.close(); connection.close(); } }
2)消費者:建立兩個消費者監聽同一個佇列,檢視兩個消費者的接收訊息是否存在重複。
package com.study.rabbitmq.work; import com.rabbitmq.client.*; import com.study.rabbitmq.util.ConnectionUtil; import java.io.IOException; /** * work模式;消費者接收訊息 */ public class Consumer1 { public static void main(String[] args) throws Exception { // 1. 建立連線工廠; // 2. 建立連線;(抽取一個獲取連線的工具類) Connection connection = ConnectionUtil.getConnection(); // 3. 建立頻道; final Channel channel = connection.createChannel(); // 4. 宣告佇列; /** * 引數1:佇列名稱 * 引數2:是否定義持久化佇列(訊息會持久化儲存在伺服器上) * 引數3:是否獨佔本連線 * 引數4:是否在不使用的時候佇列自動刪除 * 引數5:其它引數 */ channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); //每次可以預取多少個訊息 channel.basicQos(1); // 5. 建立消費者(接收訊息並處理訊息); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由key System.out.println("路由key為:" + envelope.getRoutingKey()); //交換機 System.out.println("交換機為:" + envelope.getExchange()); //訊息id System.out.println("訊息id為:" + envelope.getDeliveryTag()); //接收到的訊息 System.out.println("消費者1-----接收到的訊息為:" + new String(body, "utf-8")); try { Thread.sleep(1000); //確認訊息 /* 引數1:訊息id 引數2:是否確認,false表示只有當前這條訊息被處理 */ channel.basicAck(envelope.getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } } }; // 6. 監聽佇列 (需要持續監聽佇列訊息,所以不要關閉資源) /** * 引數1:佇列名 * 引數2:是否要自動確認;設定為true表示訊息接收到自動向MQ回覆接收到了,MQ則會將訊息從佇列中刪除; * 如果設定為false則需要手動確認 * 引數3:消費者 */ channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer); //不關閉資源,應該一直監聽訊息 // channel.close(); // connection.close(); } }
- 先啟動消費者,再啟動生產者 結果:發現一個訊息只能被一個消費者接收,其它消費者是不能接收到同一條訊息的
3、工作佇列模式小結:
工作佇列模式:一個訊息只能被一個消費者接收,其它消費者是不能接收到同一條訊息的。
應用場景:對於 任務過重或任務較多情況使用工作佇列,可以提高任務處理的速度
新增對同一個佇列的消費者來提高任務處理能力。