NServiceBus+RabbitMQ開發分散式應用
阿新 • • 發佈:2019-12-31
前言
NServiceBus提供了8種傳輸管道元件,分別是Learning、MSMQ、Azure Service Bus、Azure Service Bus (Legacy)、Azure Storage Queues、SQL Server、RabbitMQ、Amazon SQS。前兩篇我們主要用的是Learnning,這篇使用RabbitMQ,也可以直接用於生產。
安裝RabbitMQ元件
RabbitMQ不在NServiceBus下,是一個單獨的元件,需要單獨安裝。
設定RabbitMQ連線串
var transport = config.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=192.168.80.129;username=admin;password=admin" );
transport.UseDirectRoutingTopology();
複製程式碼
訊息生產者(ClientUI)
class Program
{
static ILog log = LogManager.GetLogger<Program>();
static void Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
static async Task MainAsync()
{
Console.Title = "Sample.ClientUI";
var config = new EndpointConfiguration("Sample.ClientUI");
config.UseSerialization<NewtonsoftSerializer>();
config.UsePersistence<InMemoryPersistence>();
config.EnableInstallers();
var transport = config.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=192.168.80.129;username=admin;password=admin");
transport.UseDirectRoutingTopology();
var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);
await RunAsync(endpointInstance).ConfigureAwait(false);
await endpointInstance.Stop().ConfigureAwait(false);
}
static async Task RunAsync(IEndpointInstance endpointInstance)
{
log.Info("Press 'P' to send an PlaceOrder Command");
while (true)
{
var key = Console.ReadKey();
Console.WriteLine();
switch (key.Key)
{
case ConsoleKey.P:
{
var command = new PlaceOrder
{
OrderId = Guid.NewGuid().ToString()
};
log.Info($"Sending PlaceOrder with OrderId:{command.OrderId}");
await endpointInstance.Send("Sample.Server",command).ConfigureAwait(false);
break;
}
case ConsoleKey.Q:
return;
default:
log.Info("Please try again");
break;
}
}
}
}
複製程式碼
先啟動ClientUI,啟動後先傳送幾條資料,傳送資料觀察程式控制臺資料是否傳送,然後在RabbitMQ控制檯裡觀察Queue的情況,沒建立Queue時,這裡會自動建立Queue,佇列名稱和端點名稱相同。這裡傳送訊息資料,資料會在Sample.Server佇列裡。
這裡能看到在佇列Sample.Server裡有三條未消費的資料,因為我還沒有啟動Sample.Server控制檯程式。訊息消費者(Server)
public class Program
{
static ILog log = LogManager.GetLogger<Program>();
static void Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
static async Task MainAsync()
{
Console.Title = "Sample.Server";
var config = new EndpointConfiguration("Sample.Server");
config.UseSerialization<NewtonsoftSerializer>();
config.UsePersistence<InMemoryPersistence>();
config.EnableInstallers();
var transport = config.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=192.168.80.129;username=admin;password=admin");
transport.UseDirectRoutingTopology();
var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);
log.Info("Press any key to quit");
Console.ReadKey();
await endpointInstance.Stop().ConfigureAwait(false);
}
}
複製程式碼
public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
static ILog log = LogManager.GetLogger<PlaceOrderHandler>();
public Task Handle(PlaceOrder message,IMessageHandlerContext context)
{
log.Info($"Received PlaceOrder with OrderId:{message.OrderId}");
return Task.CompletedTask;
}
}
複製程式碼
啟動Sample.Server控制檯後,就會消費Queue裡的資料,消費完成後觀察程式控制臺的提示和RabbitMQ控制檯的提示。
這裡能看到剛才堆積的三條資料已經被消費了。總結
寫完這個demo我就在想在程式裡通過使用NServiceBus的RabbitMQ元件和直接在程式裡使用RabbitMQ直接呼叫的區別,到底有沒有必要通過NServiceBus呼叫,還需要進一步考量。