C# .net 環境下使用rabbitmq訊息佇列
訊息佇列的地位越來越重要,幾乎是面試的必問問題了,不會使用幾種訊息佇列都顯得尷尬,正好本文使用C#來帶你認識rabbitmq訊息佇列
首先,我們要安裝rabbitmq,當然,如果有現成的,也可以使用,不知道曾幾何時,我喜歡將資料庫等等軟體安裝在linux虛擬機器,如果沒現成的rabbitmq,按照下面的來吧,嘿嘿
rabbitmq安裝:https://www.cnblogs.com/shanfeng1000/p/11951703.html
如果要實現rabbitmq叢集,參考:https://www.cnblogs.com/shanfeng1000/p/12097054.html
我這裡使用的是rabbitmq叢集,但是沒有比較,只是已經安裝好了,就直接使用算了
虛擬機器叢集地址:192.168.209.133,192.168.209.134,192.168.209.135
埠使用的預設埠,都是5672,也就是AMQP協議埠
Rabbitmq的工作模式
先說說幾個概念
生產者(producer):負責生產訊息,可以有多個生產者,可以理解為生成訊息的那部分邏輯
消費者(consumer):從佇列中獲取訊息,對訊息處理的那部分邏輯
佇列(queue):用於存放訊息,可以理解為先進先出的一個物件
交換機(exchange):顧名思義,就是個中介的角色,將接收到的訊息按不同的規則轉發到其他交換機或者佇列中
路由(route):就是交換機分發訊息的規則,交換機可以指定路由規則,生產者在釋出訊息時也可以指定訊息路由,比如交換機中設定A路由表示將訊息轉發到佇列1,B路由表示將訊息轉發到佇列2,那麼當交換機接收到訊息時,如果訊息的路由滿足A路由,則將訊息轉發到佇列1,如果滿足B路由則將訊息轉發到佇列2
虛擬主機(virtual host):虛擬地址,用於進行邏輯隔離,一個虛擬主機裡面可以有若干個 exchange 和 queue,但是裡面不能有相同名稱的 exchange 或 queue
再看看rabbitmq的幾種工作模式,具體可參考rabbitmq官網給出的Demo:https://www.rabbitmq.com/getstarted.html
其中,第6中類似我們常用的請求-響應模式,但是使用的RPC請求響應,用的比較少,這裡就不過多解釋,感興趣的可以參考官網文件:https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
總的來說,就是生產者將訊息釋出到rabbitmq上,然後消費者連線rabbitmq,獲取到訊息就消費,但是有幾點說明一下
1、rabbitmq中的訊息是可被多次消費的,因為rabbitmq提供了ack機制,當消費者在消費訊息時,如果將自動ack設定成false,那麼需要手動提交ack才能告訴rabbitmq訊息已被使用,否則當通道關閉時,訊息會繼續呆在佇列中等待消費
2、當存在多個消費者時,預設情況下,一個消費者獲取一個訊息,處理完成後再獲取下一個,但是rabbitmq消費一次性獲取多個,當然後當這些訊息消費完成後,再獲取下一批,這也就是rabbitmq的Qos機制
C#使用rabbitmq
如果感興趣的人多,到時候再單獨開一篇博文,現在就介紹其中的1-5種,也可以分類成兩種:不使用交換機和使用交換機,所以下面就分這兩種來說明
首先,我們建立了兩個Demo專案:RabbitMQ.PublishConsole和RabbitMQ.ConsumeConsole,分別使用使用nuget安裝RabbitMQ.Client:
其中RabbitMQ.PublishConsole是用來生產訊息,RabbitMQ.ConsumeConsole用來消費訊息
這裡我們安裝的是最新版本,舊版本和新版本在使用上可能會有一些區別
不使用交換機情形
不使用交換機有兩種模式:簡單模式和工作模式
這裡先貼上生產者生成訊息的程式碼,簡單模式和工作模式這部分測試程式碼是一樣的:
RabbitMQ.PublishConsole上述程式碼執行完成後,佇列queue1中就有了10條訊息,可以在rabbitmq的後臺管理中看到:
程式碼中提到,通道在申明佇列時,如果佇列已經存在,則申明的引數一定要對上,否則會丟擲異常:The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'queue1' in vhost '/': received none but current is the value 'classic' of type 'longstr'', classId=50, methodId=10
比如這裡,我實現在rabbitmq後臺建立了佇列,那麼他們的對應關係如下圖:
簡單模式
這個模式很簡單,其實就是隻有一個消費者,簡單的保證操作的順序性
接著貼上消費者程式碼:
RabbitMQ.ConsumeConsole上述程式碼執行完成後,在後臺管理中可以看到訊息被消費掉了
工作模式
工作模式是簡單模式的拓展,如果業務簡單,對訊息的消費是一個耗時的過程,這個模式是一個好的選擇。
接著呼叫生產者程式碼生產10條訊息,下面是消費者的測試程式碼
RabbitMQ.ConsumeConsole另外說明一下,程式碼中提到rabbitmq的QOS機制,這裡簡單解釋一下,當生產者將訊息釋出到rabbitmq之後,如果在未配置QOS的情況下,rabbitmq儘可能快速地傳送佇列中的所有訊息到消費者端,如果訊息比較多,消費者來不及處理,就會快取這些訊息,當訊息堆積過多,可能導致伺服器記憶體不足而影響其他程序,rabbitmq的QOS可以很好的解決這類問題,QOS就是限制消費者一次性從rabbitmq中獲取訊息的個數,而不是獲取所有訊息。比如設定rabbitmq的QOS為10,也就是prefetch=10,就是說,哪怕rabbitmq中有100條訊息,消費者也只是一次性獲取10條,然後消費者消費這10條訊息,剩下的交給其他消費者,當10條訊息中的unacked個數少於prefetch * 消費者數目時,會繼續從rabbitmq獲取訊息,如果在工作模式中,不使用QOS,你會發現,所有的訊息都被一個消費者消費了
使用交換機情形
使用交換機的情形有3種:釋出訂閱模式,路由模式,主題模式
上面說了,交換機是一箇中介的角色,當一個交換機建立後,可以將其他佇列或者交換機與當前交換機繫結,繫結時需要指定繫結路由規則,這個和交換機型別有關。
當我們不使用交換機時,那麼生產者是直接將訊息釋出到佇列中去的,生產者只需要指定訊息接收的佇列即可,而使用交換機做中轉時,生產者只需要將訊息釋出到交換機,然後交換機根據接收到的訊息,按與交換機繫結的路由規則,將訊息轉發到其他交換機或者佇列中,這個處理過程和交換機的型別有關,交換機一般分為4類:
direct:直連型別,就是將訊息的路由和交換機的繫結路由作比較,當兩者一致時,則匹配成功,然後訊息就會被轉發到這個繫結路由後的佇列或者交換機
fanout:這種型別的交換機是不需要指定路由的,當交換機接收到訊息時,會將訊息廣播到所有繫結到它的所有佇列或交換機中
topic:主題型別,類似direct型別,只不過在將訊息的路由和繫結路由做比較時,是通過特定表示式去比較的,其中# 匹配一個或多個,* 匹配一個
headers:頭部交換機,允許使用訊息頭中的資訊來做匹配規則,這個用的少,基本上不用,這裡也就不過多介紹了
到這裡,你應該發覺,使用交換機的三種情形,無非就是使用交換機的型別不一樣,釋出訂閱模式--fanout,路由模式--direct,主題模式--topic
現在我們先去rabbitmq的後臺中,建立這幾種交換機:
交換機的建立及繫結都可以在程式碼中實現,如IModel類的QueueBind,ExchangeBind等方法,用多了就自然熟了,這裡為了方便截圖,就到後臺去建立了
然後我們建立兩個佇列,並按指定型別分別繫結到這3個交換機中:
佇列:
demo.direct繫結佇列規則:
demo.fanout繫結佇列規則:
demo.topic繫結佇列規則:
上面所描述的,無非就是三種模式中釋出訊息方式的不一樣,消費者當然還是從佇列獲取訊息消費的,這裡我們就先貼出消費者的程式碼:
RabbitMQ.ConsumeConsole這裡我們使用了兩個佇列,每個佇列我們這裡只用了一個消費者,對於下面幾種模式,這個消費者程式碼都能消費到
釋出訂閱模式
釋出訂閱模式使用的是fanout型別的交換機,這個型別無需指定路由,交換機會將訊息廣播到每個繫結到交換機的佇列或者交換機
RabbitMQ.PublishConsole
程式碼中,我們往交換機發布了10條訊息,交換機接收到訊息後,會將訊息轉發到queue1和queue2,因此,queue1和queue2都會收到10條訊息:
路由模式
路由模式使用的是direct型別的交換機,也即在進行路由匹配時,需要匹配的路由一直才算匹配成功,我們把釋出訂閱模式的程式碼稍作修改即可,貼出生產者部分程式碼:
RabbitMQ.PublishConsole
程式碼中,我們往demo.direct交換機發布了10條訊息,其中5條訊息的路由是apple,另外5條訊息的路由是banana,demo.direct交換機繫結的兩個佇列中,queue1的繫結路由是apple,queue2的繫結路由是banana,那麼demo.direct交換機會將路由是apple的訊息轉發到queue1,將路由是banana的訊息轉發到queue2,從後臺可以看每個佇列中已經有5個訊息準備好了:
接下來可以使用消費者將它們消費掉
主題模式
主題模式使用的topic型別的交換機,在進行匹配時,是根據表示式去匹配,# 匹配一個或多個,* 匹配一個,我們將路由模式的程式碼稍作修改:
RabbitMQ.PublishConsole
程式碼中,我們往demo.topic交換機中釋出了10條訊息,其中5條訊息的路由是以apple開頭的,另外5條訊息的路由是以banana開頭的,demo.direct交換機繫結的兩個佇列中,queue1的繫結路由是apple.#,就是匹配以apple開頭的路由,queue2的繫結路由是banana.#,就是匹配以banana開頭的路由,那麼demo.direct交換機會將路由是以apple開頭的的訊息轉發到queue1,將路由是以banana開頭的的訊息轉發到queue2,從後臺可以看每個佇列中已經有5個訊息準備好了:
封裝
其實rabbitmq的使用還是比較簡單的,只需要多謝謝程式碼嘗試一下就能熟悉
一般的,像這種第三方外掛的呼叫,我建議自己要做一層封裝,最好是根據自己的需求去封裝,然後專案中只需要呼叫自己封裝的類就行了,下面貼出我自己封裝的類:
QueueOptions RabbitMQExchangeType RabbitBase RabbitMQProducerusing RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQ.ConsoleApp { public class RabbitMQConsumer : RabbitBase { public RabbitMQConsumer(params string[] hosts) : base(hosts) { } public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts) { } public event Action<RecieveResult> Received; /// <summary> /// 構造消費者 /// </summary> /// <param name="channel"></param> /// <param name="options"></param> /// <returns></returns> private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options) { EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { try { CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); if (!options.AutoAck) { cancellationTokenSource.Token.Register(() => { channel.BasicAck(e.DeliveryTag, false); }); } Received?.Invoke(new RecieveResult(e, cancellationTokenSource)); } catch { } }; if (options.FetchCount != null) { channel.BasicQos(0, options.FetchCount.Value, false); } return consumer; } #region 普通模式、Work模式 /// <summary> /// 消費訊息 /// </summary> /// <param name="queue"></param> /// <param name="options"></param> public ListenResult Listen(string queue, ConsumeQueueOptions options = null) { options = options ?? new ConsumeQueueOptions(); var channel = GetChannel(); channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>()); var consumer = ConsumeInternal(channel, options); channel.BasicConsume(queue, options.AutoAck, consumer); ListenResult result = new ListenResult(); result.Token.Register(() => { try { channel.Close(); channel.Dispose(); } catch { } }); return result; } /// <summary> /// 消費訊息 /// </summary> /// <param name="queue"></param> /// <param name="configure"></param> public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure) { ConsumeQueueOptions options = new ConsumeQueueOptions(); configure?.Invoke(options); return Listen(queue, options); } #endregion #region 訂閱模式、路由模式、Topic模式 /// <summary> /// 消費訊息 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="options"></param> public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null) { options = options ?? new ExchangeConsumeQueueOptions(); var channel = GetChannel(); channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>()); if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange)) { foreach (var key in options.RoutingKeys) { channel.QueueBind(queue, exchange, key, options.BindArguments); } } var consumer = ConsumeInternal(channel, options); channel.BasicConsume(queue, options.AutoAck, consumer); ListenResult result = new ListenResult(); result.Token.Register(() => { try { channel.Close(); channel.Dispose(); } catch { } }); return result; } /// <summary> /// 消費訊息 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="configure"></param> public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure) { ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions(); configure?.Invoke(options); return Listen(exchange, queue, options); } #endregion } public class RecieveResult { CancellationTokenSource cancellationTokenSource; public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource) { this.Body = Encoding.UTF8.GetString(arg.Body); this.ConsumerTag = arg.ConsumerTag; this.DeliveryTag = arg.DeliveryTag; this.Exchange = arg.Exchange; this.Redelivered = arg.Redelivered; this.RoutingKey = arg.RoutingKey; this.cancellationTokenSource = cancellationTokenSource; } /// <summary> /// 訊息體 /// </summary> public string Body { get; private set; } /// <summary> /// 消費者標籤 /// </summary> public string ConsumerTag { get; private set; } /// <summary> /// Ack標籤 /// </summary> public ulong DeliveryTag { get; private set; } /// <summary> /// 交換機 /// </summary> public string Exchange { get; private set; } /// <summary> /// 是否Ack /// </summary> public bool Redelivered { get; private set; } /// <summary> /// 路由 /// </summary> public string RoutingKey { get; private set; } public void Commit() { if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return; cancellationTokenSource.Cancel(); cancellationTokenSource.Dispose(); cancellationTokenSource = null; } } public class ListenResult { CancellationTokenSource cancellationTokenSource; /// <summary> /// CancellationToken /// </summary> public CancellationToken Token { get { return cancellationTokenSource.Token; } } /// <summary> /// 是否已停止 /// </summary> public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } } public ListenResult() { cancellationTokenSource = new CancellationTokenSource(); } /// <summary> /// 停止監聽 /// </summary> public void Stop() { cancellationTokenSource.Cancel(); } } }
測試Demo
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQ.ConsoleApp { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; string queue = "queue1"; var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; //消費者 new Thread(() => { using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts)) { consumer.UserName = userName; consumer.Password = password; consumer.Port = port; consumer.VirtualHost = virtualHost; consumer.Received += result => { Console.WriteLine($"接收到資料:{result.Body}"); result.Commit();//提交 }; consumer.Listen(queue, options => { options.AutoAck = false; options.Arguments = arguments; }); } }).Start(); //訊息生產 using (RabbitMQProducer producer = new RabbitMQProducer(hosts)) { producer.UserName = userName; producer.Password = password; producer.Port = port; producer.VirtualHost = virtualHost; string message = ""; do { message = Console.ReadLine(); if (string.IsNullOrEmpty(message)) { break; } producer.Publish(queue, message, options => { options.Arguments = arguments; }); } while (true); } } } }
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace RabbitMQ.ConsoleApp { class Program { static void Main(string[] args) { string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" }; int port = 5672; string userName = "admin"; string password = "123456"; string virtualHost = "/"; string queue = "queue1"; var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } }; //消費者1 new Thread(() => { using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts)) { consumer.UserName = userName; consumer.Password = password; consumer.Port = port; consumer.VirtualHost = virtualHost; consumer.Received += result => { Console.WriteLine($"消費者1接收到資料:{result.Body}"); result.Commit();//提交 }; consumer.Listen(queue, options => { options.AutoAck = false; options.Arguments = arguments; options.FetchCount = 1; }); } }).Start(); //消費者2 new Thread(() => { using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts)) { consumer.UserName = userName; consumer.Password = password; consumer.Port = port; consumer.VirtualHost = virtualHost; consumer.Received += result => { Console.WriteLine($"消費者2接收到資料:{result.Body}"); result.Commit();//提交 }; consumer.Listen(queue, options => { options.AutoAck = false; options.Arguments = arguments; options.FetchCount = 2; }); } }).Start(); //訊息生產 using (RabbitMQProducer producer = new RabbitMQProducer(hosts)) { producer.UserName = userName; producer.Password = password; producer.Port = port; producer.VirtualHost = virtualHost; string message = ""; do { message = Console.ReadLine(); if (string.IsNullOrEmpty(message)) { break; } producer.Publish(queue, message, options => { options.Arguments = arguments; }); } while (true); } } } }釋出訂閱模式 路由模式 主題模式
上面是我自己做的封裝,因為RabbitMQ.Client功能齊全,但是使用比較麻煩,需要編寫的程式碼多一些,推薦一下第三方對rabbitmq的封裝外掛:EasyNetQ,它是建立在RabbitMQ.Client上的,多數時候可以直接通過EasyNetQ就可以完成訊息釋出與消費,感興趣的可以瞭解一下