RabbitMQ的使用(可以實現商品資料的同步)
1.1. 安裝完成後操作
1、系統服務中有RabbitMQ服務,停止、啟動、重啟
1、 開啟命令列工具
如果找不到命令列工具:
2、 啟用管理外掛
3、 檢視管理頁面
4、 通過預設賬戶 guest/guest 登入
如果能夠登入,說明安裝成功。
1.2. 新增使用者
1.3. 管理介面中的功能
- 5種佇列
1.1. 簡單佇列
1.1.1. 圖示
P:訊息的生產者
C:訊息的消費者
紅色:佇列
生產者將訊息傳送到佇列,消費者從佇列中獲取訊息。
1.1.2. 匯入RabbitMQ的客戶端依賴
1.1.3. 獲取MQ的連線
1.1.4. 生產者傳送訊息到佇列
1.1.5. 管理工具中檢視訊息
點選上面的佇列名稱,查詢具體的佇列中的資訊:
1.1.1. 消費者從佇列中獲取訊息
1.1. Work模式
1.1.1. 圖示
一個生產者、2個消費者。
一個訊息只能被一個消費者獲取。
1.1.2. 消費者1
public class Recv {
private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到連線以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一時刻伺服器只會發一條訊息給消費者 //channel.basicQos(1); // 定義佇列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽佇列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取訊息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); //休眠 Thread.sleep(10); // 返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }
}
1.1.3. 消費者2
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到連線以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 宣告佇列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一時刻伺服器只會發一條訊息給消費者 //channel.basicQos(1); // 定義佇列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽佇列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取訊息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); // 休眠1秒 Thread.sleep(1000); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }
}
1.1.4. 生產者
向佇列中傳送50條訊息。
1.1.5. 測試
測試結果:
1、 消費者1和消費者2獲取到的訊息內容是不同的,同一個訊息只能被一個消費者獲取。
2、 消費者1和消費者2獲取到的訊息的數量是相同的,一個是奇數一個是偶數。
其實,這樣是不合理的,應該是消費者1要比消費者2獲取到的訊息多才對。
1.1. Work模式的“能者多勞”
測試:
消費者1比消費者2獲取的訊息更多。
1.1. 訊息的確認模式
消費者從佇列中獲取訊息,服務端如何知道訊息已經被消費呢?
模式1:自動確認
只要訊息從佇列中獲取,無論消費者獲取到訊息後是否成功訊息,都認為是訊息已經成功消費。
模式2:手動確認
消費者從佇列中獲取訊息後,伺服器會將該訊息標記為不可用狀態,等待消費者的反饋,如果消費者一直沒有反饋,那麼該訊息將一直處於不可用狀態。
手動模式:
自動模式:
1.1. 訂閱模式
1.1.1. 圖示
解讀:
1、1個生產者,多個消費者
2、每一個消費者都有自己的一個佇列
3、生產者沒有將訊息直接傳送到佇列,而是傳送到了交換機
4、每個佇列都要繫結到交換機
5、生產者傳送的訊息,經過交換機,到達佇列,實現,一個訊息被多個消費者獲取的目的
1.1.2. 訊息的生產者(看作是後臺系統)
向交換機中傳送訊息。
注意:訊息傳送到沒有佇列繫結的交換機時,訊息將丟失,因為,交換機沒有儲存訊息的能力,訊息只能存在在佇列中。
1.1.3. 消費者1(看作是前臺系統)
1.1.4. 消費者2(看作是搜尋系統)
1.1.5. 測試
測試結果:
同一個訊息被多個消費者獲取。
在管理工具中檢視佇列和交換機的繫結關係:
路由模式
這裡寫程式碼片
1.1.1. 圖示
1.1.2. 生產者
1.1.3. 消費者1(前臺系統)
1.1.4. 消費2(搜尋系統)
1.1. 萬用字元模式
1.1.1. 圖示
1.1.2. 生產者
1.1.3. 消費者1(前臺系統)
1.1.4. 消費者2(搜尋系統)