rabbitmq介紹及rabbitmq在java中基礎使用
阿新 • • 發佈:2018-12-30
RabbitMQ簡介
RabbitMQ是一個在AMQP基礎上完整的,可複用的企業訊息系統。它可以用於大型軟體系統各個模組之間的高效通訊,支援高併發,支援可擴充套件。使用Erlang語言編寫。
RabbitMQ相關術語:
1.Broker:簡單來說就是訊息佇列伺服器實體。
2.Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。
3.Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
4.Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來。
5.Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
6.vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離。
7.producer:訊息生產者,就是投遞訊息的程式。
8.consumer:訊息消費者,就是接受訊息的程式。
9.channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務。
RabbitMQ常用釋出訂閱模式的執行流程:
AMQP模型中,訊息在producer中產生,傳送到MQ的exchange上,exchange根據配置的路由方式發到相應的Queue上,Queue又將訊息傳送給consumer,訊息從queue到consumer有push和pull兩種方式。 訊息佇列的使用過程大概如下:
1.客戶端連線到訊息佇列伺服器,開啟一個channel。
2.客戶端宣告一個exchange,並設定相關屬性。
3.客戶端宣告一個queue,並設定相關屬性。
4.客戶端使用routing key,在exchange和queue之間建立好繫結關係。
5.客戶端投遞訊息到exchange。
exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡。 exchange也有幾個型別,下面會有介紹。
1.hello world!
Work普通模式
1、消費者1和消費者2獲取到的訊息內容是不同的,同一個訊息只能被一個消費者獲取。
2、消費者1和消費者2獲取到的訊息的數量是相同的,一個是奇數一個是偶數。
其實,這樣是不合理的,應該是消費者1要比消費者2獲取到的訊息多才對。
Work的能者多勞模式
需要將上面兩個消費者的channel.basicQos(1);這行程式碼的註釋開啟,再次執行會發現,休眠時間短的消費者執行的任務多
訊息的確認
在以上的程式碼中,已經給出了註釋,如何使用自動確認和手動確認,消費者從佇列中獲取訊息,服務端如何知道訊息已經被消費呢?
模式1:自動確認
只要訊息從佇列中獲取,無論消費者獲取到訊息後是否成功訊息,都認為是訊息已經成功消費。
模式2:手動確認
消費者從佇列中獲取訊息後,伺服器會將該訊息標記為不可用狀態,等待消費者的反饋,如果消費者一直沒有反饋,那麼該訊息將一直處於不可用狀態。
如果選用自動確認,在消費者拿走訊息執行過程中出現宕機時,訊息可能就會丟失!!
文章開頭有釋出訂閱的流程介紹
注意:訊息傳送到沒有佇列繫結的交換機時,訊息將丟失,因為,交換機沒有儲存訊息的能力,訊息只能存在在佇列中。
Exchange型別
Direct 、Fanout 、Topic 三種類型,RabbitMQ預設有一個exchange,叫default exchange,它用一個空字串表示,它是direct exchange型別。
3.2萬用字元模式
將路由鍵和某模式進行匹配。此時佇列需要繫結要一個模式上。符號“#”匹配一個或多個詞,符號“*”只能匹配一個詞。因此“audit.#”能夠匹配到“audit.irs”和“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。
RabbitMQ是一個在AMQP基礎上完整的,可複用的企業訊息系統。它可以用於大型軟體系統各個模組之間的高效通訊,支援高併發,支援可擴充套件。使用Erlang語言編寫。
RabbitMQ相關術語:
1.Broker:簡單來說就是訊息佇列伺服器實體。
2.Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。
3.Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
4.Binding:繫結,它的作用就是把exchange和queue按照路由規則繫結起來。
5.Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
6.vhost:虛擬主機,一個broker裡可以開設多個vhost,用作不同使用者的許可權分離。
7.producer:訊息生產者,就是投遞訊息的程式。
8.consumer:訊息消費者,就是接受訊息的程式。
9.channel:訊息通道,在客戶端的每個連線裡,可建立多個channel,每個channel代表一個會話任務。
RabbitMQ常用釋出訂閱模式的執行流程:
AMQP模型中,訊息在producer中產生,傳送到MQ的exchange上,exchange根據配置的路由方式發到相應的Queue上,Queue又將訊息傳送給consumer,訊息從queue到consumer有push和pull兩種方式。 訊息佇列的使用過程大概如下:
1.客戶端連線到訊息佇列伺服器,開啟一個channel。
2.客戶端宣告一個exchange,並設定相關屬性。
3.客戶端宣告一個queue,並設定相關屬性。
4.客戶端使用routing key,在exchange和queue之間建立好繫結關係。
5.客戶端投遞訊息到exchange。
exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡。 exchange也有幾個型別,下面會有介紹。
RabbitMQ教程:
引入maven依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
1.hello world!
生產者:
消費者:package com.rabbitmq.test.T_helloworld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.test.util.ConnectionUtil; /** * helloworld * @author lenovo * */ public class Producer { private final static String QUEUE_NAME = "test_queue"; public static void main(String[] argv) throws Exception { // 獲取到連線以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 從連線中建立通道 Channel channel = connection.createChannel(); /* * 宣告(建立)佇列 * 引數1:佇列名稱 * 引數2:為true時server重啟佇列不會消失 * 引數3:佇列是否是獨佔的,如果為true只能被一個connection使用,其他連線建立時會丟擲異常 * 引數4:佇列不再使用時是否自動刪除(沒有連線,並且沒有未處理的訊息) * 引數5:建立佇列時的其他引數 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 訊息內容 String message = "Hello World!"; /* * 向server釋出一條訊息 * 引數1:exchange名字,若為空則使用預設的exchange * 引數2:routing key * 引數3:其他的屬性 * 引數4:訊息體 * RabbitMQ預設有一個exchange,叫default exchange,它用一個空字串表示,它是direct exchange型別, * 任何發往這個exchange的訊息都會被路由到routing key的名字對應的佇列上,如果沒有對應的佇列,則訊息會被丟棄 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [生產者] Sent '" + message + "'"); //關閉通道和連線 channel.close(); connection.close(); } }
package com.rabbitmq.test.T_helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;
public class Consumer {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 從連線中建立通道
Channel channel = connection.createChannel();
// 宣告佇列(如果你已經明確的知道有這個佇列,那麼下面這句程式碼可以註釋掉,如果不註釋掉的話,也可以理解為消費者必須監聽一個佇列,如果沒有就建立一個)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定義佇列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
/*
* 監聽佇列
* 引數1:佇列名稱
* 引數2:是否傳送ack包,不傳送ack訊息會持續在服務端儲存,直到收到ack。 可以通過channel.basicAck手動回覆ack
* 引數3:消費者
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
// 獲取訊息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [消費者] Received '" + message + "'");
}
}
}
2.Work模式Work普通模式
生產者:
package com.rabbitmq.test.T_work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.test.util.ConnectionUtil;
/**
* work模式
* @author lenovo
*
*/
public class Producer {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
// 訊息內容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [生產者] Sent '" + message + "'");
//傳送的訊息間隔越來越長
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
消費者1:
package com.rabbitmq.test.T_work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;
public class Consumer1 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一時刻伺服器只會發一條訊息給消費者(能者多勞模式)
//channel.basicQos(1);
// 定義佇列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
/*
* 監聽佇列,不自動返回ack包,下面手動返回
* 如果不回覆,訊息不會在伺服器刪除
*/
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取訊息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [消費者1] Received '" + message + "'");
//休眠
Thread.sleep(10);
// 手動返回ack包確認狀態
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//channel.basicReject(); channel.basicNack(); //可以通過這兩個函式拒絕訊息,可以指定訊息在伺服器刪除還是繼續投遞給其他消費者
}
}
}
消費者2:
package com.rabbitmq.test.T_work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;
public class Consumer2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一時刻伺服器只會發一條訊息給消費者(能者多勞模式)
//channel.basicQos(1);
// 定義佇列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽佇列,手動返回完成狀態
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取訊息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [消費者2] Received '" + message + "'");
// 休眠1秒
Thread.sleep(1000);
//反饋訊息的消費狀態
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
測試結果:1、消費者1和消費者2獲取到的訊息內容是不同的,同一個訊息只能被一個消費者獲取。
2、消費者1和消費者2獲取到的訊息的數量是相同的,一個是奇數一個是偶數。
其實,這樣是不合理的,應該是消費者1要比消費者2獲取到的訊息多才對。
Work的能者多勞模式
需要將上面兩個消費者的channel.basicQos(1);這行程式碼的註釋開啟,再次執行會發現,休眠時間短的消費者執行的任務多
訊息的確認
在以上的程式碼中,已經給出了註釋,如何使用自動確認和手動確認,消費者從佇列中獲取訊息,服務端如何知道訊息已經被消費呢?
模式1:自動確認
只要訊息從佇列中獲取,無論消費者獲取到訊息後是否成功訊息,都認為是訊息已經成功消費。
模式2:手動確認
消費者從佇列中獲取訊息後,伺服器會將該訊息標記為不可用狀態,等待消費者的反饋,如果消費者一直沒有反饋,那麼該訊息將一直處於不可用狀態。
如果選用自動確認,在消費者拿走訊息執行過程中出現宕機時,訊息可能就會丟失!!
3.訂閱模式
文章開頭有釋出訂閱的流程介紹
生產者:
package com.rabbitmq.test.T_pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.test.util.ConnectionUtil;
/**
* 訂閱模式
* @author lenovo
*
*/
public class Producer {
//交換機的名稱
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/*
* 宣告exchange(交換機)
* 引數1:交換機名稱
* 引數2:交換機型別
* 引數3:交換機永續性,如果為true則伺服器重啟時不會丟失
* 引數4:交換機在不被使用時是否刪除
* 引數5:交換機的其他屬性
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true,true,null);
// 訊息內容
String message = "訂閱訊息";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [生產者] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消費者1:
package com.rabbitmq.test.T_pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;
public class Consumer1 {
private final static String QUEUE_NAME = "test_queue_exchange_1";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/*
* 繫結佇列到交換機(這個交換機的名稱一定要和上面的生產者交換機名稱相同)
* 引數1:佇列的名稱
* 引數2:交換機的名稱
* 引數3:Routing Key
*
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一時刻伺服器只會發一條訊息給消費者
channel.basicQos(1);
// 定義佇列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽佇列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取訊息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [消費者1] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消費者2:
package com.rabbitmq.test.T_pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;
public class Consumer2 {
private final static String QUEUE_NAME = "test_queue_exchange_2";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一時刻伺服器只會發一條訊息給消費者
channel.basicQos(1);
// 定義佇列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽佇列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取訊息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [消費者2] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
注意:訊息傳送到沒有佇列繫結的交換機時,訊息將丟失,因為,交換機沒有儲存訊息的能力,訊息只能存在在佇列中。
Exchange型別
Direct 、Fanout 、Topic 三種類型,RabbitMQ預設有一個exchange,叫default exchange,它用一個空字串表示,它是direct exchange型別。
下面介紹的路由模式和萬用字元模式都是屬於訂閱模式,只不過加入了Routing Key(路由鍵,文章開頭有介紹)。
3.1路由模式
生產者:
package com.rabbitmq.test.T_routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.test.util.ConnectionUtil;
/**
* 路由模式
* @author lenovo
*
*/
public class Producer {
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 宣告exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 訊息內容
String message = "這是訊息B";
channel.basicPublish(EXCHANGE_NAME, "B", null, message.getBytes());
System.out.println(" [生產者] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消費者1:
package com.rabbitmq.test.T_routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;
public class Consumer1 {
private final static String QUEUE_NAME = "test_queue_direct_1";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/*
* 繫結佇列到交換機
* 引數1:佇列的名稱
* 引數2:交換機的名稱
* 引數3:routingKey
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "A");
// 同一時刻伺服器只會發一條訊息給消費者
channel.basicQos(1);
// 定義佇列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽佇列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取訊息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [消費者1] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消費者2:
package com.rabbitmq.test.T_routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;
public class Consumer2 {
private final static String QUEUE_NAME = "test_queue_direct_2";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "B");
//如果想讓消費者2同時接受routingKey為A 和為B的訊息,只要在下面在此新增一個Bing就可以了
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "A");
// 同一時刻伺服器只會發一條訊息給消費者
channel.basicQos(1);
// 定義佇列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽佇列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取訊息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [消費者2] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
3.2萬用字元模式
將路由鍵和某模式進行匹配。此時佇列需要繫結要一個模式上。符號“#”匹配一個或多個詞,符號“*”只能匹配一個詞。因此“audit.#”能夠匹配到“audit.irs”和“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。
生產者:
package com.rabbitmq.test.T_topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.test.util.ConnectionUtil;
/**
* 通配模式
* @author lenovo
*
*/
public class Producer {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 宣告exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 訊息內容 模擬 有人購物下訂單
String message = "新增訂單:id=101";
channel.basicPublish(EXCHANGE_NAME, "order.insert", null, message.getBytes());
System.out.println(" [生產者] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消費者1:
package com.rabbitmq.test.T_topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;
public class Consumer1 {
private final static String QUEUE_NAME = "test_queue_topic_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.#");
// 同一時刻伺服器只會發一條訊息給消費者
channel.basicQos(1);
// 定義佇列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽佇列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取訊息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [財務系統] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消費者2:
package com.rabbitmq.test.T_topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.test.util.ConnectionUtil;
public class Consumer2 {
private final static String QUEUE_NAME = "test_queue_topic_2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 獲取到連線以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.insert");
// 同一時刻伺服器只會發一條訊息給消費者
channel.basicQos(1);
// 定義佇列的消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 監聽佇列,手動返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 獲取訊息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [物流系統] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}