1. 程式人生 > >RabbitMQ學習(5) (主題)

RabbitMQ學習(5) (主題)

bytes 關於 字符 .exe war pytho 局限 一起 chan

我們改進了我們的日誌系統 我們沒有使用只有虛擬廣播fanout 交換機,而是使用 direct 交換機,並有選擇性地接收日誌的可能性。

盡管使用直接交換改進了我們的系統,但它仍然有局限性 - 它不能根據多個標準進行路由選擇。

在我們的日誌系統中,我們可能不僅要根據嚴重性來訂閱日誌,還要根據發出日誌的來源進行訂閱。你可能從syslog unix工具知道這個概念 ,它根據嚴重性(info / warn / crit ...)和facility(auth / cron / kern ...)來路由日誌。

這會給我們很大的靈活性 - 我們可能想要聽取來自‘cron‘的嚴重錯誤,而且還要聽取來自‘kern‘的所有日誌。

為了在我們的日誌系統中實現這一點,我們需要了解更復雜的話題交換。

話題交換

發送到主題交換的消息不能有任意的 routing_key - 它必須是由點分隔的單詞列表。這些單詞可以是任何東西,但通常它們指定連接到消息的一些功能。一些有效的路由鍵例子:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。在路由選擇鍵中可以有任意數量的字,最多255個字節。

綁定鍵也必須是相同的形式。主題交換背後的邏輯 類似於直接的 - 使用特定的路由密鑰發送的消息將被傳送到與匹配的綁定密鑰綁定的所有隊列。

但是綁定鍵有兩個重要的特殊情況:

  • *(星號)可以代替一個字。
  • (散列)可以代替零個或多個單詞。

在一個例子中解釋這個很簡單:

技術分享圖片

在這個例子中,我們將發送所有描述動物的信息。消息將使用由三個字(兩個點)組成的路由鍵發送。路由關鍵字中的第一個單詞將描述速度,第二個顏色和第三個種類:“ <speed>。<color>。<species>”。

我們創建了三個綁定:Q1綁定了綁定鍵“ * .orange。* ”,Q2 綁定了“ *。*。rabbit ”和“ lazy。# ”。

這些綁定可以概括為:

  • Q1對所有的橙色動物感興趣。
  • Q2希望聽到有關兔子的一切,以及關於懶惰動物的一切。

將路由鍵設置為“ quick.orange.rabbit ”的消息將傳遞到兩個隊列。消息“ lazy.orange.elephant ”也將去他們兩個。另一方面,“ quick.orange.fox ”只會到第一個隊列,而“ lazy.brown.fox ”只會到第二個隊列即使匹配兩個綁定,lazy.pink.rabbit ”也只會被傳遞到第二個隊列一次。quick.brown.fox”不匹配任何綁定,因此將被丟棄。

如果我們違反我們的合同,並發送一個或四個單詞,如“ 橙色 ”或“ quick.orange.male.rabbit的消息會發生什麽那麽,這些消息將不匹配任何綁定,將會丟失。

另一方面,“ lazy.orange.male.rabbit ”即使有四個單詞,也會匹配最後一個綁定,並被傳遞到第二個隊列。

話題交換

話題交換功能強大,可以像其他交流一樣行事。

當一個隊列綁定了“ ”(哈希)綁定密鑰時,它將接收所有的消息,而不管路由密鑰如何在扇出交換。

在綁定中不使用特殊字符“ * ”(星號)和“ ”(散列)時,主題交換將像直接交換一樣

把它放在一起

我們將在我們的日誌系統中使用話題交換。我們首先假定日誌的路由鍵有兩個字:“ <facility>。<severity>

EmitLogTopic.cs的代碼

 public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "topic_logs",
                                    type: "topic");

            var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
            var message = (args.Length > 1)
                          ? string.Join(" ", args.Skip( 1 ).ToArray())
                          : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "topic_logs",
                                 routingKey: routingKey,
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", routingKey, message);
        }
    }

ReceiveLogsTopic.cs的代碼

 public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
            var queueName = channel.QueueDeclare().QueueName;

            if(args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [binding_key...]",
                                        Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            foreach(var bindingKey in args)
            {
                channel.QueueBind(queue: queueName,
                                  exchange: "topic_logs",
                                  routingKey: bindingKey);
            }

            Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘",
                                  routingKey,
                                  message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

要接收所有日誌:

cd ReceiveLogsTopic
dotnet run "#"

要接收來自設施“ kern ”的所有日誌

cd ReceiveLogsTopic
dotnet run "kern.*"

或者如果你只想聽到關於“ 關鍵 ”日誌:

ReceiveLogsTopic.exe "*.critical"

您可以創建多個綁定:

cd ReceiveLogsTopic
dotnet run "kern.*" "*.critical"

並發送一個路由鍵“ kern.critical ”類型的日誌

cd EmitLogTopic
dotnet run "kern.critical" "A critical kernel error"

玩這些程序玩得開心。請註意,代碼不會對路由或綁定鍵作任何假設,您可能需要使用兩個以上的路由鍵參數。

RabbitMQ學習(5) (主題)