1. 程式人生 > 其它 >C# .net 環境下使用rabbitmq訊息佇列

C# .net 環境下使用rabbitmq訊息佇列

訊息佇列的地位越來越重要,幾乎是面試的必問問題了,不會使用幾種訊息佇列都顯得尷尬,正好本文使用C#來帶你認識rabbitmq訊息佇列

  首先,我們要安裝rabbitmq,當然,如果有現成的,也可以使用,不知道曾幾何時,我喜歡將資料庫等等軟體安裝在linux虛擬機器,如果沒現成的rabbitmq,按照下面的來吧,嘿嘿

  rabbitmq安裝:https://www.cnblogs.com/shanfeng1000/p/11951703.html

  如果要實現rabbitmq叢集,參考:https://www.cnblogs.com/shanfeng1000/p/12097054.html

  我這裡使用的是rabbitmq叢集,但是沒有比較,只是已經安裝好了,就直接使用算了

  虛擬機器叢集地址:192.168.209.133,192.168.209.134,192.168.209.135

  埠使用的預設埠,都是5672,也就是AMQP協議埠

  Rabbitmq的工作模式

  先說說幾個概念

  生產者(producer):負責生產訊息,可以有多個生產者,可以理解為生成訊息的那部分邏輯

  消費者(consumer):從佇列中獲取訊息,對訊息處理的那部分邏輯

  佇列(queue):用於存放訊息,可以理解為先進先出的一個物件

  交換機(exchange):顧名思義,就是個中介的角色,將接收到的訊息按不同的規則轉發到其他交換機或者佇列中

  路由(route):就是交換機分發訊息的規則,交換機可以指定路由規則,生產者在釋出訊息時也可以指定訊息路由,比如交換機中設定A路由表示將訊息轉發到佇列1,B路由表示將訊息轉發到佇列2,那麼當交換機接收到訊息時,如果訊息的路由滿足A路由,則將訊息轉發到佇列1,如果滿足B路由則將訊息轉發到佇列2

  虛擬主機(virtual host):虛擬地址,用於進行邏輯隔離,一個虛擬主機裡面可以有若干個 exchange 和 queue,但是裡面不能有相同名稱的 exchange 或 queue

  再看看rabbitmq的幾種工作模式,具體可參考rabbitmq官網給出的Demo:https://www.rabbitmq.com/getstarted.html

    

  其中,第6中類似我們常用的請求-響應模式,但是使用的RPC請求響應,用的比較少,這裡就不過多解釋,感興趣的可以參考官網文件:https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

  總的來說,就是生產者將訊息釋出到rabbitmq上,然後消費者連線rabbitmq,獲取到訊息就消費,但是有幾點說明一下

  1、rabbitmq中的訊息是可被多次消費的,因為rabbitmq提供了ack機制,當消費者在消費訊息時,如果將自動ack設定成false,那麼需要手動提交ack才能告訴rabbitmq訊息已被使用,否則當通道關閉時,訊息會繼續呆在佇列中等待消費

  2、當存在多個消費者時,預設情況下,一個消費者獲取一個訊息,處理完成後再獲取下一個,但是rabbitmq消費一次性獲取多個,當然後當這些訊息消費完成後,再獲取下一批,這也就是rabbitmq的Qos機制

  

  C#使用rabbitmq

  如果感興趣的人多,到時候再單獨開一篇博文,現在就介紹其中的1-5種,也可以分類成兩種:不使用交換機和使用交換機,所以下面就分這兩種來說明

  首先,我們建立了兩個Demo專案:RabbitMQ.PublishConsole和RabbitMQ.ConsumeConsole,分別使用使用nuget安裝RabbitMQ.Client:

  

  其中RabbitMQ.PublishConsole是用來生產訊息,RabbitMQ.ConsumeConsole用來消費訊息  

  這裡我們安裝的是最新版本,舊版本和新版本在使用上可能會有一些區別


 不使用交換機情形

  不使用交換機有兩種模式:簡單模式和工作模式

  這裡先貼上生產者生成訊息的程式碼,簡單模式和工作模式這部分測試程式碼是一樣的:  

  RabbitMQ.PublishConsole

  上述程式碼執行完成後,佇列queue1中就有了10條訊息,可以在rabbitmq的後臺管理中看到:  

  

  程式碼中提到,通道在申明佇列時,如果佇列已經存在,則申明的引數一定要對上,否則會丟擲異常:The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'queue1' in vhost '/': received none but current is the value 'classic' of type 'longstr'', classId=50, methodId=10

  比如這裡,我實現在rabbitmq後臺建立了佇列,那麼他們的對應關係如下圖: 

   

  

  簡單模式

  這個模式很簡單,其實就是隻有一個消費者,簡單的保證操作的順序性

  

  接著貼上消費者程式碼:

  RabbitMQ.ConsumeConsole

  上述程式碼執行完成後,在後臺管理中可以看到訊息被消費掉了

  

  工作模式

  工作模式是簡單模式的拓展,如果業務簡單,對訊息的消費是一個耗時的過程,這個模式是一個好的選擇。

  

  接著呼叫生產者程式碼生產10條訊息,下面是消費者的測試程式碼  

  RabbitMQ.ConsumeConsole

  另外說明一下,程式碼中提到rabbitmq的QOS機制,這裡簡單解釋一下,當生產者將訊息釋出到rabbitmq之後,如果在未配置QOS的情況下,rabbitmq儘可能快速地傳送佇列中的所有訊息到消費者端,如果訊息比較多,消費者來不及處理,就會快取這些訊息,當訊息堆積過多,可能導致伺服器記憶體不足而影響其他程序,rabbitmq的QOS可以很好的解決這類問題,QOS就是限制消費者一次性從rabbitmq中獲取訊息的個數,而不是獲取所有訊息。比如設定rabbitmq的QOS為10,也就是prefetch=10,就是說,哪怕rabbitmq中有100條訊息,消費者也只是一次性獲取10條,然後消費者消費這10條訊息,剩下的交給其他消費者,當10條訊息中的unacked個數少於prefetch * 消費者數目時,會繼續從rabbitmq獲取訊息,如果在工作模式中,不使用QOS,你會發現,所有的訊息都被一個消費者消費了

  


  使用交換機情形

  使用交換機的情形有3種:釋出訂閱模式,路由模式,主題模式

  上面說了,交換機是一箇中介的角色,當一個交換機建立後,可以將其他佇列或者交換機與當前交換機繫結,繫結時需要指定繫結路由規則,這個和交換機型別有關。

  當我們不使用交換機時,那麼生產者是直接將訊息釋出到佇列中去的,生產者只需要指定訊息接收的佇列即可,而使用交換機做中轉時,生產者只需要將訊息釋出到交換機,然後交換機根據接收到的訊息,按與交換機繫結的路由規則,將訊息轉發到其他交換機或者佇列中,這個處理過程和交換機的型別有關,交換機一般分為4類:

  direct:直連型別,就是將訊息的路由和交換機的繫結路由作比較,當兩者一致時,則匹配成功,然後訊息就會被轉發到這個繫結路由後的佇列或者交換機

  fanout:這種型別的交換機是不需要指定路由的,當交換機接收到訊息時,會將訊息廣播到所有繫結到它的所有佇列或交換機中

  topic:主題型別,類似direct型別,只不過在將訊息的路由和繫結路由做比較時,是通過特定表示式去比較的,其中# 匹配一個或多個,* 匹配一個

  headers:頭部交換機,允許使用訊息頭中的資訊來做匹配規則,這個用的少,基本上不用,這裡也就不過多介紹了

  到這裡,你應該發覺,使用交換機的三種情形,無非就是使用交換機的型別不一樣,釋出訂閱模式--fanout,路由模式--direct,主題模式--topic

  現在我們先去rabbitmq的後臺中,建立這幾種交換機:

  交換機的建立及繫結都可以在程式碼中實現,如IModel類的QueueBind,ExchangeBind等方法,用多了就自然熟了,這裡為了方便截圖,就到後臺去建立了

  

  然後我們建立兩個佇列,並按指定型別分別繫結到這3個交換機中:

   佇列:

  

  demo.direct繫結佇列規則:

   

  demo.fanout繫結佇列規則:

  

  demo.topic繫結佇列規則:

  

  上面所描述的,無非就是三種模式中釋出訊息方式的不一樣,消費者當然還是從佇列獲取訊息消費的,這裡我們就先貼出消費者的程式碼:

  RabbitMQ.ConsumeConsole

  這裡我們使用了兩個佇列,每個佇列我們這裡只用了一個消費者,對於下面幾種模式,這個消費者程式碼都能消費到

  釋出訂閱模式

  釋出訂閱模式使用的是fanout型別的交換機,這個型別無需指定路由,交換機會將訊息廣播到每個繫結到交換機的佇列或者交換機  

  

  RabbitMQ.PublishConsole

  程式碼中,我們往交換機發布了10條訊息,交換機接收到訊息後,會將訊息轉發到queue1和queue2,因此,queue1和queue2都會收到10條訊息:

  

  路由模式

  路由模式使用的是direct型別的交換機,也即在進行路由匹配時,需要匹配的路由一直才算匹配成功,我們把釋出訂閱模式的程式碼稍作修改即可,貼出生產者部分程式碼:  

   

  RabbitMQ.PublishConsole

  程式碼中,我們往demo.direct交換機發布了10條訊息,其中5條訊息的路由是apple,另外5條訊息的路由是banana,demo.direct交換機繫結的兩個佇列中,queue1的繫結路由是apple,queue2的繫結路由是banana,那麼demo.direct交換機會將路由是apple的訊息轉發到queue1,將路由是banana的訊息轉發到queue2,從後臺可以看每個佇列中已經有5個訊息準備好了:

  

  接下來可以使用消費者將它們消費掉

  主題模式

  主題模式使用的topic型別的交換機,在進行匹配時,是根據表示式去匹配,# 匹配一個或多個,* 匹配一個,我們將路由模式的程式碼稍作修改:    

  

  RabbitMQ.PublishConsole

  程式碼中,我們往demo.topic交換機中釋出了10條訊息,其中5條訊息的路由是以apple開頭的,另外5條訊息的路由是以banana開頭的,demo.direct交換機繫結的兩個佇列中,queue1的繫結路由是apple.#,就是匹配以apple開頭的路由,queue2的繫結路由是banana.#,就是匹配以banana開頭的路由,那麼demo.direct交換機會將路由是以apple開頭的的訊息轉發到queue1,將路由是以banana開頭的的訊息轉發到queue2,從後臺可以看每個佇列中已經有5個訊息準備好了:

  

  


  封裝

  其實rabbitmq的使用還是比較簡單的,只需要多謝謝程式碼嘗試一下就能熟悉

  一般的,像這種第三方外掛的呼叫,我建議自己要做一層封裝,最好是根據自己的需求去封裝,然後專案中只需要呼叫自己封裝的類就行了,下面貼出我自己封裝的類:  

  QueueOptions   RabbitMQExchangeType   RabbitBase   RabbitMQProducer   
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQ.ConsoleApp
{
    public class RabbitMQConsumer : RabbitBase
    {
        public RabbitMQConsumer(params string[] hosts) : base(hosts)
        {

        }
        public RabbitMQConsumer(params (string, int)[] hostAndPorts) : base(hostAndPorts)
        {

        }

        public event Action<RecieveResult> Received;

        /// <summary>
        /// 構造消費者
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="options"></param>
        /// <returns></returns>
        private IBasicConsumer ConsumeInternal(IModel channel, ConsumeQueueOptions options)
        {
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                try
                {
                    CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                    if (!options.AutoAck)
                    {
                        cancellationTokenSource.Token.Register(() =>
                        {
                            channel.BasicAck(e.DeliveryTag, false);
                        }); 
                    }
                    Received?.Invoke(new RecieveResult(e, cancellationTokenSource));
                }
                catch { }
            };
            if (options.FetchCount != null)
            {
                channel.BasicQos(0, options.FetchCount.Value, false);
            }
            return consumer;
        }

        #region 普通模式、Work模式
        /// <summary>
        /// 消費訊息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="options"></param>
        public ListenResult Listen(string queue, ConsumeQueueOptions options = null)
        {
            options = options ?? new ConsumeQueueOptions();
            var channel = GetChannel();
            channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            var consumer = ConsumeInternal(channel, options);
            channel.BasicConsume(queue, options.AutoAck, consumer);
            ListenResult result = new ListenResult();
            result.Token.Register(() =>
            {
                try
                {
                    channel.Close();
                    channel.Dispose();
                }
                catch { }
            });
            return result;
        }
        /// <summary>
        /// 消費訊息
        /// </summary>
        /// <param name="queue"></param>
        /// <param name="configure"></param>
        public ListenResult Listen(string queue, Action<ConsumeQueueOptions> configure)
        {
            ConsumeQueueOptions options = new ConsumeQueueOptions();
            configure?.Invoke(options);
            return Listen(queue, options);
        }
        #endregion
        #region 訂閱模式、路由模式、Topic模式
        /// <summary>
        /// 消費訊息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="options"></param>
        public ListenResult Listen(string exchange, string queue, ExchangeConsumeQueueOptions options = null)
        {
            options = options ?? new ExchangeConsumeQueueOptions();
            var channel = GetChannel();
            channel.QueueDeclare(queue, options.Durable, false, options.AutoDelete, options.Arguments ?? new Dictionary<string, object>());
            if (options.RoutingKeys != null && !string.IsNullOrEmpty(exchange))
            {
                foreach (var key in options.RoutingKeys)
                {
                    channel.QueueBind(queue, exchange, key, options.BindArguments);
                }
            }
            var consumer = ConsumeInternal(channel, options);
            channel.BasicConsume(queue, options.AutoAck, consumer);
            ListenResult result = new ListenResult();
            result.Token.Register(() =>
            {
                try
                {
                    channel.Close();
                    channel.Dispose();
                }
                catch { }
            });
            return result;
        }
        /// <summary>
        /// 消費訊息
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="configure"></param>
        public ListenResult Listen(string exchange, string queue, Action<ExchangeConsumeQueueOptions> configure)
        {
            ExchangeConsumeQueueOptions options = new ExchangeConsumeQueueOptions();
            configure?.Invoke(options);
            return Listen(exchange, queue, options);
        }
        #endregion
    }
    public class RecieveResult
    {
        CancellationTokenSource cancellationTokenSource;
        public RecieveResult(BasicDeliverEventArgs arg, CancellationTokenSource cancellationTokenSource)
        {
            this.Body = Encoding.UTF8.GetString(arg.Body);
            this.ConsumerTag = arg.ConsumerTag;
            this.DeliveryTag = arg.DeliveryTag;
            this.Exchange = arg.Exchange;
            this.Redelivered = arg.Redelivered;
            this.RoutingKey = arg.RoutingKey;
            this.cancellationTokenSource = cancellationTokenSource;
        }

        /// <summary>
        /// 訊息體
        /// </summary>
        public string Body { get; private set; }
        /// <summary>
        /// 消費者標籤
        /// </summary>
        public string ConsumerTag { get; private set; }
        /// <summary>
        /// Ack標籤
        /// </summary>
        public ulong DeliveryTag { get; private set; }
        /// <summary>
        /// 交換機
        /// </summary>
        public string Exchange { get; private set; }
        /// <summary>
        /// 是否Ack
        /// </summary>
        public bool Redelivered { get; private set; }
        /// <summary>
        /// 路由
        /// </summary>
        public string RoutingKey { get; private set; }

        public void Commit()
        {
            if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return;

            cancellationTokenSource.Cancel();
            cancellationTokenSource.Dispose();
            cancellationTokenSource = null;
        }
    }
    public class ListenResult
    {
        CancellationTokenSource cancellationTokenSource;

        /// <summary>
        /// CancellationToken
        /// </summary>
        public CancellationToken Token { get { return cancellationTokenSource.Token; } }
        /// <summary>
        /// 是否已停止
        /// </summary>
        public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } }

        public ListenResult()
        {
            cancellationTokenSource = new CancellationTokenSource();
        }

        /// <summary>
        /// 停止監聽
        /// </summary>
        public void Stop()
        {
            cancellationTokenSource.Cancel();
        }
    }
}

  測試Demo  

  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQ.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";
            string queue = "queue1";
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

            //消費者
            new Thread(() =>
            {
                using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                {
                    consumer.UserName = userName;
                    consumer.Password = password;
                    consumer.Port = port;
                    consumer.VirtualHost = virtualHost;

                    consumer.Received += result =>
                    {
                        Console.WriteLine($"接收到資料:{result.Body}");
                        result.Commit();//提交
                    };
                    consumer.Listen(queue, options =>
                    {
                        options.AutoAck = false;
                        options.Arguments = arguments;
                    });
                }
            }).Start();

            //訊息生產
            using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
            {
                producer.UserName = userName;
                producer.Password = password;
                producer.Port = port;
                producer.VirtualHost = virtualHost;

                string message = "";
                do
                {
                    message = Console.ReadLine();
                    if (string.IsNullOrEmpty(message))
                    {
                        break;
                    }
                    producer.Publish(queue, message, options => { options.Arguments = arguments; });

                } while (true);
            }
        }
    }
}
  
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQ.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] hosts = new string[] { "192.168.209.133", "192.168.209.134", "192.168.209.135" };
            int port = 5672;
            string userName = "admin";
            string password = "123456";
            string virtualHost = "/";
            string queue = "queue1";
            var arguments = new Dictionary<string, object>() { { "x-queue-type", "classic" } };

            //消費者1
            new Thread(() =>
            {
                using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                {
                    consumer.UserName = userName;
                    consumer.Password = password;
                    consumer.Port = port;
                    consumer.VirtualHost = virtualHost;

                    consumer.Received += result =>
                    {
                        Console.WriteLine($"消費者1接收到資料:{result.Body}");
                        result.Commit();//提交
                    };
                    consumer.Listen(queue, options =>
                    {
                        options.AutoAck = false;
                        options.Arguments = arguments;
                        options.FetchCount = 1;
                    });
                }
            }).Start();

            //消費者2
            new Thread(() =>
            {
                using (RabbitMQConsumer consumer = new RabbitMQConsumer(hosts))
                {
                    consumer.UserName = userName;
                    consumer.Password = password;
                    consumer.Port = port;
                    consumer.VirtualHost = virtualHost;

                    consumer.Received += result =>
                    {
                        Console.WriteLine($"消費者2接收到資料:{result.Body}");
                        result.Commit();//提交
                    };
                    consumer.Listen(queue, options =>
                    {
                        options.AutoAck = false;
                        options.Arguments = arguments;
                        options.FetchCount = 2;
                    });
                }
            }).Start();

            //訊息生產
            using (RabbitMQProducer producer = new RabbitMQProducer(hosts))
            {
                producer.UserName = userName;
                producer.Password = password;
                producer.Port = port;
                producer.VirtualHost = virtualHost;

                string message = "";
                do
                {
                    message = Console.ReadLine();
                    if (string.IsNullOrEmpty(message))
                    {
                        break;
                    }
                    producer.Publish(queue, message, options => { options.Arguments = arguments; });

                } while (true);
            }
        }
    }
}
  釋出訂閱模式   路由模式   主題模式

  上面是我自己做的封裝,因為RabbitMQ.Client功能齊全,但是使用比較麻煩,需要編寫的程式碼多一些,推薦一下第三方對rabbitmq的封裝外掛:EasyNetQ,它是建立在RabbitMQ.Client上的,多數時候可以直接通過EasyNetQ就可以完成訊息釋出與消費,感興趣的可以瞭解一下