1. 程式人生 > 程式設計 >NServiceBus+RabbitMQ開發分散式應用

NServiceBus+RabbitMQ開發分散式應用

前言

NServiceBus提供了8種傳輸管道元件,分別是Learning、MSMQ、Azure Service Bus、Azure Service Bus (Legacy)、Azure Storage Queues、SQL Server、RabbitMQ、Amazon SQS。前兩篇我們主要用的是Learnning,這篇使用RabbitMQ,也可以直接用於生產。

安裝RabbitMQ元件

RabbitMQ不在NServiceBus下,是一個單獨的元件,需要單獨安裝。

QQ截圖20191025114539.jpg

設定RabbitMQ連線串

var transport = config.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=192.168.80.129;username=admin;password=admin"
); transport.UseDirectRoutingTopology(); 複製程式碼

訊息生產者(ClientUI)

 class Program
    {
        static ILog log = LogManager.GetLogger<Program>();
        static void Main(string[] args)
        {
            MainAsync().GetAwaiter().GetResult();
        }

        static async Task MainAsync()
        
{ Console.Title = "Sample.ClientUI"; var config = new EndpointConfiguration("Sample.ClientUI"); config.UseSerialization<NewtonsoftSerializer>(); config.UsePersistence<InMemoryPersistence>(); config.EnableInstallers(); var
transport = config.UseTransport<RabbitMQTransport>(); transport.ConnectionString("host=192.168.80.129;username=admin;password=admin"); transport.UseDirectRoutingTopology(); var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false); await RunAsync(endpointInstance).ConfigureAwait(false); await endpointInstance.Stop().ConfigureAwait(false); } static async Task RunAsync(IEndpointInstance endpointInstance) { log.Info("Press 'P' to send an PlaceOrder Command"); while (true) { var key = Console.ReadKey(); Console.WriteLine(); switch (key.Key) { case ConsoleKey.P: { var command = new PlaceOrder { OrderId = Guid.NewGuid().ToString() }; log.Info($"Sending PlaceOrder with OrderId:{command.OrderId}"); await endpointInstance.Send("Sample.Server",command).ConfigureAwait(false); break; } case ConsoleKey.Q: return; default: log.Info("Please try again"); break; } } } } 複製程式碼

先啟動ClientUI,啟動後先傳送幾條資料,傳送資料觀察程式控制臺資料是否傳送,然後在RabbitMQ控制檯裡觀察Queue的情況,沒建立Queue時,這裡會自動建立Queue,佇列名稱和端點名稱相同。這裡傳送訊息資料,資料會在Sample.Server佇列裡。

QQ截圖20191025114228.jpg

QQ截圖20191025114115.jpg

QQ截圖20191025114135.jpg
     這裡能看到在佇列Sample.Server裡有三條未消費的資料,因為我還沒有啟動Sample.Server控制檯程式。

訊息消費者(Server)

public class Program
{
    static ILog log = LogManager.GetLogger<Program>();
    static void Main(string[] args)
    {
        MainAsync().GetAwaiter().GetResult();
    }


    static async Task MainAsync()
    {
        Console.Title = "Sample.Server";
        var config = new EndpointConfiguration("Sample.Server");
        config.UseSerialization<NewtonsoftSerializer>();
        config.UsePersistence<InMemoryPersistence>();
        config.EnableInstallers();

        var transport = config.UseTransport<RabbitMQTransport>();
        transport.ConnectionString("host=192.168.80.129;username=admin;password=admin");
        transport.UseDirectRoutingTopology();

        var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);

        log.Info("Press any key to quit");
        Console.ReadKey();

        await endpointInstance.Stop().ConfigureAwait(false);
    }
}
複製程式碼
public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
    static ILog log = LogManager.GetLogger<PlaceOrderHandler>();
    public Task Handle(PlaceOrder message,IMessageHandlerContext context)
    {
        log.Info($"Received PlaceOrder with OrderId:{message.OrderId}");
        return Task.CompletedTask;
    }
}
複製程式碼

啟動Sample.Server控制檯後,就會消費Queue裡的資料,消費完成後觀察程式控制臺的提示和RabbitMQ控制檯的提示。

QQ截圖20191025114221.jpg

QQ截圖20191025114245.jpg
QQ截圖20191025114259.jpg
    這裡能看到剛才堆積的三條資料已經被消費了。

總結

寫完這個demo我就在想在程式裡通過使用NServiceBus的RabbitMQ元件和直接在程式裡使用RabbitMQ直接呼叫的區別,到底有沒有必要通過NServiceBus呼叫,還需要進一步考量。