1. 程式人生 > >RabbitMQ 官方NET教程(三)【釋出/訂閱】

RabbitMQ 官方NET教程(三)【釋出/訂閱】

上一篇部落格中,我們實現了工作佇列,並且我們的工作佇列中的一個任務只會發給一個工作者,除非某個工作者未完成任務意外被殺死,會轉發給另外的工作者。在這部分中,我們會做一些完全不同的事情 - 我們會向多個消費者傳遞資訊。這種模式被稱為“釋出/訂閱”。

為了說明這個模式,我們要建立一個簡單的日誌記錄系統。它將包括兩個程式 - 第一個將發出日誌訊息,第二個將接收並列印它們。

在我們的日誌系統中,每一個執行的接收者程式都會收到日誌。這樣我們就可以實現一個接收者將接收到的資料寫到硬碟上,與此同時,另一個接收者把接收到的訊息展現在螢幕上。

本質上來說,就是已釋出的日誌訊息將被廣播到所有接收者。

轉發器(Exchanges)

前面的部落格中我們主要的介紹都是傳送者傳送訊息給佇列,接收者從佇列接收訊息。下面我們會引入Exchanges,展示RabbitMQ的完整的訊息模型。

讓我們快速瞭解我們在以前的教程中介紹的內容:

生產者是傳送訊息的使用者應用程式。
佇列是儲存訊息的緩衝區。
消費者是接收訊息的使用者應用程式。

RabbitMQ中的訊息傳遞模型的核心思想是,生產者從不將任何訊息直接傳送到佇列。實際上,生產者通常甚至不知道是否將訊息傳遞到任何佇列。

相反,生產者只能將資訊傳送到exchange。交換是一件非常簡單的事情。一方面,它收到來自生產者的訊息,另一方將它們推送到佇列。交換機必須準確知道接收到的訊息如何處理。應該追加到特定佇列嗎?應該追加到很多佇列嗎?或者應該丟棄。其規則由exchange

型別定義。
這裡寫圖片描述
有幾種交換型別可用: direct, topic, headers 和fanout。 我們將重點關注最後一個 - fanout。 我們建立一個這種型別的exchange,並稱它為logs

channel.ExchangeDeclare("logs", "fanout");

fanout交換器非常簡單。 正如您可以從名稱猜出,它只是將所有收到的訊息廣播到所有知道的佇列。 這正是我們需要的記錄器。

列出交換機

要列出伺服器上的交換機,您可以執行有用的rabbitmqctl:

sudo rabbitmqctl list_exchanges

在這個列表中會有一些amq.*

交換和預設(未命名)交換。 這些是預設建立的,但是不太可能需要使用它們。

預設交換

在本教程的前面部分,我們對交換沒有任何瞭解,但仍然能夠將訊息傳送到佇列。 這是可能的,因為我們使用預設交換,我們通過空字串("")標識。

回想一下我們之前釋出的訊息:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                         routingKey: "hello",
                         basicProperties: null,
                         body: body);

第一個引數是交換的名稱。 空字串表示預設或無名交換:訊息通過路由Key指定的名稱路由到佇列(如果存在)。

現在,我們可以釋出到我們命名的交換機:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
                     routingKey: "",
                     basicProperties: null,
                     body: body);

臨時佇列(Temporary queues)

前面我們使用的是具有指定名稱的佇列(記得hellotask_queue?)。 能夠命名佇列對我們而言至關重要 - 我們需要將工作者指向同一個佇列。 當您想要在生產者和消費者之間共享佇列時,給佇列一個名字很重要。

但是我們的日誌系統我們並不關心佇列的名稱。 我們希望接收到所有的日誌訊息,而不僅僅是它們的一部分。 而且我們也只對當前正在傳遞的資料的感興趣。 要解決我們的需,要做兩件事情。

首先,每當我們連線到Rabbit,我們需要一個新的空的佇列。 為此,我們可以建立一個具有隨機名稱的佇列,或者甚至更好 - 讓伺服器為我們選擇一個隨機佇列名稱。

其次,一旦消費者與Rabbit斷開,佇列應該被自動刪除。

在.NET客戶端中,當我們沒有為queueDeclare()提供引數時,我們建立了一個不可持續的,獨佔的,自動刪除的佇列,其中包含一個生成的名稱:

var queueName = channel.QueueDeclare().QueueName;

此時,queueName包含一個隨機佇列名稱。 例如,它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

繫結(Bindings)

這裡寫圖片描述

我們已經建立了一個扇出交換和一個佇列。 現在我們需要告訴交換機發送訊息到我們的佇列。 交換和佇列之間的關係稱為繫結(binding)。

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

從現在開始,logs交換器將追加訊息到我們的佇列。

列出繫結

你可以列出現有的繫結:

rabbitmqctl list_bindings

完整的例子

這裡寫圖片描述
發出日誌訊息的生產者程式與上一個教程並沒有太大的區別。 最重要的變化是我們現在想將訊息釋出到我們的logs交換器,而不是無名的。 傳送時需要提供一個routingKey,但是對於fanout交換來說,它的值被忽略。 這裡是EmitLog.cs檔案的程式碼:

using System;
using RabbitMQ.Client;
using System.Text;

class EmitLog
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "logs",
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0)
               ? string.Join(" ", args)
               : "info: Hello World!");
    }
}

如你所見,建立連線後,我們宣告交換。 此步驟是必須的,因為禁止釋出到不存在的交換機。

如果沒有任何佇列繫結到交換機,訊息將丟失,但是對我們來說沒關係; 如果沒有消費者正在收聽,我們可以放心地放棄資訊。

ReceiveLogs.cs的程式碼:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,
                              exchange: "logs",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 noAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

按照教程一的安裝說明生成EmitLogsReceiveLogs專案。

如果要將日誌儲存到檔案,只需開啟控制檯並鍵入:

cd ReceiveLogs
dotnet run > logs_from_rabbit.log

如果您希望在螢幕上看到日誌,則產生一個新的終端並執行:

cd ReceiveLogs
dotnet run

當然,要發射日誌型別:

cd EmitLog
dotnet run

使用rabbitmqctl list_bindings,您可以驗證程式碼是否按照我們想要的方式建立繫結和佇列。 執行兩個ReceiveLogs.cs程式時,您應該看到如下所示:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

對結果的解釋很簡單:來自交換機logs的資料轉到具有伺服器分配名稱的兩個佇列。 這正是我們的意圖。

相關推薦

RabbitMQ 官方NET教程()釋出/訂閱

上一篇部落格中,我們實現了工作佇列,並且我們的工作佇列中的一個任務只會發給一個工作者,除非某個工作者未完成任務意外被殺死,會轉發給另外的工作者。在這部分中,我們會做一些完全不同的事情 - 我們會向多個消費者傳遞資訊。這種模式被稱為“釋出/訂閱”。 為了說明這個

RabbitMQ 官方NET教程(四)路由選擇

在上一個教程中,我們構建了一個簡單的日誌記錄系統。 我們能夠廣播日誌訊息給所有你的接收者。 在本教程中,我們將為其新增一個功能 - 我們將讓日誌接收者可以僅訂閱一部分訊息。 例如,我們將能夠僅將關鍵的錯誤訊息寫入到日誌檔案(以節省磁碟空間),同時仍然能夠在控制

RabbitMQ 官方NET教程(六)RPC

在第二個教程中,我們學習瞭如何使用Work Queues在多個工作者之間分配耗時的任務。 但是如果我們需要在遠端計算機上執行功能並等待結果怎麼辦? 那是一個不同的模式。 此模式通常稱為遠端過程呼叫或RPC。 在本教程中,我們將使用RabbitMQ構建一個RP

Kafka- 訊息佇列中點對點釋出訂閱區別

1.JMS中定義 JMS規範目前支援兩種訊息模型:點對點(point to point, queue)和釋出/訂閱(publish/subscribe,topic)。 點對點: 訊息生產者生產訊息傳送到queue中,然後訊息消費者從queue中取出並且消費訊息。這裡要注意: 訊息被消費以

RabbitMQ系列教程釋出\/訂閱(Publish\/Subscribe)

在前一個教程中,我們建立了一個工作佇列。工作佇列背後的假設是每個任務會被交付給一個【工人】。在這一部分我們將做一些完全不同的事情--我們將向多個【消費者】傳遞資訊。這種模式被稱為“釋出/訂閱”。   為了說明這種模式,我們將構建一個簡單的日誌系統。它將包括兩個程式,第一個將發

RabbitMQ釋出/訂閱

本系列教程主要來自於官網入門教程的翻譯,然後自己進行了部分的修改與實驗,內容僅供參考。  上一篇部落格中,我們實現了工作佇列,並且我們的工作佇列中的一個任務只會發給一個工作者,除非某個工作者未完成任務意外被殺死,會轉發給另外的工作者,如果你還不瞭解:RabbitMQ (

RabbitMQ指南之釋出/訂閱模式(Publish/Subscribe)

在上一章中,我們建立了一個工作佇列,工作佇列模式的設想是每一條訊息只會被轉發給一個消費者。本章將會講解完全不一樣的場景: 我們會把

設計模式()—— 釋出-訂閱模式

1. 釋出訂閱模式和觀察者模式很相似,但並不等同 釋出訂閱模式是一種訊息正規化,訊息傳送者(釋出者)不會將訊息直接傳送給特定的接受者(訂閱者),而是將釋出的訊息分為不同的類別,通過一箇中間的訊息代理來排程訊息,釋出者無需瞭解有哪些訂閱者存在。同樣的,訂閱者也只接受自己感興趣的那一類訊息,無需瞭解

RabbitMQ訊息分發模式----"Publish/Subscribe"釋出/訂閱模式

介紹 我們都是基於一個佇列傳送和接受訊息。  前面講的幾種,不管是生產者端還是消費者端都必須知道一個指定的QueueName才能傳送、獲取訊息。  而RabbitMQ訊息模型的核心思想是生產者不會將訊息直接傳送給佇列。現在介紹一下完整的訊息傳遞模式: 如果同一個訊息,

Redis釋出訂閱

Redis通過PUBLISH、SUBSCRIBE等命令實現釋出與訂閱模式。 舉例:QQ群的公告,單個釋出者,多個收聽著。 *** 釋出/訂閱 PUBLISH 頻道 訊息 將訊息釋出到指定的頻道。 . SUBSCRIBE 頻道1 [頻道2] [...] 可同時訂閱多個頻道。 . UNSUBSCR

分享一個分散式訊息匯流排,基於.NET Socket Tcp的釋出-訂閱框架,附程式碼下載

一、分散式訊息匯流排      在很多MIS專案之中都有這樣的需求,需要一個及時、高效的的通知機制,即比如當使用者A完成了任務X,就需要立即告知使用者B任務X已經完成,在通常的情況下,開發人中都是在使用者B所使用的程式之中寫資料庫輪循程式碼,這樣就會產品一個很嚴重的兩個問題,第一個問題是延遲,輪循機制要定時

分散式訊息匯流排,基於.NET Socket Tcp的釋出-訂閱框架之離線支援,附程式碼下載

一、分散式訊息匯流排以及基於Socket的實現      在前面的分享一個分散式訊息匯流排,基於.NET Socket Tcp的釋出-訂閱框架,附程式碼下載一文之中給大家分享和介紹了一個極其簡單也非常容易上的基於.NET Socket Tcp 技術實現的分佈訊息匯流排,也是一個簡單的釋出訂閱框架:  

RabbitMQ與.net core() fanout型別Exchange 與 訊息的過期時間 與 佇列的存活時間

原文: RabbitMQ與.net core(三) fanout型別Exchange 與 訊息的過期時間 與 佇列的存活時間 上一篇我們講了關於direct型別的Exchange,這一片我們來了解一下fanout型別的Exchange。 1.Exchange的fanout型別 fanout型別的Exch

訊息佇列-ActiveMQ學習筆記()-釋出-訂閱訊息模式實現

釋出-訂閱訊息模式與點對點模式類似,只不過在session建立訊息佇列時,由session.createQuene()變為session.createTopic()。 訊息釋出者程式碼: package com.feiyang.activemq2; import java

ActiveMQ入門系列釋出/訂閱模式

在上一篇《ActiveMQ入門系列二:入門程式碼例項(點對點模式)》中提到了ActiveMQ中的兩種模式:點對點模式(PTP)和釋出/訂閱模式(Pub & Sub),詳細介紹了點對點模式並用程式碼例項進行說明,今天就介紹下發布/訂閱模式。 一、理論基礎 釋出/訂閱模式的工作示意圖: 訊息

AutoMapper官方文件(二)升級指南

初始化 您現在必須使用Mapper.Initialize或new MapperConfiguration()來初始化AutoMapper。如果您希望保持靜態使用,請使用Mapper.Initialize。 如果你有很多的Mapper.CreateMap呼叫,把它們移動到一個Profile,或者Mapper

ABP官方文件(一)入門介紹

1.1 ABP總體介紹 - 入門介紹 ABP是“ASP.NET Boilerplate Project (ASP.NET樣板專案)”的簡稱。 ASP.NET Boilerplate是一個用最佳實踐和流行技術開發現代WEB應用程式的新起點,它旨在成為一個通用的

微信小程式--官方文件補充操作反饋--modal元件

modal彈出框常用在提示一些資訊比如:退出應用,清楚快取,修改資料提交時一些提示等等。 常用屬性: wxml <!--監聽button點選事件--><buttonbindtap="listenerButton"type="primary">彈出modal</but

ABP官方文件(五)多租戶

1.5 ABP總體介紹 - 多租戶 1.5.1 什麼是多租戶 維基百科:“軟體多租戶是指一個軟體架構的例項軟體執行在一個伺服器上,但存在多個租戶。租戶是一組共享一個公共的使用者訪問特定許可權的軟體例項。多租戶架構,軟體應用程式旨在提供每個租戶專用的例項包

從零開始學習ExtJs6系列教程Hello World

我們在學校裡學習任何一門語言都是從"Hello World"開始,這裡我們也不例外。 那麼我們的教程就從 Hello World 講起。 helloWorld.js Ext.onReady(func