1. 程式人生 > >RabbitMQ入門:路由(Routing)

RabbitMQ入門:路由(Routing)

在上一篇部落格《RabbitMQ入門:釋出/訂閱(Publish/Subscribe)》中,我們認識了fanout型別的exchange,它是一種通過廣播方式傳送訊息的路由器,所有和exchange建立的繫結關係的佇列都會接收到訊息。但是有一些場景只需要訂閱到一部分訊息,這個時候就不能使用fanout 型別的exchange了,這個就引出來今天的“豬腳”--Direct Exchange,通過Routing Key來決定需要將訊息傳送到哪個或者哪些佇列中。

接下來請收看詳細內容:

  1. Direct Exchange(直接路由器)
  2. 多重繫結
  3. 程式碼例項

一、Direct Exchange(直接路由器)

在上文中介紹exchange的時候,對direct exchange進行了簡單介紹,它是一種完全按照routing key(路由關鍵字)進行投遞的:當訊息中的routing key和佇列中的binding key完全匹配時,才進行會將訊息投遞到該佇列中。這裡提到了一個routing key和binding key(繫結關鍵字),是什麼東東?

  1. routing key:

     在傳送訊息的時候,basicPublish的第二個引數就是routing key,由於上次是fanout 型別的exchange 進行廣播方式投遞,這個欄位不會影響投遞結果,因此我們這裡就傳入了“”,但是在direct 型別的exchange中我們就不能傳入""了,需要指定具體的關鍵字。

  2. binding key:

    我們在前文中建立繫結關係的時候,queueBind的第三個引數就是繫結關鍵字

我們宣告direact exchange的時候使用:


二、多重繫結

多個佇列相同的繫結鍵繫結到同一個路由器的情況,我們稱之為多重繫結

工作模型為(P代表生產者,X代表路由器,紅色的Q代表隊列,C代表消費者):

三、程式碼例項

 預備知識瞭解完了,現在來寫個程式感受下。

  1. 生產者
    public class LogDirectSender {
        // exchange名字
        public static String EXCHANGE_NAME = "directExchange";
    
        
    public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立連線和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.為通道宣告direct型別的exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 3.傳送訊息到指定的exchange,佇列指定為空,由exchange根據情況判斷需要傳送到哪些佇列 String routingKey = "debug"; String msg = " hello rabbitmq, I am " + routingKey; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("product send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 4.關閉連線 if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }

    和上次部落格中生產者的區別就是黑字粗體部分:1.路由器型別改為direct 2.訊息釋出的時候指定了routing key

  2. 消費者
    public class LogDirectReciver {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.建立連線和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.為通道宣告direct型別的exchange
                channel.exchangeDeclare(LogDirectSender.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                // 3.建立隨機名字的佇列
                String queueName = channel.queueDeclare().getQueue();
    
                // 4.建立exchange和佇列的繫結關係
                 String[] bindingKeys = { "error", "info", "debug" };
    //            String[] bindingKeys = { "error" };
                for (int i = 0; i < bindingKeys.length; i++) {
                    channel.queueBind(queueName, LogDirectSender.EXCHANGE_NAME, bindingKeys[i]);
                    System.out.println(" **** LogDirectReciver keep alive ,waiting for " + bindingKeys[i]);
                }
    
                // 5.通過回撥生成消費者並進行監聽
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                        // 獲取訊息內容然後處理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** LogDirectReciver" + " get message :[" + msg + "]");
                    }
                };
                // 6.消費訊息
                channel.basicConsume(queueName, true, consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    }

    和上次部落格中消費者的區別就是黑字粗體部分:1.路由器型別改為direct 2.建立繫結關係的時候指定了binding key

  3. 執行消費者,控制檯log列印如下:
     **** LogDirectReciver keep alive ,waiting for error
     **** LogDirectReciver keep alive ,waiting for info
     **** LogDirectReciver keep alive ,waiting for debug

    這個消費者我們視為消費者1,它會接收error,info,debug三個關鍵字的訊息。

  4. 將String[] bindingKeys = { "error", "info", "debug" };改為String[] bindingKeys = { "error" };,然後再執行一次消費者。控制檯log列印如下:
     **** LogDirectReciver keep alive ,waiting for error

    這個消費者我們視為消費者2,它只會接收error 關鍵字的訊息。

  5. 執行生產者,然後將String routingKey = "debug";的值分別改為“info"和"error",然後分別執行,這樣一共執行了三次生產者
    第一次執行:
    product send a msg:  hello rabbitmq, I am debug
    
    第二次執行:
    product send a msg:  hello rabbitmq, I am info
    
    第三次執行:
    product send a msg:  hello rabbitmq, I am error
  6. 再次檢視兩個消費者的控制檯log:
    消費者1:
     **** LogDirectReciver keep alive ,waiting for error
     **** LogDirectReciver keep alive ,waiting for info
     **** LogDirectReciver keep alive ,waiting for debug
    *********** LogDirectReciver get message :[ hello rabbitmq, I am debug]
    *********** LogDirectReciver get message :[ hello rabbitmq, I am info]
    *********** LogDirectReciver get message :[ hello rabbitmq, I am error]
    
    消費者2:
     **** LogDirectReciver keep alive ,waiting for error
    *********** LogDirectReciver get message :[ hello rabbitmq, I am error]
  7. 檢視RabbitMQ管理頁面

    exchanges標籤頁裡面多了個direct型別的路由器。進入詳細頁面:

    有4個繫結關係,其中三個的佇列是同一個。切換到Queues標籤頁:

    有兩個臨時佇列。

  8. 如果關掉消費者1和消費者2,會發現佇列自動刪除了,繫結關係也不存在了。

相關推薦

RabbitMQ入門路由(Routing)

在上一篇部落格《RabbitMQ入門:釋出/訂閱(Publish/Subscribe)》中,我們認識了fanout型別的exchange,它是一種通過廣播方式傳送訊息的路由器,所有和exchange建立的繫結關係的佇列都會接收到訊息。但是有一些場景只需要訂閱到一部分訊息,這個時候就不能使用fanout 型別的

RabbitMQ入門主題路由器(Topic Exchange)

AI orange topic 都是 erro col host nfa 匹配 上一篇博文中,我們使用direct exchange 代替了fanout exchange,這次我們來看下topic exchange。 一、Topic Exchange介紹 topic e

RabbitMQ入門工作佇列(Work Queue)

假設有這一些比較耗時的任務,按照上一次的那種方式,我們要一直等前面的耗時任務完成了之後才能接著處理後面耗時的任務,那要等多久才能處理完?別擔心,我們今天的主角--工作佇列就可以解決該問題。我們將圍繞下面這個索引展開: 什麼是工作佇列 程式碼準備 迴圈分發 訊息確認 公平分發 訊息持久化

RabbitMQ入門釋出/訂閱(Publish/Subscribe)

在前面的兩篇部落格中 遇到的例項都是一個訊息只發送給一個消費者(工作者),他們的訊息模型分別為(P代表生產者,C代表消費者,紅色代表隊列): 這次我們來看下將一個訊息傳送給多個消費者(工作者),這種模式一般被稱為“釋出/訂閱”模式。其工作模型為(P代表生產者,X代表Exchange(路由器/交

RabbitMQ入門Hello RabbitMQ 程式碼例項

在之前的一篇部落格RabbitMQ入門:認識並安裝RabbitMQ(以Windows系統為例)中,我們安裝了RabbitMQ並且對其也有的初步的認識,今天就來寫個入門小例子來加深概念理解並瞭解程式碼怎麼實現。 本篇部落格圍繞下面幾個方面展開: 程式碼前的理論熱身 程式碼例項:Hello Rabbit

RabbitMQ入門認識並安裝RabbitMQ(以Windows系統為例)

專案需求剛剛遞交,新需求還沒來。閒下來了,寫寫部落格放鬆下。 ===========華麗的分割線================= 最近在學習Spring Cloud,其中訊息匯流排Spring Cloud Bus是必不可少的,但是Spring Cloud Bus目前只支援RabbitMQ和kafka,因

RabbitMQ入門總結

隨著上一篇博文的釋出,RabbitMQ的基礎內容我也學習完了,RabbitMQ入門系列的部落格跟著收官了,以後有機會的話再寫一些在實戰中的應用分享,多謝大家一直以來的支援和認可。 RabbitMQ入門系列一共有8篇隨筆: (adsbygoogle = window.ad

RabbitMQ入門在Spring Boot 應用中整合RabbitMQ

在上一篇隨筆中我們認識並安裝了RabbitMQ,接下來我們來看下怎麼在Spring Boot 應用中整合RabbitMQ。 先給出最終目錄結構: 搭建步驟如下: 新建maven工程amqp 修改pom檔案,引入spring-boot-starter-amqp和spring-boot-sta

RabbitMQ入門遠端過程呼叫(RPC)

假如我們想要呼叫遠端的一個方法或函式並等待執行結果,也就是我們通常說的遠端過程呼叫(Remote Procedure Call)。怎麼辦? 今天我們就用RabbitMQ來實現一個簡單的RPC系統:客戶端傳送一個請求訊息,服務端以一個響應訊息迴應。為了能夠接收到響應,客戶端在傳送訊息的同時傳送一個回撥佇列用來

RabbitMQ入門教程(六)路由選擇Routing

簡介 本節主要演示使用直連線型別,將多個路由鍵繫結到同一個佇列上。也可以將同一個鍵繫結到多個佇列上(多重繫結multiple bindings),此時滿足鍵的佇列都能收到訊息,不滿足的直接被丟棄。

RabbitMQ官方中文入門教程(PHP版) 第四部分:路由(Routing)

路由(Routing) 在前面的教程中,我們實現了一個簡單的日誌系統。可以把日誌訊息廣播給多個接收者。 本篇教程中我們打算新增一個功能——使得它能夠只訂閱訊息的一個字集。例如,我們只需要把嚴重的錯誤日誌資訊寫入日誌檔案(儲存到磁碟),但同時仍然把所有的日誌資訊輸出到控制檯中

RabbitMQ指南之四路由Routing)和直連交換機(Direct Exchange)

on() call basic play logs ued void emit 依賴 原文:RabbitMQ指南之四:路由(Routing)和直連交換機(Direct Exchange)  在上一章中,我們構建了一個簡單的日誌系統,我們可以把消息廣播給很多的消費者。在本章中

華為RIP路由信息協議RIP(Routing Information Protocol)

不可達 路由器 直連 徹底刪除 技術 http ripv2 設置 ip路由 RIP:路由信息協議RIP(Routing Information Protocol)的簡稱,它是一種基於距離矢量(Distance-Vector)算法的協議,使用跳數作為度量來衡量到達目的網絡的距

RabbitMQ學習第四記路由模式(direct)

exceptio 指定 手動 ack key static ech 保存日誌 sta 1、什麽是路由模式(direct)   路由模式是在使用交換機的同時,生產者指定路由發送數據,消費者綁定路由接受數據。與發布/訂閱模式不同的是,發布/訂閱模式只要是綁定了交換機的隊列都會收

rabbitmq 入門教程一(只有佇列,不涉及路由

為何要學習rabbitmq 因為在spring cloud bus中使用到的是rabbitmq和kafka,所以就需要搞一下rabbitmq和kafka。先上圖 生產者(Producer)

RabbitMQ入門教程(十五)普通叢集和映象叢集

普通叢集 推薦一篇優秀的文章: 映象叢集 映象叢集的特點:所有節點的訊息都會進行同步。RabbitMQ是沒有中心的。 Rabbit映象功能,需要基於rabbitmq策略來實現,政策是用來控制和修改群集範圍的某個vhost佇列行為和Exchan

RabbitMQ入門教程(一)安裝和常用命令

一:Mac安裝 Mac安裝比Windows安裝更加方便,也不需要再額外配置Web外掛,因為在安裝的時候預設已經配置好了 // 在Updating Homebrew...時可能會卡一會,只需要等就行了 // 在安裝的過程中可能因為網路問題,可能會有部分會失

RabbitMQ入門教程(七)主題交換機Topics

簡介 本節主要演示交換機的另一種型別:主題型別topic,直連線型別direct必須是生產者釋出訊息指定的routingKey和消費者在佇列繫結時指定的routingKey完全相等時才能匹配到佇列上,與direct不同,topic可以進行模糊匹配,可以使用星號

RabbitMQ入門教程(二)簡介和基本概念

一:簡介 RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支援多種客戶端。用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗,訊息佇列是一種應用系統之間的通訊方法,是通過讀寫出入佇列的訊息來通訊(RPC則是通

RabbitMQ入門教程(十四)RabbitMQ單機叢集搭建

叢集簡介 理解叢集先理解一下元資料 佇列元資料:佇列的名稱和宣告佇列時設定的屬性(是否持久化、是否自動刪除、佇列所屬的節點) 交換機元資料:交換機的名稱、型別、屬性(是否持久化等) 繫結元資料:一張簡單的表格展示瞭如何將訊息路由到佇列。包含的列有 交換機名