【c#】RabbitMQ學習文檔(四)Routing(路由)
(使用Net客戶端)
在上一個教程中,我們構建了一個簡單的日誌系統,我們能夠向許多消息接受者廣播發送日誌消息。
在本教程中,我們將為其添加一項功能 ,這個功能是我們將只訂閱消息的一個子集成為可能。 例如,我們可以只將關鍵的錯誤消息輸出到日誌文件(以節省磁盤空間),同時仍然可以在控制臺上打印所有日誌消息。
1、綁定
在以前的例子中,我們已經創建了綁定。 你可能會記得如下代碼:
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
【綁定】是【消息交換機】和【隊列】之間的關系紐帶,通過綁定把二者關聯起來。 這可以簡單地理解為:隊列可以接收來自此【消息交換機】的消息。
【綁定】可以占用額外的路由選擇參數。 為了避免與BasicPublish參數混淆,我們將其稱為【綁定鍵】。 以下代碼就是如何用一個鍵值來創建一個綁定:
channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "black");
【綁定鍵】的含義取決於交換類型。 以前我們使用的【Fanout】類型的【消息交換機】忽略了它的取值。
2、直接交換
在上一個教程中,我們的日誌記錄系統向所有【消費者】發送所有消息。 我們希望將其擴展為允許基於其嚴重性過濾消息。 例如,我們可能希望將寫入磁盤日誌消息的腳本僅接受嚴重錯誤,而不會在警告或信息日誌消息上浪費磁盤空間。
我們正在使用一個【Fanout】類型的【消息交換機】,它不會給我們帶來很大的靈活性 - 它只能無意識地發送。
我們將使用一個【Direct】類型的【消息交換機】。 直接轉換路由的背後的算法其實是很簡單的 - 把消息傳遞到【綁定鍵 binding key
為了說明,請考慮以下設置:
在這個設置中,我們可以看到【Direct】類型的【消息交換機】X與兩個隊列相綁定。 第一個隊列與【綁定鍵】的值是Orange相綁定的,第二個隊列有兩個綁定,一個【綁定鍵】的值是black,另一個【綁定鍵】的值是green。
在這樣的設置中,發布到具有【路由鍵】為orange的【消息交換機】的消息將被路由到隊列Q1。 具有black或green【路由鍵】的消息將轉到Q2。 所有其他消息將被丟棄。
3、多重綁定
使用相同的【綁定鍵】綁定多個隊列是完全合法的。 在我們的示例中,我們可以在X和Q1之間添加【綁定鍵】是black的綁定。 在這種情況下,【direct】類型的【消息交換機】將表現得像【Fanout】類型的【消息交換機】,並將消息發送到所有匹配的隊列。 具有【路由鍵】是black的消息將傳送到Q1和Q2。
4、發出日誌
我們將發送消息到【Direct】類型的【消息交換機】來替換【fanout】類型的【消息交換機】,在我們現在的日誌系統將使用此模型。 我們將提供日誌嚴重性作為【路由鍵】。 這樣接收腳本就能夠選擇想要接收的嚴重性。 我們首先關註發出日誌。
像以前一樣,我們首先要建立一個【消息交換機】:
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
現在,我們準備發送消息:
var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body);
為了簡化事情,我們假設“嚴重性”可以是“信息”,“警告”,“錯誤”之一。
5、訂閱
接收消息將像上一個教程一樣工作,除了一個例外 - 我們將為每個我們感興趣的嚴重性創建一個新的綁定。
var queueName = channel.QueueDeclare().QueueName; foreach(var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); }
6、整合
以下是EmitLogDirect.cs類的代碼:
1 using System; 2 using System.Linq; 3 using RabbitMQ.Client; 4 using System.Text; 5 6 class EmitLogDirect 7 { 8 public static void Main(string[] args) 9 { 10 var factory = new ConnectionFactory() { HostName = "localhost" }; 11 using(var connection = factory.CreateConnection()) 12 using(var channel = connection.CreateModel()) 13 { 14 channel.ExchangeDeclare(exchange: "direct_logs", 15 type: "direct"); 16 17 var severity = (args.Length > 0) ? args[0] : "info"; 18 var message = (args.Length > 1) 19 ? string.Join(" ", args.Skip( 1 ).ToArray()) 20 : "Hello World!"; 21 var body = Encoding.UTF8.GetBytes(message); 22 channel.BasicPublish(exchange: "direct_logs", 23 routingKey: severity, 24 basicProperties: null, 25 body: body); 26 Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", severity, message); 27 } 28 29 Console.WriteLine(" Press [enter] to exit."); 30 Console.ReadLine(); 31 } 32 }
以下是ReceiveLogsDirect.cs類的代碼:
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 6 class ReceiveLogsDirect 7 { 8 public static void Main(string[] args) 9 { 10 var factory = new ConnectionFactory() { HostName = "localhost" }; 11 using(var connection = factory.CreateConnection()) 12 using(var channel = connection.CreateModel()) 13 { 14 channel.ExchangeDeclare(exchange: "direct_logs", 15 type: "direct"); 16 var queueName = channel.QueueDeclare().QueueName; 17 18 if(args.Length < 1) 19 { 20 Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", 21 Environment.GetCommandLineArgs()[0]); 22 Console.WriteLine(" Press [enter] to exit."); 23 Console.ReadLine(); 24 Environment.ExitCode = 1; 25 return; 26 } 27 28 foreach(var severity in args) 29 { 30 channel.QueueBind(queue: queueName, 31 exchange: "direct_logs", 32 routingKey: severity); 33 } 34 35 Console.WriteLine(" [*] Waiting for messages."); 36 37 var consumer = new EventingBasicConsumer(channel); 38 consumer.Received += (model, ea) => 39 { 40 var body = ea.Body; 41 var message = Encoding.UTF8.GetString(body); 42 var routingKey = ea.RoutingKey; 43 Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘", 44 routingKey, message); 45 }; 46 channel.BasicConsume(queue: queueName, 47 noAck: true, 48 consumer: consumer); 49 50 Console.WriteLine(" Press [enter] to exit."); 51 Console.ReadLine(); 52 } 53 } 54 }
如果您只想將“警告”和“錯誤”(而不是“信息”)保存到文件中,只需打開控制臺並鍵入:
cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log
如果您想查看屏幕上的所有日誌消息,請打開一個新終端,然後執行以下操作:
cd ReceiveLogsDirect dotnet run info warning error # => [*] Waiting for logs. To exit press CTRL+C
而且,例如,要發出錯誤日誌消息,只需鍵入:
cd EmitLogDirect dotnet run error "Run. Run. Or it will explode." # => [x] Sent ‘error‘:‘Run. Run. Or it will explode.‘
以下是原文地址:http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
今天這篇文章終於翻譯完了,整個系列還有幾篇沒翻譯。英文水平有限,錯誤在所難免,歡迎大家提出來,共同學習。
【c#】RabbitMQ學習文檔(四)Routing(路由)