.NET 5 使用RabbitMQ
生產者
在Startup的ConfigureServices方法中新增相關rabbitmq的服務:
/// <summary>
/// Producer-side registration: adds the RabbitMQ-backed logger, five named
/// producers ("SimplePattern", "WorkerPattern", "FanoutPattern", "DirectPattern",
/// "TopicPattern") and the MVC controllers.
/// </summary>
/// <param name="services">The service collection the registrations are added to.</param>
public void ConfigureServices(IServiceCollection services)
{
    // Connection settings shared by every registration below.
    string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
    int port = 5672;
    string userName = "admin";
    // Same credentials as the consumer project, which connects to the same hosts.
    string password = "123456";
    string virtualHost = "/";
    var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

    #region Logging
    services.AddRabbitLogger(options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;
        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;
        options.Category = "Home";
        options.MinLevel = LogLevel.Warning;

        // Queue mode
        options.Queue = "queue.logger";

        // Exchange mode (alternative to queue mode)
        //options.Exchange = "exchange.logger";
        //options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.logger", Route = "#" } };
        //options.Type = RabbitExchangeType.Topic;
    });
    #endregion

    #region Simple mode
    services.AddRabbitProducer("SimplePattern", options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;
        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        options.InitializeCount = 3;
        options.Queues = new string[] { "queue.simple" };
    });
    #endregion

    #region Worker mode
    services.AddRabbitProducer("WorkerPattern", options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;
        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        options.InitializeCount = 3;
        options.Queues = new string[] { "queue.worker" };
    });
    #endregion

    #region Publish/subscribe mode
    services.AddRabbitProducer("FanoutPattern", options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;
        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        options.InitializeCount = 3;
        // No Route is set on the fanout queues.
        options.RouteQueues = new RouteQueue[]
        {
            new RouteQueue() { Queue = "queue.fanout1" },
            new RouteQueue() { Queue = "queue.fanout2" }
        };
        options.Type = RabbitExchangeType.Fanout;
        options.Exchange = "exchange.fanout";
    });
    #endregion

    #region Routing mode
    services.AddRabbitProducer("DirectPattern", options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;
        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        options.InitializeCount = 5;
        options.Exchange = "exchange.direct";
        options.Type = RabbitExchangeType.Direct;
        options.RouteQueues = new RouteQueue[]
        {
            new RouteQueue() { Queue = "queue.direct1", Route = "direct1" },
            new RouteQueue() { Queue = "queue.direct2", Route = "direct2" }
        };
    });
    #endregion

    #region Topic mode
    services.AddRabbitProducer("TopicPattern", options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;
        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        options.InitializeCount = 5;
        options.RouteQueues = new RouteQueue[]
        {
            new RouteQueue() { Queue = "queue.topic1", Route = "topic1.#" },
            new RouteQueue() { Queue = "queue.topic2", Route = "topic2.#" }
        };
        options.Type = RabbitExchangeType.Topic;
        options.Exchange = "exchange.topic";
    });
    #endregion

    services.AddControllers();
}
裡面介紹了6種整合方式:
使用AddRabbitLogger方法新增日誌相關的服務,需要注意的是,資料是以json格式傳送到rabbitmq中去的,具體可以參考RabbitLoggerMessage<T>類,最好是自己釋出一條訊息測試一下,當然讀者可以按照自己的需求修改RabbitLogger類中的釋出邏輯。
使用AddRabbitProducer方法新增一個釋出者,可以指定名稱,這個名稱是獲取釋出者物件時使用。這個方法新增的釋出者可以滿足rabbitmq的五種使用方式(普通模式,工作模式,釋出訂閱模式,路由模式,主題模式),具體由RabbitProducerOptions配置指定。
服務配置好,具體使用可以參考HomeController,日誌記錄可以注入ILogger<T>物件,或者注入ILoggerFactory物件,然後獲取ILogger<T>物件,直接使用ILogger<T>物件的方法就是釋出訊息了:
/// <summary>
/// Logging demo: writes one entry at each of the Debug, Information, Warning
/// and Error levels through the injected logger.
/// </summary>
/// <returns>The literal string "success".</returns>
[HttpGet]
public string Get()
{
    const string result = "success";

    logger.LogDebug("Debug");
    logger.LogInformation("Information");
    logger.LogWarning("Warning");
    logger.LogError("Error");

    return result;
}
至於另外五種模式,我們需要注入IRabbitProducerFactory物件,然後使用Create方法建立指定名稱的釋出者,然後呼叫Publish或者PublishAsync方法釋出訊息,而且他們都有幾個過載。
需要注意的是,不同型別的生產者應該使用不同的Publish或者PublishAsync方法,比如普通模式和工作模式,因為他們沒有路由引數,因此需要使用無路由引數的Publish方法,如:
/// <summary>
/// Publishes a single message through the "SimplePattern" producer.
/// </summary>
/// <param name="message">Message body to publish; defaults to "Simple".</param>
/// <returns>The literal string "success".</returns>
[HttpGet("Simple")]
public string Simple(string message = "Simple")
{
    rabbitProducerFactory.Create("SimplePattern").Publish(message);
    return "success";
}

/// <summary>
/// Publishes the same message ten times through the "WorkerPattern" producer.
/// </summary>
/// <param name="message">Message body to publish; defaults to "Worker".</param>
/// <returns>The literal string "success".</returns>
[HttpGet("Worker")]
public string Worker(string message = "Worker")
{
    const int publishCount = 10;
    var workerProducer = rabbitProducerFactory.Create("WorkerPattern");

    for (int sent = 0; sent < publishCount; sent++)
    {
        workerProducer.Publish(message);
    }

    return "success";
}
而釋出訂閱模式、路由模式、主題模式都是有路由的(釋出訂閱模式的路由可以認為是空值),因此需要使用帶路由引數的Publish方法:
/// <summary>
/// Publishes a message through the "DirectPattern" producer using the given route.
/// </summary>
/// <param name="route">Routing key passed to Publish; defaults to "direct1".</param>
/// <param name="message">Message body to publish; defaults to "Direct".</param>
/// <returns>The literal string "success".</returns>
[HttpGet("Direct")]
public string Direct(string route = "direct1", string message = "Direct")
{
    var producer = rabbitProducerFactory.Create("DirectPattern");
    producer.Publish(route, message);
    return "success";
}

/// <summary>
/// Publishes a message through the "FanoutPattern" producer.
/// </summary>
/// <param name="message">Message body to publish; defaults to "Fanout".</param>
/// <returns>The literal string "success".</returns>
[HttpGet("Fanout")]
public string Fanout(string message = "Fanout")
{
    var producer = rabbitProducerFactory.Create("FanoutPattern");
    // Fanout mode uses an empty route.
    producer.Publish("", message);
    return "success";
}

/// <summary>
/// Publishes a message through the "TopicPattern" producer using the given route.
/// </summary>
/// <param name="route">Routing key passed to Publish; defaults to "topic1.a".</param>
/// <param name="message">Message body to publish; defaults to "Topic".</param>
/// <returns>The literal string "success".</returns>
[HttpGet("Topic")]
public string Topic(string route = "topic1.a", string message = "Topic")
{
    var producer = rabbitProducerFactory.Create("TopicPattern");
    producer.Publish(route, message);
    return "success";
}
消費者
生產者和消費者不在同一個專案中,同樣的,需要先在Startup的ConfigureServices方法中新增服務:
/// <summary>
/// Consumer-side registration: adds one RabbitMQ consumer per scenario (logger
/// queue, simple, worker, fanout, direct, topic), attaches the message listeners,
/// and then adds the MVC controllers.
/// </summary>
/// <param name="services">The service collection the registrations are added to.</param>
public void ConfigureServices(IServiceCollection services)
{
    // Connection settings shared by every consumer below.
    string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
    int port = 5672;
    string userName = "admin";
    string password = "123456";
    string virtualHost = "/";
    var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

    #region Logging
    services.AddRabbitConsumer(options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;
        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        //options.AutoAck = true;
        //options.FetchCount = 10;
        //options.RouteQueues = new RouteQueue[] { new RouteQueue() { Queue = "queue.logger", Route = "#" } }; // exchange mode
        //options.Type = RabbitExchangeType.Topic; // exchange mode
    })
    // Alternative: handle the queue with a delegate instead of a listener class.
    //.AddListener("queue.logger", result =>
    //{
    //    Console.WriteLine("Message From queue.logger:" + result.Body);
    //});
    .AddListener<RabbitConsumerListener>("queue.logger");
    // Alternative: exchange mode.
    //.AddListener("exchange.logger", "queue.logger", result =>
    //{
    //    Console.WriteLine("Message From queue.logger:" + result.Body);
    //});
    #endregion

    #region Simple mode
    services.AddRabbitConsumer(options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;
        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        //options.FetchCount = 1;
        options.AutoAck = false;
    }).AddListener("queue.simple", result =>
    {
        Console.WriteLine("Message From queue.simple:" + result.Body);
        // AutoAck is off, so the handler calls Commit() itself
        // (presumably the manual acknowledgement; confirm against the library).
        result.Commit();
    });
    #endregion

    #region Worker mode
    // Two listeners are attached to the same queue.
    services.AddRabbitConsumer(options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;
        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        //options.FetchCount = 2;
        options.AutoAck = false;
    }).AddListener("queue.worker", result =>
    {
        Console.WriteLine("Message From queue.worker1:" + result.Body);
        result.Commit();
    }).AddListener("queue.worker", result =>
    {
        Console.WriteLine("Message From queue.worker2:" + result.Body);
        result.Commit();
    });
    #endregion

    #region Publish/subscribe mode
    services.AddRabbitConsumer(options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;
        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        options.FetchCount = 2;
        options.AutoAck = false;
        options.RouteQueues = new RouteQueue[]
        {
            new RouteQueue() { Queue = "queue.fanout1" },
            new RouteQueue() { Queue = "queue.fanout2" }
        };
        options.Type = RabbitExchangeType.Fanout;
    }).AddListener("exchange.fanout", "queue.fanout1", result =>
    {
        Console.WriteLine("Message From queue.fanout1:" + result.Body);
        result.Commit();
    }).AddListener("exchange.fanout", "queue.fanout2", result =>
    {
        Console.WriteLine("Message From queue.fanout2:" + result.Body);
        result.Commit();
    });
    #endregion

    #region Routing mode
    services.AddRabbitConsumer(options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;
        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        options.AutoAck = false;
        options.FetchCount = 2;
        options.Type = RabbitExchangeType.Direct;
        options.RouteQueues = new RouteQueue[]
        {
            new RouteQueue() { Queue = "queue.direct1", Route = "direct1" },
            new RouteQueue() { Queue = "queue.direct2", Route = "direct2" }
        };
    }).AddListener("exchange.direct", "queue.direct1", result =>
    {
        Console.WriteLine("Message From queue.direct1:" + result.Body);
        result.Commit();
    }).AddListener("exchange.direct", "queue.direct2", result =>
    {
        Console.WriteLine("Message From queue.direct2:" + result.Body);
        result.Commit();
    });
    #endregion

    #region Topic mode
    services.AddRabbitConsumer(options =>
    {
        options.Hosts = hosts;
        options.Password = password;
        options.Port = port;
        options.UserName = userName;
        options.VirtualHost = virtualHost;
        options.Arguments = arguments;
        options.Durable = true;
        options.AutoDelete = true;

        options.FetchCount = 2;
        options.AutoAck = false;
        options.RouteQueues = new RouteQueue[]
        {
            new RouteQueue() { Queue = "queue.topic1", Route = "topic1.#" },
            new RouteQueue() { Queue = "queue.topic2", Route = "topic2.#" }
        };
        options.Type = RabbitExchangeType.Topic;
    }).AddListener("exchange.topic", "queue.topic1", result =>
    {
        Console.WriteLine("Message From queue.topic1:" + result.Body);
        result.Commit();
    }).AddListener("exchange.topic", "queue.topic2", result =>
    {
        Console.WriteLine("Message From queue.topic2:" + result.Body);
        result.Commit();
    });
    #endregion

    services.AddControllers();
}
無論是日誌的消費,還是其他五種模式的消費,都是先使用AddRabbitConsumer方法獲取到一個IRabbitConsumerBuilder消費者構造物件,然後通過它的AddListener方法新增訊息處理程式。
同樣的,需要注意的是,普通模式和工作模式是不基於交換機的策略模式,因此需要使用不包含交換機引數的AddListener方法:
// Excerpt: queue-only modes. The listeners are registered with the
// AddListener overload that takes only a queue name (no exchange argument).

#region Simple mode
services.AddRabbitConsumer(options =>
{
    options.Hosts = hosts;
    options.Password = password;
    options.Port = port;
    options.UserName = userName;
    options.VirtualHost = virtualHost;
    options.Arguments = arguments;
    options.Durable = true;
    options.AutoDelete = true;

    //options.FetchCount = 1;
    options.AutoAck = false;
}).AddListener("queue.simple", result =>
{
    Console.WriteLine("Message From queue.simple:" + result.Body);
    // AutoAck is off, so the handler calls Commit() itself
    // (presumably the manual acknowledgement; confirm against the library).
    result.Commit();
});
#endregion

#region Worker mode
// Two listeners are attached to the same queue.
services.AddRabbitConsumer(options =>
{
    options.Hosts = hosts;
    options.Password = password;
    options.Port = port;
    options.UserName = userName;
    options.VirtualHost = virtualHost;
    options.Arguments = arguments;
    options.Durable = true;
    options.AutoDelete = true;

    //options.FetchCount = 2;
    options.AutoAck = false;
}).AddListener("queue.worker", result =>
{
    Console.WriteLine("Message From queue.worker1:" + result.Body);
    result.Commit();
}).AddListener("queue.worker", result =>
{
    Console.WriteLine("Message From queue.worker2:" + result.Body);
    result.Commit();
});
#endregion
釋出訂閱模式、路由模式和主題模式都是基於交換機的策略模式,因此使用需要交換機引數的AddListener方法:
// Excerpt: exchange-based modes. The listeners are registered with the
// AddListener overload that takes an exchange name followed by a queue name.

#region Publish/subscribe mode
services.AddRabbitConsumer(options =>
{
    options.Hosts = hosts;
    options.Password = password;
    options.Port = port;
    options.UserName = userName;
    options.VirtualHost = virtualHost;
    options.Arguments = arguments;
    options.Durable = true;
    options.AutoDelete = true;

    options.FetchCount = 2;
    options.AutoAck = false;
    // No Route is set on the fanout queues.
    options.RouteQueues = new RouteQueue[]
    {
        new RouteQueue() { Queue = "queue.fanout1" },
        new RouteQueue() { Queue = "queue.fanout2" }
    };
    options.Type = RabbitExchangeType.Fanout;
}).AddListener("exchange.fanout", "queue.fanout1", result =>
{
    Console.WriteLine("Message From queue.fanout1:" + result.Body);
    result.Commit();
}).AddListener("exchange.fanout", "queue.fanout2", result =>
{
    Console.WriteLine("Message From queue.fanout2:" + result.Body);
    result.Commit();
});
#endregion

#region Routing mode
services.AddRabbitConsumer(options =>
{
    options.Hosts = hosts;
    options.Password = password;
    options.Port = port;
    options.UserName = userName;
    options.VirtualHost = virtualHost;
    options.Arguments = arguments;
    options.Durable = true;
    options.AutoDelete = true;

    options.AutoAck = false;
    options.FetchCount = 2;
    options.Type = RabbitExchangeType.Direct;
    options.RouteQueues = new RouteQueue[]
    {
        new RouteQueue() { Queue = "queue.direct1", Route = "direct1" },
        new RouteQueue() { Queue = "queue.direct2", Route = "direct2" }
    };
}).AddListener("exchange.direct", "queue.direct1", result =>
{
    Console.WriteLine("Message From queue.direct1:" + result.Body);
    result.Commit();
}).AddListener("exchange.direct", "queue.direct2", result =>
{
    Console.WriteLine("Message From queue.direct2:" + result.Body);
    result.Commit();
});
#endregion

#region Topic mode
services.AddRabbitConsumer(options =>
{
    options.Hosts = hosts;
    options.Password = password;
    options.Port = port;
    options.UserName = userName;
    options.VirtualHost = virtualHost;
    options.Arguments = arguments;
    options.Durable = true;
    options.AutoDelete = true;

    options.FetchCount = 2;
    options.AutoAck = false;
    options.RouteQueues = new RouteQueue[]
    {
        new RouteQueue() { Queue = "queue.topic1", Route = "topic1.#" },
        new RouteQueue() { Queue = "queue.topic2", Route = "topic2.#" }
    };
    options.Type = RabbitExchangeType.Topic;
}).AddListener("exchange.topic", "queue.topic1", result =>
{
    Console.WriteLine("Message From queue.topic1:" + result.Body);
    result.Commit();
}).AddListener("exchange.topic", "queue.topic2", result =>
{
    Console.WriteLine("Message From queue.topic2:" + result.Body);
    result.Commit();
});
#endregion
另外,AddListener中的訊息處理委託可以使用一個實現了IRabbitConsumerListener介面的類代替,如Demo中的RabbitConsumerListener:
/// <summary>
/// Class-based message handler that can be passed to AddListener in place of
/// a delegate.
/// </summary>
public class RabbitConsumerListener : IRabbitConsumerListener
{
    /// <summary>
    /// Prints the received body to the console and then commits the message.
    /// </summary>
    /// <param name="recieveResult">The received message; its Body is printed and Commit() is called on it.</param>
    /// <returns>An already-completed task.</returns>
    public Task ConsumeAsync(RecieveResult recieveResult)
    {
        string line = "RabbitConsumerListener:" + recieveResult.Body;
        Console.WriteLine(line);

        recieveResult.Commit();

        return Task.CompletedTask;
    }
}