1. 程式人生 > 實用技巧 >RabbitMQ——工作佇列模式

RabbitMQ——工作佇列模式

1.模式說明

應用場景:對於 任務過重或任務較多情況使用工作佇列可以提高任務處理的速度
只有3個角色:
P:生產者,也就是要傳送訊息的程式
C:消費者:訊息的接受者,會一直等待訊息到來。
queue:訊息佇列,圖中紅色部分

2.應用舉例

Producer:

 1 /**
 2  * 傳送訊息
 3  */
 4 public class Producer_WorkQueues {
 5     public static void main(String[] args) throws IOException, TimeoutException {
 6 
 7         //1.建立連線工廠
8 ConnectionFactory factory = new ConnectionFactory(); 9 //2. 設定引數 10 factory.setHost("172.16.98.133");//ip 預設值 localhost 11 factory.setPort(5672); //埠 預設值 5672 12 factory.setVirtualHost("/it");//虛擬機器 預設值/ 13 factory.setUsername("jingdong");//使用者名稱 預設 guest 14
factory.setPassword("jingdong");//密碼 預設值 guest 15 //3. 建立連線 Connection 16 Connection connection = factory.newConnection(); 17 //4. 建立Channel 18 Channel channel = connection.createChannel(); 19 //5. 建立佇列Queue 20 /* 21 queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
22 引數: 23 1. queue:佇列名稱 24 2. durable:是否持久化,當mq重啟之後,還在 25 3. exclusive: 26 * 是否獨佔。只能有一個消費者監聽這佇列 27 * 當Connection關閉時,是否刪除佇列 28 * 29 4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉 30 5. arguments:引數。 31 32 */ 33 //如果沒有一個名字叫hello_world的佇列,則會建立該佇列,如果有則不會建立 34 channel.queueDeclare("work_queues",true,false,false,null); 35 /* 36 basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 37 引數: 38 1. exchange:交換機名稱。簡單模式下交換機會使用預設的 "" 39 2. routingKey:路由名稱 40 3. props:配置資訊 41 4. body:傳送訊息資料 42 43 */ 44 for (int i = 1; i <= 10; i++) { 45 String body = i+"hello rabbitmq~~~"; 46 47 //6. 傳送訊息 48 channel.basicPublish("","work_queues",null,body.getBytes()); 49 } 50 51 //7.釋放資源 52 channel.close(); 53 connection.close(); 54 55 } 56 }
View Code

Customer:

 1 public class Consumer_WorkQueues1 {
 2     public static void main(String[] args) throws IOException, TimeoutException {
 3 
 4         //1.建立連線工廠
 5         ConnectionFactory factory = new ConnectionFactory();
 6         //2. 設定引數
 7         factory.setHost("172.16.98.133");//ip  預設值 localhost
 8         factory.setPort(5672); //埠  預設值 5672
 9         factory.setVirtualHost("/it");//虛擬機器 預設值/
10         factory.setUsername("jingdong");//使用者名稱 預設 guest
11         factory.setPassword("jingdong");//密碼 預設值 guest
12         //3. 建立連線 Connection
13         Connection connection = factory.newConnection();
14         //4. 建立Channel
15         Channel channel = connection.createChannel();
16         //5. 建立佇列Queue
17         /*
18         queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
19         引數:
20             1. queue:佇列名稱
21             2. durable:是否持久化,當mq重啟之後,還在
22             3. exclusive:
23                 * 是否獨佔。只能有一個消費者監聽這佇列
24                 * 當Connection關閉時,是否刪除佇列
25                 *
26             4. autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉
27             5. arguments:引數。
28 
29          */
30         //如果沒有一個名字叫hello_world的佇列,則會建立該佇列,如果有則不會建立
31         channel.queueDeclare("work_queues",true,false,false,null);
32 
33         /*
34         basicConsume(String queue, boolean autoAck, Consumer callback)
35         引數:
36             1. queue:佇列名稱
37             2. autoAck:是否自動確認
38             3. callback:回撥物件
39 
40          */
41         // 接收訊息
42         Consumer consumer = new DefaultConsumer(channel){
43             /*
44                 回撥方法,當收到訊息後,會自動執行該方法
45 
46                 1. consumerTag:標識
47                 2. envelope:獲取一些資訊,交換機,路由key...
48                 3. properties:配置資訊
49                 4. body:資料
50 
51              */
52             @Override
53             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
54               /*  System.out.println("consumerTag:"+consumerTag);
55                 System.out.println("Exchange:"+envelope.getExchange());
56                 System.out.println("RoutingKey:"+envelope.getRoutingKey());
57                 System.out.println("properties:"+properties);*/
58                 System.out.println("body:"+new String(body));
59             }
60         };
61         channel.basicConsume("work_queues",true,consumer);
62 
63 
64         //關閉資源?不要
65 
66     }
67 }
View Code