RabbitMQ-Work模式
阿新 • • 發佈:2018-12-12
一、簡介
在《RabbitMQ-簡單佇列》中,只有一個生產者一個消費者。今天我們一起學習Work模式,即一個生產者,多個消費者,如下圖所示:
需要注意的是:
(1)生產者的訊息是傳送到一個佇列裡,所以即使有兩個消費者,一個訊息只能被一個消費者消費。
(2)Work模式中可以分為兩種模式:一兩個消費者平均消費佇列中的訊息,即使它們的消費能力是不一樣的;二能者多勞模式,消費能力強的消費者會獲取更多的訊息。
二、編碼實現
2.1、生產者
向佇列傳送50條訊息,生產者每生產一條訊息後都會休眠一段時間。
public class Producer { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] argv) throws Exception { // 獲取到連線以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { // 訊息內容 String message = "訊息:" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 休眠 Thread.sleep(i * 10); } channel.close(); connection.close(); } }
2.2、消費者1
public class Consumer1 { public static void main(String[] argv) throws Exception { // 獲取到連線以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QueueUtil.QUEUE_NAME_WORK, false, false, false, null); // 定義佇列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽佇列,手動返回完成 channel.basicConsume(QueueUtil.QUEUE_NAME_WORK, false, consumer); // 獲取訊息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("Consumer1 Received:" + message); // 休眠 Thread.sleep(10); // 返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
2.3、消費者2
public class Cunsumer2 { public static void main(String[] argv) throws Exception { // 獲取到連線以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QueueUtil.QUEUE_NAME_WORK, false, false, false, null); // 定義佇列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽佇列,手動返回完成狀態 channel.basicConsume(QueueUtil.QUEUE_NAME_WORK, false, consumer); // 獲取訊息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("Consumer2 Received:" + message); // 休眠1秒 Thread.sleep(1000); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
2.4、測試
(1)啟動消費者1和消費者2。
(2)啟動生產者,佇列work_queue中初始有50條訊息待消費,隨後被消費者1和消費者2消費。
(3)觀察控制檯。
- 消費者1和消費者2獲取到的訊息內容是不同的,同一個訊息只能被一個消費者獲取。
- 消費者1和消費者2獲取到的訊息的數量是相同的,一個是奇數一個是偶數。
大家看到這,可能會問,既然消費者1的能力大於消費者2(消費者1的休眠時間更短),那消費者1的消費數量是不是應該更多些呢?從一定角度上來說,應該是這樣的。這也是我們接下來要描述的“能者多勞模式”,如下:
public class Consumer1 {
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QueueUtil.QUEUE_NAME_WORK, false, false, false, null);
// 同一時刻伺服器只會發一條訊息給消費者(上一條訊息消費完了,才會消費下一條)
channel.basicQos(1);
// 定義佇列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽佇列,手動返回完成
channel.basicConsume(QueueUtil.QUEUE_NAME_WORK, false, consumer);
// 獲取訊息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Consumer1 Received:" + message);
// 休眠
Thread.sleep(10);
// 返回確認狀態
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
不知道大家有沒有注意到,我們在消費者1這裡加了channel.basicQos(1)這行程式碼,用於表示同一時刻伺服器只會傳送一條訊息給消費者。同理,我們在消費者2也加上這行程式碼。然後重新進行測試: