1. 程式人生 > 其它 >七、RabbitMq交換機

七、RabbitMq交換機

目錄

上一講,我們建立了一個工作佇列。我們假設的是工作佇列背後,每個任務都恰好交付給一個消費者(工作程序)。在這一部分中,我們將做一些完全不同的事情-我們將訊息傳達給多個消費者。這種模式稱為 ”釋出/訂閱”.

為了說明這種模式,我們將構建一個簡單的日誌系統。
它將由兩個程式組成:第一個程式將發出日誌訊息,第二個程式是消費者。其中我們會啟動兩個消費者,其中一個消費者接收到訊息後把日誌儲存在磁碟,另外一個消費者接收到訊息後把訊息列印在螢幕上,事實上第一個程式發出的日誌訊息將廣播給所有消費者

Exchanges 概念

RabbitMQ 訊息傳遞模型的核心思想是: 生產者生產的訊息從不會直接傳送到佇列。實際上,通常生產者甚至都不知道這些訊息傳遞傳遞到了哪些佇列中。

相反,生產者只能將訊息傳送到交換機(exchange),交換機工作的內容非常簡單,一方面它接收來自生產者的訊息,另一方面將它們推入佇列。交換機必須確切知道如何處理收到的訊息。是應該把這些訊息放到特定佇列還是說把他們到許多佇列中還是說應該丟棄它們。這就的由交換機的型別來決定。

Exchanges 的型別

總共有以下型別:

  1. 直接(direct);
  2. 主題(topic) ;
  3. 標題(headers);
  4. 扇出(fanout);

再講交換機型別之前,將幾個概念

  • 無名exchange

前面部分我們對 exchange 一無所知,但仍然能夠將訊息傳送到佇列。之前能實現的原因是因為我們使用的是預設交換,我們通過空字串(“”)進行標識。

channel.basicPublish("", "hello", null, message.getBytes());

第一個引數是交換機的名稱。空字串表示預設或無名稱交換機:訊息能路由傳送到佇列中其實是由 routingKey(bindingkey)繫結 key 指定的,如果它存在的話

  • 臨時佇列

之前我們使用的是具有特定名稱的佇列(還記得 hello 和 ack_queue 嗎?)。佇列的名稱我們來說至關重要-我們需要指定我們的消費者去消費哪個佇列的訊息。

每當我們連線到 Rabbit 時,我們都需要一個全新的空佇列,為此我們可以建立一個具有隨機名稱的佇列,或者能讓伺服器為我們選擇一個隨機佇列名稱那就更好了。其次一旦我們斷開了消費者的連線,佇列將被自動刪除

建立臨時佇列的方式如下:

String queueName = channel.queueDeclare().getQueue();

創建出來之後長成這樣:

  • 繫結(bindings)

什麼是 bingding 呢,binding 其實是 exchange 和 queue 之間的橋樑,它告訴我們 exchange 和那個佇列進行了繫結關係。比如說下面這張圖告訴我們的就是 X 與 Q1 和 Q2 進行了繫結

Fanout 介紹

Fanout 這種型別非常簡單。正如從名稱中猜到的那樣,它是將接收到的所有訊息廣播到它知道的所有佇列中。系統中預設有些 exchange 型別

Fanout 實戰

Logs 和臨時佇列的繫結關係如下圖

  • ReceiveLogs01 將接收到的訊息列印在控制檯
public class ReceiveLogs01 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /**
         * 生成一個臨時的佇列 佇列的名稱是隨機的
         * 當消費者斷開和該佇列的連線時 佇列自動刪除
         */
        String queueName = channel.queueDeclare().getQueue();
        //把該臨時佇列繫結我們的 exchange 其中 routingkey(也稱之為 binding key)為空字串
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("等待接收訊息,把接收到的訊息列印在螢幕.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("控制檯列印接收到的訊息" + message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
  • ReceiveLogs02 將接收到的訊息儲存在磁碟
public class ReceiveLogs02 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /**
         * 生成一個臨時的佇列 佇列的名稱是隨機的
         * 當消費者斷開和該佇列的連線時 佇列自動刪除
         */
        String queueName = channel.queueDeclare().getQueue();
        //把該臨時佇列繫結我們的 exchange 其中 routingkey(也稱之為 binding key)為空字串
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("等待接收訊息,把接收到的訊息寫到檔案.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            File file = new File("C:\\work\\rabbitmq_info.txt");
            FileUtils.writeStringToFile(file, message, "UTF-8");
            System.out.println("資料寫入檔案成功");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
  • EmitLog 傳送訊息給兩個消費者接收
public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 宣告一個 exchange
             * 1.exchange 的名稱
             * 2.exchange 的型別
             */
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            Scanner sc = new Scanner(System.in);
            System.out.println("請輸入資訊");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println("生產者發出訊息" + message);
            }
        }
    }
}

我們構建了一個簡單的日誌記錄系統。我們能夠向許多接收者廣播日誌訊息。我們將向其中新增一些特別的功能-比方說我們只讓某個消費者訂閱釋出的部分訊息。例如我們只把嚴重錯誤訊息定向儲存到日誌檔案(以節省磁碟空間),同時仍然能夠在控制檯上列印所有日誌訊息。

Direct exchange介紹

我們再次來回顧一下什麼是 bindings,繫結是交換機和佇列之間的橋樑關係。也可以這麼理解:佇列只對它繫結的交換機的訊息感興趣。繫結用引數:routingKey 來表示也可稱該引數為 binding key,繫結之後的意義由其交換型別決定。

// 建立繫結我們用程式碼:
channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");

前面我們的日誌系統將所有訊息廣播給所有消費者,對此我們想做一些改變,例如我們希望將日誌訊息寫入磁碟的程式僅接收嚴重錯誤(errros),而不儲存哪些警告(warning)或資訊(info)日誌訊息避免浪費磁碟空間。Fanout 這種交換型別並不能給我們帶來很大的靈活性-它只能進行無意識的廣播,在這裡我們將使用 direct 這種型別來進行替換,這種型別的工作方式是,訊息只去到它繫結的routingKey 佇列中去。

在上面這張圖中,我們可以看到 X 綁定了兩個佇列,繫結型別是 direct。佇列 Q1 繫結鍵為 orange,佇列 Q2 繫結鍵有兩個:一個繫結鍵為 black,另一個繫結鍵為 green.

在這種繫結情況下,生產者釋出訊息到 exchange 上,繫結鍵為 orange 的訊息會被髮布到佇列Q1。繫結鍵為 blackgreen 和的訊息會被髮布到佇列 Q2,其他訊息型別的訊息將被丟棄。

多重繫結

當然如果 exchange 的繫結型別是 direct,但是它繫結的多個佇列的 key 如果都相同,在這種情況下雖然繫結型別是 direct 但是它表現的就和 fanout 有點類似了,就跟廣播差不多,如上圖所示。

實戰

  • ReceiveLogsDirect01 錯誤訊息儲存到磁碟
public class ReceiveLogsDirect01 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = "disk";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println("等待接收訊息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            message = "接收繫結鍵:" + delivery.getEnvelope().getRoutingKey() + ",訊息:" + message;
            File file = new File("C:\\work\\rabbitmq_info.txt");
            FileUtils.writeStringToFile(file, message, "UTF-8");
            System.out.println("錯誤日誌已經接收");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
  • ReceiveLogsDirect02 控制檯列印訊息
public class ReceiveLogsDirect02 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = "console";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        System.out.println("等待接收訊息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 接收繫結鍵 :" + delivery.getEnvelope().getRoutingKey() + ", 消 息:" + message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
  • EmitLogDirect 發出日誌訊息
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //建立多個 bindingKey
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("info", "普通 info 資訊");
            bindingKeyMap.put("warning", "警告 warning 資訊");
            bindingKeyMap.put("error", "錯誤 error 資訊");
            //debug 沒有消費這接收這個訊息 所有就丟失了
            bindingKeyMap.put("debug", "除錯 debug 資訊");
            for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
                String bindingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME, bindingKey, null,
                        message.getBytes("UTF-8"));
                System.out.println("生產者發出訊息:" + message);
            }
        }
    }
}

Topics

前面,我們改進了日誌記錄系統。我們沒有使用只能進行隨意廣播的 fanout 交換機,而是使用了 direct 交換機,從而有能實現有選擇性地接收日誌。

儘管使用 direct 交換機改進了我們的系統,但是它仍然存在侷限性-比方說我們想接收的日誌型別有info.base 和 info.advantage,某個佇列只想 info.base 的訊息,那這個時候 direct 就辦不到了。這個時候就只能使用 topic 型別

Topic 的要求

傳送到型別是 topic 交換機的訊息的 routing_key 不能隨意寫,必須滿足一定的要求,它必須是一個單詞列表,以點號分隔開。這些單詞可以是任意單詞,比如說:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".這種型別的。當然這個單詞列表最多不能超過 255 個位元組。

在這個規則列表中,其中有兩個替換符是大家需要注意的

  • *(星號)可以代替一個單詞

  • #(井號)可以替代零個或多個單詞

Topic 匹配案例

下圖繫結關係如下

Q1-->繫結的是

  • 中間帶 orange 帶 3 個單詞的字串(.orange.)

Q2-->繫結的是

  • 最後一個單詞是 rabbit 的 3 個單詞(..rabbit)

  • 第一個單詞是 lazy 的多個單詞(lazy.#)

上圖是一個佇列繫結關係圖,我們來看看他們之間資料接收情況是怎麼樣的

quick.orange.rabbit 被佇列 Q1Q2 接收到
lazy.orange.elephant 被佇列 Q1Q2 接收到
quick.orange.fox 被佇列 Q1 接收到
lazy.brown.fox 被佇列 Q2 接收到
lazy.pink.rabbit 雖然滿足兩個繫結但只被佇列 Q2 接收一次
quick.brown.fox 不匹配任何繫結不會被任何佇列接收到會被丟棄
quick.orange.male.rabbit 是四個單詞不匹配任何繫結會被丟棄

當佇列繫結關係是下列這種情況時需要引起注意

  • 當一個佇列繫結鍵是#,那麼這個佇列將接收所有資料,就有點像 fanout

  • 如果佇列繫結鍵當中沒有#和*出現,那麼該佇列繫結型別就是 direct

實戰

  • 生產者:EmitLogTopic
public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            /**
             * Q1-->繫結的是
             * 中間帶 orange 帶 3 個單詞的字串(*.orange.*)
             * Q2-->繫結的是
             * 最後一個單詞是 rabbit 的 3 個單詞(*.*.rabbit)
             * 第一個單詞是 lazy 的多個單詞(lazy.#)
             *
             */
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("quick.orange.rabbit", "被佇列 Q1Q2 接收到");
            bindingKeyMap.put("lazy.orange.elephant", "被佇列 Q1Q2 接收到");
            bindingKeyMap.put("quick.orange.fox", "被佇列 Q1 接收到");
            bindingKeyMap.put("lazy.brown.fox", "被佇列 Q2 接收到");
            bindingKeyMap.put("lazy.pink.rabbit", "雖然滿足兩個繫結但只被佇列 Q2 接收一次");
            bindingKeyMap.put("quick.brown.fox", "不匹配任何繫結不會被任何佇列接收到會被丟棄");
            bindingKeyMap.put("quick.orange.male.rabbit", "是四個單詞不匹配任何繫結會被丟棄");
            bindingKeyMap.put("lazy.orange.male.rabbit", "是四個單詞但匹配 Q2");
            for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
                String bindingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));
                System.out.println("生產者發出訊息" + message);
            }
        }
    }
}
  • 消費者:ReceiveLogsTopic01
public class ReceiveLogsTopic01 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //宣告 Q1 佇列與繫結關係
        String queueName = "Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        System.out.println("等待接收訊息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 接收佇列 :" + queueName + " 綁 定 鍵:" + delivery.getEnvelope().getRoutingKey() + ",訊息:" + message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}
  • 消費者:ReceiveLogsTopic02
public class ReceiveLogsTopic02 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //宣告 Q2 佇列與繫結關係
        String queueName = "Q2";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
        System.out.println("等待接收訊息.....");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 接收佇列 :" + queueName + " 綁 定 鍵:" + delivery.getEnvelope().getRoutingKey() + ",訊息:" + message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

Header交換機不常用,就不說了