1. 程式人生 > 程式設計 >RabbitMQ 持久化

RabbitMQ 持久化

預設情況下,exchange、queue、message 等資料都是儲存在記憶體中的,這意味著如果 RabbitMQ 重啟、關閉、宕機時所有的資訊都將丟失。

RabbitMQ 提供了持久化來解決這個問題,持久化後,如果 RabbitMQ 傳送 重啟、關閉、宕機,下次起到時 RabbitMQ 會從硬碟中恢復exchange、queue、message 等資料。

持久化

RabbitMQ 持久化包含3個部分

  • exchange 持久化,在宣告時指定 durable 為 true
  • queue 持久化,在宣告時指定 durable 為 true
  • message 持久化,在投遞時指定 delivery_mode=2(1是非持久化)

queue 的持久化能保證本身的元資料不會因異常而丟失,但是不能保證內部的 message 不會丟失。要確保 message 不丟失,還需要將 message 也持久化

如果 exchange 和 queue 都是持久化的,那麼它們之間的 binding 也是持久化的。

如果 exchange 和 queue 兩者之間有一個持久化,一個非持久化,就不允許建立繫結。

注意:一旦確定了 exchange 和 queue 的 durable,就不能修改了。如果非要修改,唯一的辦法就是刪除原來的 exchange 或 queue 後,重現建立

拓展

如果將所有的訊息都進行持久化操作,這樣會嚴重影響 RabbitMQ 的效能。寫入磁碟的速度可比寫入記憶體的速度要慢很多。所以需要在可靠性和吞吐量之間做權衡。

將 exchange、queue 和 message 都進行持久化操作後,也不能保證訊息一定不會丟失,訊息存入RabbitMQ 之後,還需要一段時間才能存入硬碟。RabbitMQ 並不會為每條訊息都進行同步存檔,如果在這段時間,伺服器宕機或者重啟,訊息還沒來得及儲存到磁碟當中,就會丟失。對於這種情況,可以引入 RabiitMQ 映象佇列機制。

程式碼實現

通過程式碼實現 RabbitMQ 持久化

原生的實現方式

原生的 RabbitMQ 客戶端需要完成三個步驟。

第一步,設定交換器的持久化

// 三個引數分別為 交換器名、交換器型別、是否持久化
channel.exchangeDeclare(EXCHANGE_NAME,"topic"
,true); 複製程式碼

第二步,設定佇列的持久化

// 引數1 queue :佇列名  
// 引數2 durable :是否持久化  
// 引數3 exclusive :僅建立者可以使用的私有佇列,斷開後自動刪除  
// 引數4 autoDelete : 當所有消費客戶端連線斷開後,是否自動刪除佇列  
// 引數5 arguments  
channel.queueDeclare(QUEUE_NAME,true,false,null);  
複製程式碼

第三步,設定訊息的持久化

// 引數1 exchange :交換器  
// 引數2 routingKey : 路由鍵  
// 引數3 props : 訊息的其他引數,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化  
// 引數4 body : 訊息體  
channel.basicPublish("",queue_name,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());  
複製程式碼

Spring RabbitMQ 的實現方式

Spring RabbitMQ 是對原生的 RabbitMQ 客戶端的封裝。一般情況下,我們只需要定義 exchange 和 queue 的持久化。

配置交換機持久化

/**
 * value      交換機名稱
 * type       交換機型別,預設 direct
 * durable    是否持久化,預設 true
 * autoDelete 是否自動刪除,預設 false
 * internal   是否為內部交換機,預設為 false
 */
@Exchange(value = "exchangeName",type = "direct",durable = "true",autoDelete = "false",internal = "false")
複製程式碼

配置佇列持久化

/**
 * value      佇列名稱
 * durable    是否持久化
 * exclusive  否為獨佔佇列
 * autoDelete 是否自動刪除
 */
@Queue(value = "queryName",exclusive = "false",autoDelete = "false")
複製程式碼

一個用例

/**
 * 消費訊息
 */
@Component
public class ConsumerMessageListener {
    /**
     * 監聽指定佇列
     *
     * @param message 訊息體
     * @param headers 訊息頭
     * @param channel 通道
     * @return
     * @RabbitListener 指定了 exchange 、key、Queue 後,如果 Rabbitmq 沒有會去建立
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = "exchangeName",internal = "false"),key = "routingKeyValue",value = @Queue(value = "queryName",autoDelete = "false")
    ))
    public void listenerMessage(String message,@Headers Map<String,Object> headers,Channel channel)
            throws IOException {
        System.out.println(message);
        System.out.println(headers);
        //手動 ack
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
    }
}
複製程式碼

正常思路想 exchange 和 queue 的持久化應該在訊息傳送端配置,其實也可以配置在訊息消費端,RabbitListener 回去檢查 exchange 和 queue,如果不存在則建立

相關文章

RabbitMQ 之訊息的可靠性投遞

RabbitMQ 之訊息的可靠性消費