RabbitMQ 官方NET教程(四)【路由選擇】
在上一個教程中,我們構建了一個簡單的日誌記錄系統。 我們能夠廣播日誌訊息給所有你的接收者。
在本教程中,我們將為其新增一個功能 - 我們將讓日誌接收者可以僅訂閱一部分訊息。 例如,我們將能夠僅將關鍵的錯誤訊息寫入到日誌檔案(以節省磁碟空間),同時仍然能夠在控制檯上列印所有日誌訊息。
繫結(Bindings)
在以前的例子中,我們已經使用過繫結。類似下面的程式碼:
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
繫結表示轉發器與佇列之間的關係。我們也可以簡單的認為:佇列對該轉發器上的訊息感興趣。
繫結可以附帶一個額外的引數routingKey。 為了避免與BasicPublish
引數混淆,我們將其稱為binding key
。 這就是我們如何用一個鍵建立一個繫結:
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: "black");
繫結鍵的意義依賴於轉發器的型別。對於fanout型別,忽略此引數。
直接轉發(Direct exchange)
我們從上一個教程的日誌記錄系統向所有消費者廣播所有訊息。 我們希望將其擴充套件為允許基於其嚴重性進行過濾日誌訊息。 例如,我們可能希望將日誌訊息寫入磁碟的指令碼僅接收嚴重錯誤,而不會在警告或資訊日誌訊息上浪費磁碟空間。
我們正在使用一個fanout
的交換機,它不給我們很大的靈活性 - 它只能無意識地轉發。
我們會使用direct
轉發器。 direct
型別的轉發器背後的路由演算法很簡單 - 訊息傳遞到binding key
與訊息的routing key
完全匹配的佇列。
為了說明,請考慮以下設定:
在這個設定中,我們可以看到direct
型別的轉發器X
與兩個佇列繫結。 第一個佇列與繫結鍵orange
繫結,第二個佇列與轉發器間有兩個繫結,一個與繫結鍵black
繫結,另一個與green
繫結鍵繫結。
在這樣的設定中,釋出附帶一個選擇鍵(routing key) orange
的訊息至交換機,將被導向到佇列Q1。 訊息附帶一個選擇鍵 (routing key)black
green
將會被導向到Q2。 所有其他訊息將被丟棄。
多重繫結(multiple bindings)
使用相同的繫結鍵繫結多個佇列是完全合法的。 在我們的示例中,我們可以在X
和Q1
之間新增繫結鍵black
。 在這種情況下,direct
交換將表現得像fanout
,並將訊息廣播到所有匹配的佇列。 附帶選擇鍵black
的訊息將傳送到Q1和Q2。
傳送日誌(Emittinglogs)
我們將此模型用於日誌記錄系統。我們將訊息傳送到direct
型別的轉發器而不是fanout
型別。這樣的話, 接收程式可以根據嚴重性來選擇接收。 我們首先關注傳送日誌的程式碼:
一如以往,我們需要先建立一個轉發器:
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
然後我們準備傳送一條訊息:
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
為了簡化程式碼,我們假定severity
是info
,warning
,error
中的一個。
訂閱
接收訊息將像上一個教程類似,只有一點不同 - 我們將為每個我們感興趣的嚴重性型別的日誌建立一個新的繫結。
var queueName = channel.QueueDeclare().QueueName;
foreach(var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
完整的例項
EmitLogDirect.cs 類的程式碼:
using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;
class EmitLogDirect
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "direct_logs",
type: "direct");
var severity = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1)
? string.Join(" ", args.Skip( 1 ).ToArray())
: "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
ReceiveLogsDirect.cs的程式碼:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class ReceiveLogsDirect
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "direct_logs",
type: "direct");
var queueName = channel.QueueDeclare().QueueName;
if(args.Length < 1)
{
Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
foreach(var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'",
routingKey, message);
};
channel.BasicConsume(queue: queueName,
noAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
如果您只想將warning
和error
(而不是info
)儲存到檔案中,只需開啟控制檯並鍵入:
cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log
如果您想檢視螢幕上的所有日誌訊息,請開啟一個新終端,然後執行以下操作:
cd ReceiveLogsDirect
dotnet run info warning error
# => [*] Waiting for logs. To exit press CTRL+C
而且,例如,要發出error
日誌訊息,只需鍵入:
cd EmitLogDirect
dotnet run error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'