1. 程式人生 > >RabbitMQ入門:釋出/訂閱(Publish/Subscribe)

RabbitMQ入門:釋出/訂閱(Publish/Subscribe)

在前面的兩篇部落格中

遇到的例項都是一個訊息只發送給一個消費者(工作者),他們的訊息模型分別為(P代表生產者,C代表消費者,紅色代表隊列):

這次我們來看下將一個訊息傳送給多個消費者(工作者),這種模式一般被稱為“釋出/訂閱”模式。其工作模型為(P代表生產者,X代表Exchange(路由器/交換機),C代表消費者,紅色代表隊列):

我們發現,工作模型中首次出現路由器,並且每個消費者有單獨的佇列。生產者生成訊息後將其傳送給路由器,然後路由器轉送到佇列,消費者各自到自己的佇列裡面獲取訊息進行消費。在實際的應用場景中,生產者一般不會直接將訊息傳送給佇列,而是傳送給路由器進行中轉,Exchange必須清楚的知道怎麼處理收到的訊息:是將訊息傳送到一個特定佇列還是多有佇列,或者直接廢棄訊息。這種才符合RabbitMQ訊息模型的核心思想

接下來我們詳細展開今天的話題:

一、Exchange

Exchange在我們的工作模型中首次出現,因此需要詳細介紹下。

Exchange分為4種類型:

Direct:完全根據key進行投遞的,例如,繫結時設定了routing key為”abc”,那麼客戶端提交的訊息,只有設定了key為”abc”的才會投遞到佇列。
Topic:對key進行模式匹配後進行投遞,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。
Fanout:不需要key,它採取廣播模式,一個訊息進來時,投遞到與該交換機繫結的所有佇列。
Headers:我們可以不考慮它。

今天我們的例項採用fanout型別的exchange。

儘管首次出現,但是其實我們前面的案例中也有用到exchange,只是我們沒有給他名字,用的是RabbitMQ預設的,比如下面這段程式碼,我們將路由器名這個引數傳入了“”,如果我們需要自己宣告exchange的話,這個就不能傳入“”了,而是傳入自己定義好的值。

二、臨時佇列

前面兩篇部落格中,我們都在使用佇列的時候給出了定義好的名字,這在生產者和消費者共用相同佇列的時候很有必要,但是我們有了exchange,生產者不需要知道有哪些佇列,因此佇列名字可以不用指定了,而是通過RabbitMQ 介面自己去生成臨時佇列,佇列名字也由RabbitMQ自動生成。通過

可以宣告一個非持久的、通道獨佔的、自動刪除的佇列,getQueue()方法可以獲取隨機佇列名字。這個名字用來在佇列和exchange之間建立binding關係的時候使用:

三、程式碼實現

基於上面exchange和臨時佇列的知識鋪墊,可以展開今天的程式碼實現了。

  1.  生產者
    public class Product {
        //exchange名字
        public static String EXCHANGE_NAME = "exchange";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.建立連線和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.為通道宣告exchange和exchange的型別
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                
                String msg = " hello rabbitmq, this is publish/subscribe mode";
                // 3.傳送訊息到指定的exchange,佇列指定為空,由exchange根據情況判斷需要傳送到哪些佇列
                channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
                System.out.println("product send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 4.關閉連線
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
  2. 消費者1
    public class Consumer1 {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.建立連線和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.為通道宣告exchange以及exchange型別
                channel.exchangeDeclare(Product.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
                // 3.建立隨機名字的佇列
                String queueName = channel.queueDeclare().getQueue();
    
                // 4.建立exchange和佇列的繫結關係
                channel.queueBind(queueName, Product.EXCHANGE_NAME, "");
                System.out.println(" **** Consumer1 keep alive ,waiting for messages, and then deal them");
                // 5.通過回撥生成消費者並進行監聽
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                        // 獲取訊息內容然後處理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** Consumer1" + " get message :[" + msg + "]");
                    }
                };
                // 6.消費訊息
                channel.basicConsume(queueName, true, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
  3. 消費者2,核心程式碼同消費者1一樣,只是在日誌列印上將"Consumer1"改為"Consumer2"而已。這裡不再列出具體程式碼。
  4. 先執行消費者1和2,然後執行生產者,觀察控制檯log列印情況:
    生產者:
    product send a msg:  hello rabbitmq, this is publish/subscribe mode
    
    消費者1**** Consumer1 keep alive ,waiting for messages, and then deal them
    *********** Consumer1 get message :[ hello rabbitmq, this is publish/subscribe mode]
    
    消費者2:
     **** Consumer2 keep alive ,waiting for messages, and then deal them
    *********** Consumer2 get message :[ hello rabbitmq, this is publish/subscribe mode]

    可以看到,當生產者發出訊息後,兩個消費者最終都收到了訊息。

  5. 我們去檢視RabbitMQ管理頁面:

    在Exchanges 標籤頁裡面多了一個名為“exchange”的路由器,他的型別是fanout。點exchange 的link進入詳細頁面:

    發現在binding專案中有了兩條繫結關係,佇列的名字也可以看到。將頁面切換到Queues標籤頁:

    出現了兩個新的佇列,佇列名字和繫結關係中的一樣,並且佇列都是自動刪除的、通道獨佔的。

  6. 然後將消費者1和消費者2都停掉,重新檢視管理頁面,我們發現exchange還在,binding關係不存在了,臨時佇列也自動刪除了

相關推薦

RabbitMQ入門釋出/訂閱Publish/Subscribe

在前面的兩篇部落格中 遇到的例項都是一個訊息只發送給一個消費者(工作者),他們的訊息模型分別為(P代表生產者,C代表消費者,紅色代表隊列): 這次我們來看下將一個訊息傳送給多個消費者(工作者),這種模式一般被稱為“釋出/訂閱”模式。其工作模型為(P代表生產者,X代表Exchange(路由器/交

RabbitMQ系列教程之三釋出\/訂閱Publish\/Subscribe

在前一個教程中,我們建立了一個工作佇列。工作佇列背後的假設是每個任務會被交付給一個【工人】。在這一部分我們將做一些完全不同的事情--我們將向多個【消費者】傳遞資訊。這種模式被稱為“釋出/訂閱”。   為了說明這種模式,我們將構建一個簡單的日誌系統。它將包括兩個程式,第一個將發

RabbitMQ官方中文入門教程(PHP版) 第三部分:釋出訂閱Publish/Subscribe

1 $exchange->setName('logs'); 2 $exchange->setType(AMQP_EX_TYPE_FANOUT); 3 $exchange->declare(); fanout交換器很簡單,你可能從名字上就能猜測出來,它把訊息傳送給它所知道

RabbitMQ使用教程釋出/訂閱模式—Publish/Subscribe

一、釋出/訂閱模式說明 今天我們來學習一點新的東西,之前我們是將一個訊息傳送給了一個特定的消費者,今天的做法完全不同,不再發送給某一個消費者,而是將一個訊息傳送給多個消費者,這便是:釋出/訂閱模式。 我們將使用該模式來實現一個日誌系統:一個程式產生日誌,一個

RabbitMQ系列教程之三發布/訂閱Publish/Subscribe

mqc 標題 整合 參數 cti 事情 return 控制臺 run (本教程是使用Net客戶端,也就是針對微軟技術平臺的) 在前一個教程中,我們創建了一個工作隊列。工作隊列背後的假設是每個任務會被交付給一個【工人】。在這一部分我們將做一些完全不同的事情--我們將向多個

rabbitmq官方教程之釋出訂閱Publish/Subscribe

(using the Java Client) 在上一篇工作佇列中中我們已經建立了一個工作佇列。工作佇列背後的假設是每個任務都交付給一個worker。 在這部分中,我們會做一些完全不同的事情 - 我們會向多個消費者傳遞資訊。這種模式被稱為“釋出/訂閱”。

RabbitMQ指南之三釋出/訂閱模式Publish/Subscribe

在上一章中,我們建立了一個工作佇列,工作佇列模式的設想是每一條訊息只會被轉發給一個消費者。本章將會講解完全不一樣的場景: 我們會把

RabbitMQ學習第三記發布/訂閱模式Publish/Subscribe

font image 直接 email err spl 回調方法 byte []   工作隊列模式是直接在生產者與消費者裏聲明好一個隊列,這種情況下消息只會對應同類型的消費者。   舉個用戶註冊的列子:用戶在註冊完後一般都會發送消息通知用戶註冊成功(失敗)。如果在一個系統中

RabbitMQ指南之三發布/訂閱模式Publish/Subscribe

問題 除了 消息 模型 server fan 以及 color let   在上一章中,我們創建了一個工作隊列,工作隊列模式的設想是每一條消息只會被轉發給一個消費者。本章將會講解完全不一樣的場景: 我們會把一個消息轉發給多個消費者,這種模式稱之為發布-訂閱模式。   為了

RabbitMQ釋出訂閱模式Publish/Subscribe

### 一、釋出/訂閱(Publish/Subscribe)模式 釋出訂閱是我們經常會用到的一種模式,生產者生產訊息後,所有訂閱者都可以收到。RabbitMQ的釋出/訂閱模型圖如下: ![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200

觀察者模式Observer Pattern,物件行為型模式,釋出-訂閱模式 Publish/Subscribe Pattern

意圖 通知狀態變化 定義物件間的一對多的依賴關係,當一個物件的狀態發生變化時,所有依賴它的物件都得到通知並被自動更新,由抽象類或介面實現。 推模式:目標角色複雜,並且觀察者角色進行更行時必須得到一些具體的變化資訊 拉模式:目標角色簡單 適用性 在

ZeroMQ的訂閱釋出publish-subscribe模式

轉 https://blog.csdn.net/cjf_wei/article/details/80036372 ZeroMQ的訂閱釋出模式是一種單向的資料釋出,當客戶端向服務端訂閱訊息之後,服務端便會將產生的訊息源源不斷的推送給訂閱者,本文的示例程式碼來源

RabbitMQ訊息佇列分發到多ConsumerPublish/Subscribe

 <=== RabbitMQ訊息佇列(三):任務分發機制       上篇文章中,我們把每個Message都是deliver到某個Consumer。在這篇文章中,我們將會將同一個Message deliver到多個Consumer中。這個模式也被成為

設計模式之觀察者模式(釋出/訂閱模式:publish/subscribe)

定義了一種一對多的依賴關係,讓多個觀察者物件同時監聽某一個主題物件.這個主題物件在狀態發生變化時,會通知所有觀察者物件,使它們能夠自動更新自己 觀察者模式的關鍵物件是主題Subject和觀察者Observer.一個Subject可以有任意數目依賴它的Observer, 一

Yahoo開源Pulsar大規模的釋出/訂閱Pub-Sub訊息傳遞平臺

Pulsar是一個分散式的訊息釋出/訂閱傳遞平臺,具有非常靈活的訊息模型和一個直觀的客戶端API。架構圖主要功能特徵:橫向擴充套件(每秒數百萬的獨立主題和訊息釋出) 非常強大的排序和一致性保證 低延遲持

安卓開發入門工具欄Action Bar

本文針對Android3.0及以上。主要是官方文件的翻譯加上我自己的理解。對應於官方文件的develop -- Training -- Adding the Action Bar 內容: 1.新增工具欄 2.新增工具欄按鈕     2.1 工具欄佈局     2.2新增工

Redis學習筆記——釋出訂閱瞭解即可

概述 程序間的一種訊息通訊模式:釋出者(pub)傳送訊息,訂閱者(sub)接收訊息。 瞭解即可,不會使用Redis做訊息中介軟體。 訂閱/釋出訊息圖 注:客戶端訂閱訊息。 注:伺服器傳送訊息給訂閱的

觀察者模式Publish/SubscribeC#實現

轉載自  1. 概述   有時被稱作釋出/訂閱模式,觀察者模式定義了一種一對多的依賴關係,讓多個觀察者物件同時監聽某一個主題物件。這個主題物件在狀態發生變化時,會通知所有觀察者物件,使它們能夠自動更新自己。 2. 解決的問題   將一個系統分割成一個一些類相互協作的

Redis的釋出/訂閱pub/sub

釋出訂閱(pub/sub)是一種訊息通訊模式,主要的目的是解耦訊息釋出者和訊息訂閱者之間的耦合,這點和設計模式中的觀察者模式比較相似。pub /sub不僅僅解決釋出者和訂閱者直接程式碼級別耦合也解決兩者在物理部署上的耦合。redis作為一個pub/sub server,在訂閱者和釋出者之間起到了訊息路由的功

Latex入門編輯器texmaker+texlive安裝

目錄: 前言 用word寫論文敲公式敲的有點兒煩,因此,上網搜尋了一下什麼工具比較好用來敲公式,發現了神器“tex” 但是,還需要編輯器呀。 TeX是由著名的電腦科學家Donald E. Knuth(高德納)發明的排版系統,利用TeX可以很容