RabbitMQ 持久化
預設情況下,exchange、queue、message 等資料都是儲存在記憶體中的,這意味著如果 RabbitMQ 重啟、關閉、宕機時所有的資訊都將丟失。
RabbitMQ 提供了持久化來解決這個問題,持久化後,如果 RabbitMQ 傳送 重啟、關閉、宕機,下次起到時 RabbitMQ 會從硬碟中恢復exchange、queue、message 等資料。
持久化
RabbitMQ 持久化包含3個部分
- exchange 持久化,在宣告時指定 durable 為 true
- queue 持久化,在宣告時指定 durable 為 true
- message 持久化,在投遞時指定 delivery_mode=2(1是非持久化)
queue 的持久化能保證本身的元資料不會因異常而丟失,但是不能保證內部的 message 不會丟失。要確保 message 不丟失,還需要將 message 也持久化
如果 exchange 和 queue 都是持久化的,那麼它們之間的 binding 也是持久化的。
如果 exchange 和 queue 兩者之間有一個持久化,一個非持久化,就不允許建立繫結。
注意:一旦確定了 exchange 和 queue 的 durable,就不能修改了。如果非要修改,唯一的辦法就是刪除原來的 exchange 或 queue 後,重現建立
拓展
如果將所有的訊息都進行持久化操作,這樣會嚴重影響 RabbitMQ 的效能。寫入磁碟的速度可比寫入記憶體的速度要慢很多。所以需要在可靠性和吞吐量之間做權衡。
將 exchange、queue 和 message 都進行持久化操作後,也不能保證訊息一定不會丟失,訊息存入RabbitMQ 之後,還需要一段時間才能存入硬碟。RabbitMQ 並不會為每條訊息都進行同步存檔,如果在這段時間,伺服器宕機或者重啟,訊息還沒來得及儲存到磁碟當中,就會丟失。對於這種情況,可以引入 RabiitMQ 映象佇列機制。
程式碼實現
通過程式碼實現 RabbitMQ 持久化
原生的實現方式
原生的 RabbitMQ 客戶端需要完成三個步驟。
第一步,設定交換器的持久化
// 三個引數分別為 交換器名、交換器型別、是否持久化
channel.exchangeDeclare(EXCHANGE_NAME,"topic" ,true);
複製程式碼
第二步,設定佇列的持久化
// 引數1 queue :佇列名
// 引數2 durable :是否持久化
// 引數3 exclusive :僅建立者可以使用的私有佇列,斷開後自動刪除
// 引數4 autoDelete : 當所有消費客戶端連線斷開後,是否自動刪除佇列
// 引數5 arguments
channel.queueDeclare(QUEUE_NAME,true,false,null);
複製程式碼
第三步,設定訊息的持久化
// 引數1 exchange :交換器
// 引數2 routingKey : 路由鍵
// 引數3 props : 訊息的其他引數,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化
// 引數4 body : 訊息體
channel.basicPublish("",queue_name,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
複製程式碼
Spring RabbitMQ 的實現方式
Spring RabbitMQ 是對原生的 RabbitMQ 客戶端的封裝。一般情況下,我們只需要定義 exchange 和 queue 的持久化。
配置交換機持久化
/**
* value 交換機名稱
* type 交換機型別,預設 direct
* durable 是否持久化,預設 true
* autoDelete 是否自動刪除,預設 false
* internal 是否為內部交換機,預設為 false
*/
@Exchange(value = "exchangeName",type = "direct",durable = "true",autoDelete = "false",internal = "false")
複製程式碼
配置佇列持久化
/**
* value 佇列名稱
* durable 是否持久化
* exclusive 否為獨佔佇列
* autoDelete 是否自動刪除
*/
@Queue(value = "queryName",exclusive = "false",autoDelete = "false")
複製程式碼
一個用例
/**
* 消費訊息
*/
@Component
public class ConsumerMessageListener {
/**
* 監聽指定佇列
*
* @param message 訊息體
* @param headers 訊息頭
* @param channel 通道
* @return
* @RabbitListener 指定了 exchange 、key、Queue 後,如果 Rabbitmq 沒有會去建立
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "exchangeName",internal = "false"),key = "routingKeyValue",value = @Queue(value = "queryName",autoDelete = "false")
))
public void listenerMessage(String message,@Headers Map<String,Object> headers,Channel channel)
throws IOException {
System.out.println(message);
System.out.println(headers);
//手動 ack
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
}
}
複製程式碼
正常思路想 exchange 和 queue 的持久化應該在訊息傳送端配置,其實也可以配置在訊息消費端,RabbitListener 回去檢查 exchange 和 queue,如果不存在則建立