1. 程式人生 > >[譯]RabbitMQ教程C#版 - 主題

[譯]RabbitMQ教程C#版 - 主題

原文: [譯]RabbitMQ教程C#版 - 主題

先決條件
本教程假定 RabbitMQ 已經安裝,並執行在localhost標準埠(5672)。如果你使用不同的主機、埠或證書,則需要調整連線設定。

從哪裡獲得幫助
如果您在閱讀本教程時遇到困難,可以通過郵件列表 聯絡我們

主題

(使用 .NET 客戶端)

教程[4] 中,我們改進了我們日誌系統。我們用direct交換器替換了只能呆滯廣播訊息的fanout交換器,從而可以有選擇性的接收日誌。

雖然使用direct交換器改進了我們的系統,但它仍然有侷限性 - 不能基於多個標準進行路由。

在我們的日誌系統中,我們可能不僅要根據日誌的嚴重性訂閱日誌,可能還要根據日誌分發源來訂閱日誌。或許您可能從 unix

syslog 工具中瞭解過這種概念,syslog 工具在路由日誌的時候是可以既基於嚴重性(info/warn/crit...)又基於裝置(auth/cron/kern...)的。

這種機制會給我們帶來極大的靈活性 - 我們可以僅監聽來自cron的關鍵錯誤日誌,與此同時,監聽來自kern的所有日誌。

要在我們的日誌系統中實現這一特性,我們需要學習更復雜的topic交換器。

Topic交換器

傳送到topic交換器的訊息不能隨意指定routing key,它必須是一個由點分割的單詞列表,這些單詞可以是任意內容,但通常會在其中指定一些與訊息相關的特性。請看一些合法的路由鍵示例:stock.usd.nyse

nyse.vmwquick.orange.rabbit,路由鍵可以包含任意數量的單詞,但不能超過255個位元組的上限。

binding key也必須是相同的形式,topic交換器的背後邏輯與direct交換器類似 - 使用指定路由鍵傳送的訊息會被分發到與其繫結鍵匹配的所有佇列中。不過對於繫結鍵來說,有兩個重要的特殊情況需要注意:

  • *(星號)可以代替一個單詞。
  • #(雜湊)可以代替零個或多個單詞。

下圖示例是對上述內容最簡單的解釋:

在這個示例中,我們打算髮送的訊息全是用來描述動物的,這些訊息會使用由三個單詞(兩個點)組成的路由鍵來發送。在路由鍵中,第一個單詞用來描述行動速度、第二個是顏色、第三個是物種,即:<speed>.<colour>.<species>

我們建立了三個繫結:Q1綁定了鍵.orange.,Q2綁定了鍵*.*.rabbitlazy.#

這些繫結可以被概括為:

  • Q1對所有橙色的動物感興趣。
  • Q2對兔子以及所有行動緩慢的動物感興趣。

路由鍵為quick.orange.rabbit的訊息會被髮送到這兩個佇列,訊息lazy.orange.elephant也會被髮送到這兩個佇列。另外,quick.orange.fox只會進入第一個佇列,lazy.brown.fox只會進入第二個佇列。lazy.pink.rabbit只會被髮送到第二個佇列一次,儘管它匹配了兩個繫結(避免了訊息重複)。quick.brown.fox沒有匹配的繫結,因此它將會被丟棄。

如果我們打破約定,傳送使用一個或四個單詞(例如:orangequick.orange.male.rabbit)作路由鍵的訊息會發生什麼?答案是,這些訊息因為沒有匹配到任何繫結,將被丟棄。

但是,另外,例如路由鍵為lazy.orange.male.rabbit的訊息,儘管它有四個單詞,也會匹配最後一個繫結,並將被髮送到第二個佇列。

Topics 交換器
topic交換器的功能是很強大的,它可以表現出一些其他交換器的行為。
當一個佇列與鍵(雜湊)繫結時, 它會忽略路由鍵,接收所有訊息,這就像fanout交換器一樣。
當特殊字元*(星號)和(雜湊)未在繫結中使用時,topic交換器的行為就像direct交換器一樣。

組合在一起

我們將要在我們的日誌系統中使用topic交換器,首先假設日誌的路由鍵有兩個單片語成:<facility>.<severity>

程式碼與上一篇 教程 中的程式碼幾乎相同。

EmitLogTopic.cs的程式碼:

using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;

class EmitLogTopic
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "topic_logs",
                                    type: "topic");

            var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
            
            var message = (args.Length > 1)
                          ? string.Join(" ", args.Skip(1).ToArray())
                          : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            
            channel.BasicPublish(exchange: "topic_logs",
                                 routingKey: routingKey,
                                 basicProperties: null,
                                 body: body);
                                 
            Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
        }
    }
}

ReceiveLogsTopic.cs的程式碼:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogsTopic
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
            var queueName = channel.QueueDeclare().QueueName;

            if(args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [binding_key...]",
                                        Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            foreach(var bindingKey in args)
            {
                channel.QueueBind(queue: queueName,
                                  exchange: "topic_logs",
                                  routingKey: bindingKey);
            }

            Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'",
                                  routingKey,
                                  message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

請執行以下示例:

要接收所有日誌:

cd ReceiveLogsTopic
dotnet run "#"

要接收來自裝置kern的所有日誌:

cd ReceiveLogsTopic
dotnet run "kern.*"

或者,如果您只想監聽級別為critical的日誌:

cd ReceiveLogsTopic
dotnet run "*.critical"

您可以建立多個繫結:

cd ReceiveLogsTopic
dotnet run "kern.*" "*.critical"

使用路由鍵kern.critical發出日誌:

cd EmitLogTopic
dotnet run "kern.critical" "A critical kernel error"

希望執行這些程式能讓您玩得開心。要注意的是,這些程式碼沒有針對路由鍵和繫結鍵做任何預設,您可以嘗試使用兩個以上的路由鍵引數。

EmitLogTopic.csReceiveLogsTopic.cs 的完整原始碼)

接下來,在 教程[6] 中將瞭解如何將往返訊息作為遠端過程呼叫。

寫在最後

本文翻譯自 RabbitMQ 官方教程 C# 版本。如本文介紹內容與官方有所出入,請以官方最新內容為準。水平有限,翻譯的不好請見諒,如有翻譯錯誤還請指正。

  • 原文連結:RabbitMQ tutorial - Topics
  • 實驗環境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code
  • 最後更新:2018-09-06