1. 程式人生 > >C# 版RabbitMQ 從安裝到使用

C# 版RabbitMQ 從安裝到使用

1. 說明

在企業應用系統領域,會面對不同系統之間的通訊、整合與整合,尤其當面臨異構系統時,這種分散式的呼叫與通訊變得越發重要。其次,系統中一般會有很多對實時性要求不高的但是執行起來比較較耗時的地方,比如傳送簡訊,郵件提醒,更新文章閱讀計數,記錄使用者操作日誌等等,如果實時處理的話,在使用者訪問量比較大的情況下,對系統壓力比較大。

面對這些問題,我們一般會將這些請求,放在訊息佇列MQ中處理;異構系統之間使用訊息進行通訊。

MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的技術。排隊指的是應用程式通過 佇列來通訊。佇列的使用除去了接收和傳送應用程式同時執行的要求。

MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。

RabbitMQ是一個在AMQP基礎上完整的,可複用的企業訊息系統。他遵循Mozilla Public License開源協議。

訊息傳遞相較檔案傳遞與遠端過程呼叫(RPC)而言,似乎更勝一籌,因為它具有更好的平臺無關性,並能夠很好地支援併發與非同步呼叫。所以如果系統中出現瞭如下情況:

對操作的實時性要求不高,而需要執行的任務極為耗時;
存在異構系統間的整合;
  一般的可以考慮引入訊息佇列。對於第一種情況,常常會選擇訊息佇列來處理執行時間較長的任務。引入的訊息佇列就成了訊息處理的緩衝區。訊息佇列引入的非同步通訊機制,使得傳送方和接收方都不用等待對方返回成功訊息,就可以繼續執行下面的程式碼,從而提高了資料處理的能力。尤其是當訪問量和資料流量較大的情況下,就可以結合訊息佇列與後臺任務,通過避開高峰期對大資料進行處理,就可以有效降低資料庫處理資料的負荷。

本文簡單介紹在RabbitMQ這一訊息代理工具,以及在.NET中如何使用RabbitMQ.

2. 搭建環境

2.1 安裝Erlang語言執行環境

由於RabbitMQ使用Erlang語言編寫,所以先安裝Erlang語言執行環境。具體移步部落格:
  windows配置Erlang環境>

2.2 安裝RabbitMQ服務端

地址: http://www.rabbitmq.com/

下載安裝。

使RabbitMQ官文以Windows Service的方式在後臺執行:開啟cmd切換到sbin目錄下(安裝路勁)執行

rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start

rabbitmqctl stop :停止rabbitmq 
rabbitmq-server restart : 重啟rabbitmq
因為rabbitmqctl是沒有restart命令的,所以重啟rabbitmq服務需要這麼兩步。

現在RabbitMQ的服務端已經啟動起來了。

要檢視和控制RabbitMQ服務端的狀態,可以用rabbitmqctl這個指令碼。

比如檢視狀態:

rabbitmqctl status

在這裡插入圖片描述 
  假如顯示node沒有連線上,需要到C:\Windows目錄下,將.erlang.cookie檔案,拷貝到使用者目錄下 C:\Users{使用者名稱},這是Erlang的Cookie檔案,允許與Erlang進行互動。

使用命令檢視使用者:

rabbitmqctl list_users

在這裡插入圖片描述

RabbitMQ會為我們建立預設的使用者名稱guest和密碼guest,guest預設擁有RabbitMQ的所有許可權。

一般的,我們需要新建一個我們自己的使用者,設定密碼,並授予許可權,並將其設定為管理員,可以使用下面的命令來執行這一操作:

rabbitmqctl  add_user  JC JayChou   //建立使用者JC密碼為JayChou
rabbitmqctl  set_permissions  JC ".*"  ".*"  ".*"    //賦予JC讀寫所有訊息佇列的許可權
rabbitmqctl  set_user_tags JC administrator    //分配使用者組

修改JC密碼為123:

rabbitmqctl change_password JC  123

刪除使用者JC:

rabbitmqctl delete_user  JC

也可以開啟rabbitmq_management外掛,在web介面檢視和管理RabbitMQ服務

rabbitmq-plugins enable rabbitmq_management 

在這裡插入圖片描述

2.3下載RabbitMQ的Client端dll

下載地址:http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/

本人下載了這個 rabbitmq-dotnet-client-3.6.6-dotnet-4.5.zip

在這裡插入圖片描述

解壓,我們需要的是這個檔案,以後會引用到vs的專案中:在這裡插入圖片描述

3.使用

3.1在使用RabitMQ之前,先對幾個概念做一下說明

RabbitMQ是一個訊息代理。他從訊息生產者(producers)那裡接收訊息,然後把訊息送給訊息消費者(consumer)在傳送和接受之間,他能夠根據設定的規則進行路由,快取和持久化。

一般提到RabbitMQ和訊息,都用到一些專有名詞。

  • 生產(Producing)意思就是傳送。傳送訊息的程式就是一個生產者(producer)。我們一般用"P"來表示:
    在這裡插入圖片描述
  • 佇列(queue)就是郵箱的名稱。訊息通過你的應用程式和RabbitMQ進行傳輸,它們只能儲存在佇列(queue)中。 佇列(queue)容量沒有限制,你要儲存多少訊息都可以——基本上是一個無限的緩衝區。多個生產者(producers)能夠把訊息傳送給同一個佇列,同樣,多個消費者(consumers)也能從同一個佇列(queue)中獲取資料。佇列可以畫成這樣(圖上是佇列的名稱):
    在這裡插入圖片描述
  • 消費(Consuming)和獲取訊息是一樣的意思。一個消費者(consumer)就是一個等待獲取訊息的程式。我們把它畫作"C":
  • 在這裡插入圖片描述

#### 通常,訊息生產者,訊息消費者和訊息代理不在同一臺機器上。

3.2 Hello Word

下面來展示簡單的RabbitMQ的使用:在這裡插入圖片描述

3.2.1 首先建立名為ProjectSend的控制檯專案,需要引用RabbitMQ.Client.dll。這個程式作為Producer生產者,用來發送資料:

static void Main(string[] args)
    {
        var factory = new ConnectionFactory();
        factory.HostName = "localhost";//RabbitMQ服務在本地執行
        factory.UserName = "guest";//使用者名稱
        factory.Password = "guest";//密碼

        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare("hello", false, false, false, null);//建立一個名稱為hello的訊息佇列
                string message = "Hello World"; //傳遞的訊息內容
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("", "hello", null, body); //開始傳遞
                Console.WriteLine("已傳送: {0}", message);
          Console.ReadLine();
            }
        }
    }

首先,需要建立一個ConnectionFactory,設定目標,由於是在本機,所以設定為localhost,如果RabbitMQ不在本機,只需要設定目標機器的IP地址或者機器名稱即可,然後設定前面建立的使用者名稱和密碼。

緊接著要建立一個Channel,如果要傳送訊息,需要建立一個佇列,然後將訊息釋出到這個佇列中。在建立佇列的時候,只有RabbitMQ上該佇列不存在,才會去建立。訊息是以二進位制陣列的形式傳輸的,所以如果訊息是實體物件的話,需要序列化和然後轉化為二進位制陣列。

現在客戶端傳送程式碼已經寫好了,執行之後,訊息會發布到RabbitMQ的訊息佇列中,現在需要編寫服務端的程式碼連線到RabbitMQ上去獲取這些訊息。

3.2.2建立名為ProjectReceive的控制檯專案,引用RabbitMQ.Client.dll。作為Consumer消費者,用來接收資料:

static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";
            factory.UserName = "guest";
            factory.Password = "guest";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("hello", false, false, false, null);

                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicConsume("hello", false, consumer);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body); 
                        Console.WriteLine("已接收: {0}", message);   
                              //清除取出來的資料
                        channel.BasicAck(ea.DeliveryTag, false);
                    };
                    Console.ReadLine(); 
                }
            }
        }

和傳送一樣,首先需要定義連線,然後宣告訊息佇列。要接收訊息,需要定義一個Consume,然後在接收訊息的事件中處理資料。
注:channel.BasicAck:參考檔案:https://blog.csdn.net/a491857321/article/details/50670238

3.2.3 現在傳送和接收的客戶端都寫好了,讓我們編譯執行起來

傳送訊息:

在這裡插入圖片描述

現在,名為hello的訊息佇列中,傳送了一條訊息。這條訊息儲存到了RabbitMQ的伺服器上了。使用rabbitmqctl 的list_queues可以檢視所有的訊息佇列,以及裡面的訊息個數,可以看到,目前Rabbitmq上只有一個訊息佇列,裡面只有一條訊息:在這裡插入圖片描述
也可以在web管理介面檢視此queue的相關資訊:
在這裡插入圖片描述

接收訊息:

在這裡插入圖片描述
既然訊息已經被接收了,那我們再來看queue的內容:
在這裡插入圖片描述

可見,訊息中的內容在接收之後已被刪除了。

3.3 工作佇列

前面的例子展示瞭如何在指定的訊息佇列傳送和接收訊息。

現在我們建立一個工作佇列(work queue)來將一些耗時的任務分發給多個工作者(workers):
在這裡插入圖片描述

工作佇列(work queues, 又稱任務佇列Task Queues)的主要思想是為了避免立即執行並等待一些佔用大量資源、時間的操作完成。而是把任務(Task)當作訊息傳送到佇列中,稍後處理。一個執行在後臺的工作者(worker)程序就會取出任務然後處理。當執行多個工作者(workers)時,任務會在它們之間共享。

這個在網路應用中非常有用,它可以在短暫的HTTP請求中處理一些複雜的任務。在一些實時性要求不太高的地方,我們可以處理完主要操作之後,以訊息的方式來處理其他的不緊要的操作,比如寫日誌等等。

準備

在第一部分,傳送了一個包含“Hello World!”的字串訊息。現在傳送一些字串,把這些字串當作複雜的任務。這裡使用time.sleep()函式來模擬耗時的任務。在字串中加上點號(.)來表示任務的複雜程度,一個點(.)將會耗時1秒鐘。比如"Hello…"就會耗時3秒鐘。

對之前示例的send.cs做些簡單的調整,以便可以傳送隨意的訊息。這個程式會按照計劃傳送任務到我們的工作佇列中。

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);
            string message = GetMessage(args);
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 2;

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", "hello", properties, body);
            Console.WriteLine(" set {0}", message);
        }
    }

    Console.ReadKey();
}

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

接著我們修改接收端,讓他根據訊息中的逗點的個數來Sleep對應的秒數:

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare("hello", false, false, false, null);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume("hello", true, consumer);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);
                        
                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");
            }
        }
    }
}

輪詢分發

使用工作佇列的一個好處就是它能夠並行的處理佇列。如果堆積了很多工,我們只需要新增更多的工作者(workers)就可以了,擴充套件很簡單。

現在,我們先啟動兩個接收端,等待接受訊息,然後啟動一個傳送端開始傳送訊息。

在這裡插入圖片描述

在cmd條件下,傳送了5條訊息,每條訊息後面的逗點表示該訊息需要執行的時長,來模擬耗時的操作。

然後可以看到,兩個接收端依次接收到了發出的訊息:

在這裡插入圖片描述

預設,RabbitMQ會將每個訊息按照順序依次分發給下一個消費者。所以每個消費者接收到的訊息個數大致是平均的。 這種訊息分發的方式稱之為輪詢(round-robin)。

3.4 訊息響應

當處理一個比較耗時得任務的時候,也許想知道消費者(consumers)是否執行到一半就掛掉。在當前的程式碼中,當RabbitMQ將訊息傳送給消費者(consumers)之後,馬上就會將該訊息從佇列中移除。此時,如果把處理這個訊息的工作者(worker)停掉,正在處理的這條訊息就會丟失。同時,所有傳送到這個工作者的還沒有處理的訊息都會丟失。

我們不想丟失任何任務訊息。如果一個工作者(worker)掛掉了,我們希望該訊息會重新發送給其他的工作者(worker)。

為了防止訊息丟失,RabbitMQ提供了訊息響應(acknowledgments)機制。消費者會通過一個ack(響應),告訴RabbitMQ已經收到並處理了某條訊息,然後RabbitMQ才會釋放並刪除這條訊息。

如果消費者(consumer)掛掉了,沒有傳送響應,RabbitMQ就會認為訊息沒有被完全處理,然後重新發送給其他消費者(consumer)。這樣,即使工作者(workers)偶爾的掛掉,也不會丟失訊息。

訊息是沒有超時這個概念的;當工作者與它斷開連的時候,RabbitMQ會重新發送訊息。這樣在處理一個耗時非常長的訊息任務的時候就不會出問題了。

訊息響應預設是開啟的。在之前的例子中使用了no_ack=True標識把它關閉。是時候移除這個標識了,當工作者(worker)完成了任務,就傳送一個響應。

channel.BasicConsume("hello", false, consumer);

while (true)
{
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);

    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);

    Console.WriteLine("Received {0}", message);
    Console.WriteLine("Done");

    channel.BasicAck(ea.DeliveryTag, false);
}

現在,可以保證,即使正在處理訊息的工作者被停掉,這些訊息也不會丟失,所有沒有被應答的訊息會被重新發送給其他工作者.

一個很常見的錯誤就是忘掉了BasicAck這個方法,這個錯誤很常見,但是後果很嚴重. 當客戶端退出時,待處理的訊息就會被重新分發,但是RabitMQ會消耗越來越多的記憶體,因為這些沒有被應答的訊息不能夠被釋放。除錯這種case,可以使用rabbitmqct列印messages_unacknoledged欄位。

rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.
3.5 訊息持久化

前面已經搞定了即使消費者down掉,任務也不會丟失,但是,如果RabbitMQ Server停掉了,那麼這些訊息還是會丟失。

當RabbitMQ Server 關閉或者崩潰,那麼裡面儲存的佇列和訊息預設是不會儲存下來的。如果要讓RabbitMQ儲存住訊息,需要在兩個地方同時設定:需要保證佇列和訊息都是持久化的。

首先,要保證RabbitMQ不會丟失佇列,所以要做如下設定:

bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);

雖然在語法上是正確的,但是在目前階段是不正確的,因為我們之前已經定義了一個非持久化的hello佇列。RabbitMQ不允許我們使用不同的引數重新定義一個已經存在的同名佇列,如果這樣做就會報錯。現在,定義另外一個不同名稱的佇列:

bool durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

queueDeclare 這個改動需要在傳送端和接收端同時設定。

現在保證了task_queue這個訊息佇列即使在RabbitMQ Server重啟之後,佇列也不會丟失。 然後需要保證訊息也是持久化的, 這可以通過設定IBasicProperties.SetPersistent 為true來實現:

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

需要注意的是,將訊息設定為持久化並不能完全保證訊息不丟失。雖然他告訴RabbitMQ將訊息儲存到磁碟上,但是在RabbitMQ接收到訊息和將其儲存到磁碟上這之間仍然有一個小的時間視窗。 RabbitMQ 可能只是將訊息儲存到了快取中,並沒有將其寫入到磁碟上。持久化是不能夠一定保證的,但是對於一個簡單任務佇列來說已經足夠。如果需要訊息佇列持久化的強保證,可以使用publisher confirms

3.6 公平分發

你可能會注意到,訊息的分發可能並沒有如我們想要的那樣公平分配。比如,對於兩個工作者。當奇數個訊息的任務比較重,但是偶數個訊息任務比較輕時,奇數個工作者始終處理忙碌狀態,而偶數個工作者始終處理空閒狀態。但是RabbitMQ並不知道這些,他仍然會平均依次的分發訊息。

為了改變這一狀態,我們可以使用basicQos方法,設定perfetchCount=1 。這樣就告訴RabbitMQ 不要在同一時間給一個工作者傳送多於1個的訊息,或者換句話說。在一個工作者還在處理訊息,並且沒有響應訊息之前,不要給他分發新的訊息。相反,將這條新的訊息傳送給下一個不那麼忙碌的工作者。

channel.BasicQos(0, 1, false); 
3.7 完整例項

現在將所有這些放在一起:

傳送端程式碼如下:

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
                   
            bool durable = true;
            channel.QueueDeclare("task_queue", durable, false, false, null);
                    
            string message = GetMessage(args);
            var properties = channel.CreateBasicProperties();
            properties.SetPersistent(true);
                  

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish("", "task_queue", properties, body);
            Console.WriteLine(" set {0}", message);
        }
    }

    Console.ReadKey();
}

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

接收端程式碼如下:

static void Main(string[] args)
{
    var factory = new ConnectionFactory();
    factory.HostName = "localhost";
    factory.UserName = "yy";
    factory.Password = "hello!";

    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            bool durable = true;
            channel.QueueDeclare("task_queue", durable, false, false, null);
            channel.BasicQos(0, 1, false);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume("task_queue", false, consumer);

            while (true)
            {
                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                int dots = message.Split('.').Length - 1;
                Thread.Sleep(dots * 1000);

                Console.WriteLine("Received {0}", message);
                Console.WriteLine("Done");

                channel.BasicAck(ea.DeliveryTag, false);
            }
        }
    }
}

4 管理介面
RabbitMQ管理介面,通過該介面可以檢視RabbitMQ Server 當前的狀態,該介面是以外掛形式提供的,並且在安裝RabbitMQ的時候已經自帶了該外掛。需要做的是在RabbitMQ控制檯介面中啟用該外掛,命令如下:

rabbitmq-plugins enable rabbitmq_management在這裡插入圖片描述

現在,在瀏覽器中輸入 http://server-name:15672/ server-name換成機器地址或者域名,如果是本地的,直接用localhost(RabbitMQ 3.0之前版本埠號為55672)在輸入之後,彈出登入介面,使用我們之前建立的使用者登入。在這裡插入圖片描述

在該介面上可以看到當前RabbitMQServer的所有狀態。

5 總結

本文簡單介紹了訊息佇列的相關概念,並介紹了RabbitMQ訊息代理的基本原理以及在Windows 上如何安裝RabbitMQ和在.NET中如何使用RabbitMQ。訊息佇列在構建分散式系統和提高系統的可擴充套件性和響應性方面有著很重要的作用,希望本文對您瞭解訊息佇列以及如何使用RabbitMQ有所幫助。

https://www.cnblogs.com/knightlilz/p/5309200.html
http://www.rabbitmq.com/consumer-prefetch.html
https://www.jianshu.com/p/e5da341e351e