RabbitMQ——工作佇列模式
阿新 • • 發佈:2020-08-16
1.模式說明
應用場景:對於 任務過重或任務較多情況使用工作佇列可以提高任務處理的速度
只有3個角色:
P:生產者,也就是要傳送訊息的程式
C:消費者:訊息的接受者,會一直等待訊息到來。
queue:訊息佇列,圖中紅色部分
2.應用舉例
Producer:
1 /** 2 * 傳送訊息 3 */ 4 public class Producer_WorkQueues { 5 public static void main(String[] args) throws IOException, TimeoutException { 6 7 //1.建立連線工廠View Code8 ConnectionFactory factory = new ConnectionFactory(); 9 //2. 設定引數 10 factory.setHost("172.16.98.133");//ip 預設值 localhost 11 factory.setPort(5672); //埠 預設值 5672 12 factory.setVirtualHost("/it");//虛擬機器 預設值/ 13 factory.setUsername("jingdong");//使用者名稱 預設 guest 14factory.setPassword("jingdong");//密碼 預設值 guest 15 //3. 建立連線 Connection 16 Connection connection = factory.newConnection(); 17 //4. 建立Channel 18 Channel channel = connection.createChannel(); 19 //5. 建立佇列Queue 20 /* 21 queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)22 引數: 23 1. queue:佇列名稱 24 2. durable:是否持久化,當mq重啟之後,還在 25 3. exclusive: 26 * 是否獨佔。只能有一個消費者監聽這佇列 27 * 當Connection關閉時,是否刪除佇列 28 * 29 4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉 30 5. arguments:引數。 31 32 */ 33 //如果沒有一個名字叫hello_world的佇列,則會建立該佇列,如果有則不會建立 34 channel.queueDeclare("work_queues",true,false,false,null); 35 /* 36 basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 37 引數: 38 1. exchange:交換機名稱。簡單模式下交換機會使用預設的 "" 39 2. routingKey:路由名稱 40 3. props:配置資訊 41 4. body:傳送訊息資料 42 43 */ 44 for (int i = 1; i <= 10; i++) { 45 String body = i+"hello rabbitmq~~~"; 46 47 //6. 傳送訊息 48 channel.basicPublish("","work_queues",null,body.getBytes()); 49 } 50 51 //7.釋放資源 52 channel.close(); 53 connection.close(); 54 55 } 56 }
Customer:
1 public class Consumer_WorkQueues1 { 2 public static void main(String[] args) throws IOException, TimeoutException { 3 4 //1.建立連線工廠 5 ConnectionFactory factory = new ConnectionFactory(); 6 //2. 設定引數 7 factory.setHost("172.16.98.133");//ip 預設值 localhost 8 factory.setPort(5672); //埠 預設值 5672 9 factory.setVirtualHost("/it");//虛擬機器 預設值/ 10 factory.setUsername("jingdong");//使用者名稱 預設 guest 11 factory.setPassword("jingdong");//密碼 預設值 guest 12 //3. 建立連線 Connection 13 Connection connection = factory.newConnection(); 14 //4. 建立Channel 15 Channel channel = connection.createChannel(); 16 //5. 建立佇列Queue 17 /* 18 queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 19 引數: 20 1. queue:佇列名稱 21 2. durable:是否持久化,當mq重啟之後,還在 22 3. exclusive: 23 * 是否獨佔。只能有一個消費者監聽這佇列 24 * 當Connection關閉時,是否刪除佇列 25 * 26 4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉 27 5. arguments:引數。 28 29 */ 30 //如果沒有一個名字叫hello_world的佇列,則會建立該佇列,如果有則不會建立 31 channel.queueDeclare("work_queues",true,false,false,null); 32 33 /* 34 basicConsume(String queue, boolean autoAck, Consumer callback) 35 引數: 36 1. queue:佇列名稱 37 2. autoAck:是否自動確認 38 3. callback:回撥物件 39 40 */ 41 // 接收訊息 42 Consumer consumer = new DefaultConsumer(channel){ 43 /* 44 回撥方法,當收到訊息後,會自動執行該方法 45 46 1. consumerTag:標識 47 2. envelope:獲取一些資訊,交換機,路由key... 48 3. properties:配置資訊 49 4. body:資料 50 51 */ 52 @Override 53 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 54 /* System.out.println("consumerTag:"+consumerTag); 55 System.out.println("Exchange:"+envelope.getExchange()); 56 System.out.println("RoutingKey:"+envelope.getRoutingKey()); 57 System.out.println("properties:"+properties);*/ 58 System.out.println("body:"+new String(body)); 59 } 60 }; 61 channel.basicConsume("work_queues",true,consumer); 62 63 64 //關閉資源?不要 65 66 } 67 }View Code