1. 程式人生 > 其它 >RabbitMQ知識點整理14-訊息何去何從

RabbitMQ知識點整理14-訊息何去何從

mandatory和immediate是channel.basicPublish方法中的兩個引數, 它們都有當訊息傳遞過程中不可達目的地時將訊息返回給生產者的功能, RabbitMQ 提供的備份交換器(Altemate Exchange) 可以將未能被交換器路由的訊息(沒有繫結佇列或者沒有匹配的繫結)儲存起來,而不用返回給客戶端。

關於mandatory和immediate接下來會詳細說明...

mandatory 引數

當mandatory 引數設為true 時,交換器無法根據自身的型別和路由鍵找到一個符合條件的佇列,那麼RabbitMQ 會呼叫Basic.Return 命令將訊息返回給生產者。當mandatory 引數設定為false 時,出現上述情形,則訊息直接被丟棄。

那麼生產者如何獲取到沒有被正確路由到合適佇列的訊息呢?這時候可以通過呼叫channel.addReturnListener來新增ReturnListener 監昕器實現。

使用mandatory 引數的關鍵程式碼如程式碼如下:

/**
 * channel.basicPublish的引數mandatory的使用
 *
 * @author jiangkd
 * @date 2020/11/24 9:31
 */
public class MandatoryTest {

    final private String EXCHANGE_NAME = "exchange_jkd_demo";
    
final private String QUEUE_NAME = "queue_jkd_demo"; @Test public void mandatoryTest() throws IOException, TimeoutException { // 關於連線rabbitmq, 自己封裝了工具類, 不明白的請看之前的關於連線rabbitmq的教程說明 final Channel channel = ConnectUtil.channel(); // 定義交換器 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true
, false, false, null); // 定義佇列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // bind, 繫結鍵是 "mandatory_test" channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "mandatory_test"); // 傳送訊息, 路由鍵空著, 讓訊息路由不到佇列 // 設定引數 mandatory為true channel.basicPublish(EXCHANGE_NAME, "", true , MessageProperties.PERSISTENT_TEXT_PLAIN , "mandatory test".getBytes()); // 新增ReturnListener監聽 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText , String exchange, String routingKey , AMQP.BasicProperties properties, byte[] body) { // String message = new String(body); System.out.println("Basic.Return返回的結果是:" + message); } }); } }

上面程式碼中生產者沒有成功地將訊息路由到佇列,此時RabbitMQ 會通過Basic.Return返回" mandatory test " 這條訊息, 之後生產者客戶端通過ReturnListener 監昕到了這個事件, 最終的控制檯輸出如下:

Basic.Return返回的結果是:mandatory test

mandator引數圖如下:

 上面的示例只是演示了交換器路由不到佇列, 如果存在 交換器1(direct) 綁定了 交換器2(fanout), 交換器2綁定了佇列, 此時如果交換器1路由不到交換器2, 也會被監聽到並返回給生產者, 示例如下:

/**
     * 測試引數mandatory, 當交換器路由不到交換器的時候
     *
     * @throws IOException
     * @throws TimeoutException
     */
    @Test
    public void mandatoryTest2() throws IOException, TimeoutException {
        //
        String exchange_name1 = "exchange_jkd_name1";
        String exchange_name2 = "exchange_jkd_name2";
        String queue_name = "queue_jkd_name1";
        //
        final Channel channel = ConnectUtil.channel();

        // 定義交換器1
        channel.exchangeDeclare(exchange_name1, BuiltinExchangeType.DIRECT
                , true, false, false, null);
        // 定義交換器2
        channel.exchangeDeclare(exchange_name2, BuiltinExchangeType.FANOUT
                , true, false, false, null);

        // 繫結兩個交換器, 繫結鍵是 "exchange_bind_key"
        channel.exchangeBind(exchange_name2, exchange_name1, "exchange_bind_key");

        // 定義佇列
        channel.queueDeclare(queue_name, true, false, false, null);

        // 繫結佇列和交換器2, 交換器2傳播型別是fanout, 無需指定繫結鍵
        channel.queueBind(queue_name, exchange_name2, "");

        // 傳送訊息, 路由鍵空著, 讓訊息路由不到交換器2
        // 設定引數 mandatory為true
        channel.basicPublish(exchange_name1, "", true
                , MessageProperties.PERSISTENT_TEXT_PLAIN, "Helo~~~".getBytes());

        // 新增ReturnListener監聽
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText
                    , String exchange, String routingKey
                    , AMQP.BasicProperties properties, byte[] body) {
                //
                String message = new String(body);
                System.out.println("Basic.Return返回的結果是:" + message);
            }
        });
    }
View Code

控制檯輸出如下:

Basic.Return返回的結果是:Helo~~~

immediate 引數

當immediate引數設定為true時, 如果交換器在將訊息路由到佇列時發現佇列上並不存在任何消費者, 那麼這條訊息將不會存入佇列中, 當與路由鍵匹配的所有佇列都沒有消費者時,該訊息會通過Basic.Return 返回至生產者。

概括來說, mandatory 引數告訴伺服器至少將該訊息路由到一個佇列(或者交換器)中, 否則將訊息返回給生產者.  imrnediate引數告訴伺服器, 如果該訊息關聯的佇列上有消費者,則立刻投遞; 如果所有匹配的佇列上都沒有消費者,則直接將訊息返還給生產者, 不用將訊息存入佇列而等待消費了.

備份交換器

備份交換器,英文名稱為Altemate Exchange ,簡稱廟,或者更直白地稱之為"備胎交換器"。生產者在傳送訊息的時候如果不設定mandatory引數, 那麼訊息在未被路由的情況下將會丟失; 如果設定了mandatory 引數, 那麼需要新增ReturnListener 的程式設計邏輯, 生產者的程式碼將變得複雜, 如果既不想複雜化生產者的程式設計邏輯,又不想訊息丟失, 那麼可以使用備份交換器,這樣可以將未被路由的訊息儲存在RabbitMQ 中,再在需要的時候去處理這些訊息。

可以通過在宣告交換器(呼叫channel.exchangeDeclare 方法)的時候新增alternate-exchange 引數來實現,也可以通過策略的方式實現。如果兩者同時使用,則前者的優先順序更高,會覆蓋掉Policy 的設定。

Map<String, Object> args = new HashMap<String , Object>();
args.put("a1ternate-exchange" , "myAe");
channe1.exchangeDeclare( "normalExchange" , "direct" , true , fa1se , args);
channe1.exchangeDec1are( "myAe" , "fanout" , true , fa1se , nu11) ;
channe1.queueDeclare( "normalQueue" , true , fa1se , fa1se , nu11);
channe1.queueBind("normalQueue" , "norma1Exchange" , " norma1Key");
channe1.queueDeclare("unroutedQueue" , true , fa1se , fa1se , nu11);
channel.queueBind("unroutedQueue", "myAe", "");

上面的程式碼中聲明瞭兩個交換器nonnallixchange 和myAe ,分別綁定了nonnalQueue 和umoutedQueue 這兩個佇列,同時將myAe 設定為nonnallixchange 的備份交換器。注意myAe的交換器型別為fanout 。

如果此時傳送一條訊息到nonnalExchange 上,當路由鍵等於" normalKey" 的時候,訊息能正確路由到nonnalQueue 這個佇列中。如果路由鍵設為其他值,如"errorKey", 即訊息不能被正確地路由到與nonnallixchange 繫結的任何佇列上,此時就會發送給myAe ,進而傳送到unroutedQueue 這個佇列。

備份交換器其實和普通的交換器沒有太大的區別,為了方便使用,建議設定為fanout 型別,如若想設定為direct 或者topic 的型別也沒有什麼不妥。需要注意的是,訊息被重新發送到備份交換器時的路由鍵和從生產者發出的路由鍵是一樣的。

考慮這樣一種情況,如果備份交換器的型別是direct , 並且有一個與其繫結的佇列,假設繫結的路由鍵是key1 , 當某條攜帶路由鍵為key2 的訊息被轉發到這個備份交換器的時候,備份交換器沒有匹配到合適的佇列,則訊息丟失。如果訊息攜帶的路由鍵為key1,則可以儲存到佇列中。

對於備份交換器,總結了以下幾種特殊情況:

1.如果設定的備份交換器不存在,客戶端和RabbitMQ 服務端都不會有異常出現,此時訊息會丟失

2.如果備份交換器沒有繫結任何佇列,客戶端和RabbitMQ 服務端都不會有異常出現,此時訊息會丟失

3.如果備份交換器沒有任何匹配的佇列,客戶端和RabbitMQ 服務端都不會有異常出現,此時訊息會丟失
4.如果備份交換器和mandatory 引數一起使用,那麼mandatory 引數無效。