1. 程式人生 > 其它 >RabbitMQ--mandatory、immediate引數和備份交換器

RabbitMQ--mandatory、immediate引數和備份交換器

mandatory和immediate是channel.basicPublish方法中的兩個引數,它們都有當訊息傳遞過程中不可達目的地時將訊息返回給生產者的功能。

RabbitMQ提供的備份交換器(AlternateExchange)可以將未能被交換器路由的訊息(沒有繫結佇列或者沒有匹配的繫結)儲存起來,而不用返回給客戶端。

mandatory引數
(1)當mandatory引數設為true時,交換器無法根據自身的型別和路由鍵找到一個符合條件的佇列,那麼RabbitMQ會呼叫Basic. Return命令將訊息返回給生產者。
(2)當mandatory引數設定為false 時,出現上述情形,則訊息直接被丟棄


那麼生產者如何獲取到沒有被正確路由到合適佇列的訊息呢?這時候可以通過呼叫channel. addReturnListener來新增ReturnListener監聽器實現。

channel.basicPublish(EXCHANGE_NAME, "", true,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                "mandatory test".getBytes());
        channel.addReturnListener(new ReturnListener() {
            @Override
            
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body)throws IOException { String message = new String(body); System.out.println("Basic.Return返回的結果是:" + message); } });

上面程式碼中生產者沒有成功地將訊息路由到佇列,此時RabbitMQ會通過Basic.Return返回“mandatory test” 這條訊息,之後生產者客戶端通過ReturnListener監聽到了這個事件,上面程式碼的最後輸出應該是

“Basic.Returm返回的結果是: mandatory test"

從AMQP協議層面來說,其對應的流轉過程如下圖所示:

immediate引數
當immediate引數設為true時,如果交換器在將訊息路由到佇列時發現佇列上並不存在任何消費者,那麼這條訊息將不會存入佇列中。當與路由鍵匹配的所有佇列都沒有消費者時,該訊息會通過Basic.Return返回至生產者。

(1)mandatory引數告訴伺服器至少將該訊息路由到一個佇列中,否則將訊息返回給生產者。
(2)immediate引數告訴伺服器,如果該訊息關聯的佇列上有消費者,則立刻投遞;如果所有匹配的佇列上都沒有消費者,則直接將訊息返還給生產者,不用將訊息存入佇列而等待消費者了。
RabbitMQ 3.0 版本開始去掉了對immediate引數的支援,對此RabbitMQ官方解釋是:immediate引數會影響映象佇列的效能,增加了程式碼複雜性,建議採用TTL和DLX的方法替代

備份交換器
備份交換器,英文名稱為Alternate Exchange,簡稱AE,或者更直白地稱之為“備胎交換器”。生產者在傳送訊息的時候如果不設定mandatory引數,那麼訊息在未被路由的情況下將會丟失;如果設定了mandatory引數,那麼需要新增ReturnListener的程式設計邏輯,生產者的程式碼將變得複雜。

如果既不想複雜化生產者的程式設計邏輯,又不想訊息丟失,那麼可以使用備份交換器,這樣可以將未被路由的訊息儲存在RabbitMQ中,再在需要的時候去處理這些訊息。

可以通過在宣告交換器(呼叫channel. exchangeDeclare方法)的時候新增alternate-exchange引數來實現,也可以通過策略的方式實現。如果兩者同時使用,則前者的優先順序更高,會覆蓋掉Policy的設定。

//備份交換器
        Map<String, Object> argsMap = new HashMap<>();
        argsMap.put("alternate-exchange", "myAe");
        channel.exchangeDeclare("normalExchange", "direct", true, false, argsMap);
        channel.exchangeDeclare("myAe", "fanout", true, false, null);
        channel.queueDeclare("normalQueue", true, false, false, null);
        channel.queueBind("normalQueue", "normalExchange", "normalKey");
        channel.queueDeclare("unroutedQueue", true, false, false, null);
        channel.queueBind("unroutedQueue", "myAe", "");

上面的程式碼中聲明瞭兩個交換器normalExchange和myAe,分別綁定了normalQueue 和unroutedQueue這兩個佇列,同時將myAe設定為normalExchange的備份交換器。注意myAe的交換器型別為fanout

如果此時傳送一條訊息到normalExchange.上,當路由鍵等於“normalKey"的時候,訊息能正確路由到normalQueue這個佇列中。如果路由鍵設為其他值,比如“errorKey”, 即訊息不能被正確地路由到與normalExchange繫結的任何佇列上,此時就會發送給myAe,進而傳送到unroutedQueue這個佇列。

備份交換器其實和普通的交換器沒有太大的區別,為了方便使用,建議設定為fanout型別,如若讀者想設定為direct或者topic 的型別也沒有什麼不妥。需要注意的是,訊息被重新發送到備份交換器時的路由鍵和從生產者發出的路由鍵是一樣的

考慮這樣一種情況,如果備份交換器的型別是direct,並且有一個與其繫結的佇列,假設繫結的路由鍵是key1,當某條攜帶路由鍵為key2的訊息被轉發到這個備份交換器的時候,備份交換器沒有匹配到合適的佇列,則訊息丟失。如果訊息攜帶的路由鍵為keyl,則可以儲存到佇列中。

對於備份交換器,總結了以下幾種特殊情況:

如果設定的備份交換器不存在,客戶端和RabbitMQ服務端都不會有異常出現,此時訊息會丟失。
如果備份交換器沒有繫結任何佇列,客戶端和RabbitMQ服務端都不會有異常出現,此時訊息會丟失。
如果備份交換器沒有任何匹配的佇列,客戶端和RabbitMQ服務端都不會有異常出現,此時訊息會丟失。
如果備份交換器和 mandatory引數一起使用,那麼mandatory引數無效。