1. 程式人生 > >快速掌握RabbitMQ(一)——RabbitMQ的基本概念、安裝和C#驅動

快速掌握RabbitMQ(一)——RabbitMQ的基本概念、安裝和C#驅動

1 RabbitMQ簡介

       RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現,官網地址:http://www.rabbitmq.com。RabbitMQ作為一個訊息代理,主要負責接收、儲存和轉發訊息,它提供了可靠的訊息機制和靈活的訊息路由,並支援訊息叢集和分散式部署,常用於應用解耦,耗時任務佇列,流量削鋒等場景。本系列文章將系統介紹RabbitMQ的工作機制,程式碼驅動和叢集配置,本篇主要介紹RabbitMQ中一些基本概念,常用的RabbitMQ Control命令,最後寫一個C#驅動的簡單栗子。先看一下RabbitMQ的基本結構:

  上圖是RabbitMQ的一個基本結構,生產者Producer和消費者Consumer都是RabbitMQ的客戶端,Producer負責傳送訊息,Consumer負責消費訊息。

接下來我們結合這張圖來理解RabbitMQ的一些概念:

  Broker(Server):接受客戶端連線,實現AMQP訊息佇列和路由功能的程序,我們可以把Broker叫做RabbitMQ伺服器。

  Virtual Host:一個虛擬概念,一個Virtual Host裡面可以有若干個Exchange和Queue,主要用於許可權控制,隔離應用。如應用程式A使用VhostA,應用程式B使用VhostB,那麼我們在VhostA中只存放應用程式A的exchange,queue和訊息,應用程式A的使用者只能訪問VhostA,不能訪問VhostB中的資料。

  Exchange:接受生產者傳送的訊息,並根據Binding規則將訊息路由給伺服器中的佇列。ExchangeType決定了Exchange路由訊息的行為,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic和Header四種,不同型別的Exchange路由規則是不一樣的(這些以後會詳細介紹)。

  Queue:訊息佇列,用於儲存還未被消費者消費的訊息,佇列是先進先出的,預設情況下先儲存的訊息先被處理。

  Message:就是訊息,由Header和Body組成,Header是由生產者新增的各種屬性的集合,包括Message是否被持久化、由哪個Message Queue接受、優先順序是多少等,Body是真正傳輸的資料,內容格式為byte[]。

  Connection:連線,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP連線。

  Channel:通道,僅僅建立了客戶端到Broker之間的連線Connection後,客戶端還是不能傳送訊息的。需要在Connection的基礎上建立Channel,AMQP協議規定只有通過Channel才能執行AMQP的命令,一個Connection可以包含多個Channel。之所以需要Channel,是因為TCP連線的建立和釋放都是十分昂貴的。

2 RabbitMQ安裝

  因為RabbitMQ是用erlang語言開發的,所以我們在安裝RabbitMQ前必須要安裝erlang支援。

1 Windows平臺安裝

1 安裝erlang

  首先下載erlang,直接下載最新版本,當前下載的是 OTP 21.3 Windows 64-bit Binary File ,下載完成後一直下一步安裝即可。

2 安裝RabbitMQ

  下載Windows平臺的RabbtMQ,當前下載的是 rabbitmq-server-3.7.14.exe ,下載完成後一直下一步安裝即可。

3 安裝Web管理外掛

  開啟RabbitMQ Command Prompt,執行命令 rabbitmq-plugins enable rabbitmq_management 即可完成Web監控外掛的安裝。 

  安裝完成後,開啟瀏覽器輸入 http://127.0.0.1:15672/ 使用預設賬號[ name:guest / password:guest ]登入後介面如下,使用這個UI外掛我們可以輕鬆的檢視RabbitMQ中的交換機(exchange),佇列(queue)等內容,也可以對exchange,queue,使用者等進行新增、修改、刪除操作。

  到這一步Windows平臺安裝RabbitMQ完成了。 開啟服務管理器,RabbitMQ已經在正常運行了,如下:

2 Centos安裝RabbitMQ

1 安裝RabbitMQ

  這裡虛擬機器系統為Centos7,採用的安裝方式是yum安裝,為了簡單,這裡直接使用官方提供的erlang和RabbitMQ-server的自動安裝指令碼(官方安裝文件),逐行執行下邊的程式碼就可以安裝完成erlang和RabbitMQ。

#安裝socat
yum install socat

#安裝erlang
  curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
  yum -y install erlang

#安裝rabbitmq-server
  curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
  yum -y install rabbitmq-server

#啟動rabbitmq服務
  systemctl start rabbitmq-server
#新增web管理外掛
  rabbitmq-plugins enable rabbitmq_management

補充:如果安裝完成後,執行RabbitMQ執行命令特別慢,或者出現報錯【rabbitmq unable to perform an operation on node  xxx@xxx】,解決方法:

  編輯hosts,執行命令 vim /etc/hosts ,新增本機IP(或者虛擬機器IP)

  命令執行結束後,使用瀏覽器訪問 http://127.0.0.1:15672/ 也會出現web管理介面。通過上邊的安裝步驟安裝的RabbitMQ會生成Unit檔案,所以我們可以使用Systemd管理RabbitMQ服務,以下是幾條常用的命令:

#啟動RabbitMQ服務
  systemctl start rabbitmq-server
#停止RabbitMQ服務
  systemctl stop rabbitmq-server
#檢視RabbitMQ執行狀態
  systemctl status rabbitmq-server
#重啟RabbitMQ服務
  systemctl restart rabbitmq-server

2 RabbitMQ Control工具

  使用Web管理介面可以實現RabbitMQ的大部分常用功能,但是有些功能WebUI是做不到的,如:開啟/關閉RabbitMQ應用程式和叢集的管理等。RabbitMQ Control是RabbitMQ的命令列管理工具,可以呼叫所有的RabbitMQ內建功能,主命令是rabbitmqctl ,下邊是一個查詢使用者列表的命令,注意需要切換到sbin目錄下執行:

  為了方便的使用RabbitMQ Control工具,我們最好新增一個環境變數,Windows預設安裝時在PATH中新增一條: C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin ,不是預設安裝的話找到對應的安裝目錄新增PATH。按照上的安裝方法,Centos可以直接使用RabbitMQ Control工具,不需要多餘的配置。如果想詳細瞭解RabbitMQ Control工具,可以參考RabbitMQ Control的官方文件。

  這裡總結了一些最常用到的RabbitMQ Controll命令,有興趣的小夥伴可以試著執行一下這些命令,如在命令列工具中使用命令 rabbitmqctl add_user <username> <password>  新增一個新使用者。

1 基本控制命令

  基本控制命令主要用於啟動、停止應用程式、runtime等

#停止rabbitmq和runtime
  rabbitmqctl shutdown
#停止erlang節點
  rabbitmqctl stop 
#啟用rabbitmq 
  rabbitmqctl start_app 
#停止rabbitmq 
  rabbitmqctl stop_app
#檢視狀態
  rabbitmqctl status 
#檢視環境
  rabbitmqctl environment
#rabbitmq恢復最初狀態,內部的exchange和queue都清除
  rabbitmqctl reset 

2 服務狀態管理

  這些命令主要用於用於檢視exchang、channel、binding、queue、consumers:

#返回queue的資訊
  list_queues [-p <vhostpath>] [<queueinfoitem> ...] 
#返回exchange的資訊
  list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...] 
#返回繫結資訊
  list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
 #返回連結資訊
  list_connections [<connectioninfoitem> ...] 
#返回目前所有的channels
  list_channels [<channelinfoitem> ...]   
#返回consumers
  list_consumers [-p <vhostpath>] 

3 使用者管理命令

  這些命令主要用於新增、修改、刪除使用者及管理使用者許可權

#在rabbitmq的內部資料庫新增使用者
  add_user <username> <password>  
#刪除一個使用者
  delete_user <username> 
#改變使用者密碼 
  change_password <username> <newpassword> 
#清除使用者密碼,禁止使用者登入
  clear_password <username>
#設定使用者tags,就是設定使用者角色
  set_user_tags <username> <tag> 
# 檢視使用者列表
  list_users 
#建立一個vhost
  add_vhost <vhostpath> 
#刪除一個vhosts
  delete_vhost <vhostpath>    
#列出vhosts
  list_vhosts [<vhostinfoitem> ...] 
#針對一個vhosts 給使用者賦予相關許可權
  set_permissions [-p <vhostpath>] <user> <conf> <write> <read> 
#清除一個使用者對vhost的許可權
  clear_permissions [-p <vhostpath>] <username> 
#列出所有使用者對某一vhost的許可權
  list_permissions [-p <vhostpath>]  
#列出某使用者的訪問許可權
  list_user_permissions <username> 

4 叢集管理命令

#clusternode表示node名稱,--ram表示node以ram node加入叢集中。預設node以disc node加入叢集,在一個node加入cluster之前,必須先停止該node的rabbitmq應用,即先執行stop_app。
  join_cluster <clusternode> [--ram]   
#顯示cluster中的所有node
  cluster_status 
#設定叢集名字
  set_cluster_name <clustername>
#修改叢集名字
  rename_cluster_node <oldname> <newname>
#改變一個cluster中node的模式,該節點在轉換前必須先停止,不能把一個叢集中唯一的disk node轉化為ram node
  change_cluster_node_type <disc | ram>
#遠端刪除一個節點,刪除前必須該節點必須先停止
  rabbitmqctl forget_cluster_node rabbit@rabbit1
#同步映象佇列
  sync_queue <queuename>
#取消同步佇列
  cancel_sync_queue <queuename>  
#清空佇列中所有訊息
  purge_queue [-p vhost] <queuename>

  這裡列舉的很多命令是現階段用不到的,如叢集控制相關的命令,這些命令的用法會在以後的章節中逐漸理解。

3  C#驅動RabbitMQ

1 一個簡單的栗子

  作為開發者,我們最在意的還是怎麼在程式碼中使用RabbitMQ,可以通過官方RabbitMQ開發文件來學習RabbitMQ的使用,這裡以.NET為例演示一下RabbitMQ的最基本用法。建立兩個Console應用,一個作為傳送訊息的生產者(Producer),一個作為接受訊息的消費者(Consumer),生產者向佇列寫入訊息,消費者接受這條訊息,結構如下:

 

  兩個控制檯應用都要新增RabbitMQ.Client包,命令如下:

Install-Package RabbitMQ.Client

 生產者(Producer)程式碼:

    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在裝置ip,這裡就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//使用者名稱
                Password = "123321"//密碼
            };
            //第一步:建立連線connection
            using (var connection = factory.CreateConnection())
            {
                //第二步:建立通道channel
                using (var channel = connection.CreateModel())
                {
                    //第三步:宣告交換機exchang
                    channel.ExchangeDeclare(exchange: "myexchange",
                                            type: ExchangeType.Direct,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //第四步:宣告佇列queue
                    channel.QueueDeclare(queue: "myqueue",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    Console.WriteLine("生產者準備就緒....");
                    //第五步:繫結佇列到互動機
                    channel.QueueBind(queue:"myqueue", exchange:"myexchange", routingKey:"mykey");
                    string message = "";
                    //第六步:傳送訊息
                    //在控制檯輸入訊息,按enter鍵傳送訊息
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        //基本釋出
                        channel.BasicPublish(exchange: "myexchange",
                                             routingKey: "mykey",
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine($"訊息【{message}】已傳送到佇列");
                    }
                }
            }
            Console.ReadKey();
        }
    }

 消費者(Consumer)程式碼: 

    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在裝置ip,這裡就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//使用者名稱
                Password = "123321"//密碼
            };
            //第一步:建立連線connection
            using (var connection = factory.CreateConnection())
            {
                //第二步:建立通道channel
                using (var channel = connection.CreateModel())
                {
                    //第三步:宣告佇列queue
                    channel.QueueDeclare(queue: "myqueue",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    //第四步:定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"接受到訊息【{message}】");
                    };
                    Console.WriteLine("消費者準備就緒....");
                    //第五步:處理訊息
                    channel.BasicConsume(queue: "myqueue",
                                         autoAck: true,
                                         consumer: consumer);
                    Console.ReadLine();
                }
            }
        }
    } 

  依次執行Producer和Consumer兩個應用程式,執行結果如下:

  注意:上邊的程式碼在生產者和消費者的程式碼中都聲明瞭exchange和queue,這主要是為了讓這兩個程式可以按任意順序啟動,如:我們只在生產者程式碼中定義了exchange和queue,卻先啟動消費者,這會讓造成消費者找不到自己需要的exhange和queue(出現404錯誤)。實際開發中建立exchange/queue、繫結佇列以及設定routingKey這些工作,都可以通過WebUI管理介面或者使用Rabbitmq Control工具完成。

  QueueDeclare方法用於宣告佇列,ExchangeDeclare用於宣告交換機,我們在使用這兩個方法宣告時,可以設定佇列和交換機的屬性,如queue的名字,長度限制,exchange是否持久化、交換機型別等。

2 QueueDeclare方法詳解

  在上邊的栗子中我們使用了宣告佇列的方法  QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments) ,該方法通過引數設定佇列的特性。這裡介紹一下該方法 中幾個引數的作用,先看一個完整的宣告佇列的栗子:

             //宣告佇列newsQueue
                    channel.QueueDeclare(queue: "myqueue",
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: new Dictionary<string, object>() {
                                             //佇列中訊息的過期時間是8s
                                             { "x-message-ttl",1000*8 }, 
                                             //佇列60s沒有被使用,則刪除該佇列
                                             {"x-expires",1000*60 },
                                             //佇列最多儲存100條訊息
                                             {"x-max-length",100 },
                                             //佇列中ready型別訊息總共不能超過1000位元組
                                             {"x-max-length-bytes",1000 },
                                             //當佇列訊息滿了時,丟棄傳來後續訊息
                                             {"x-overflow","reject-publish" },
                                             //丟棄的訊息傳送到deadExchange交換機
                                             {"x-dead-letter-exchange","deadExchange" },
                                             //丟棄的訊息傳送到deadExchange交換機時的RoutingKey
                                             {"x-dead-letter-routing-key","deadKey" },
                                             //佇列中最大的優先順序等級為10(在Publish訊息時對每條訊息設定優先順序)
                                             {"x-max-priority",10 },
                                             //設定佇列預設為lazy
                                             {"x-queue-mode","lazy" }
                                         });

QueueDeclare方法的引數如下:

  queue:佇列名字;

  durable:是否持久化。設定為true時,佇列資訊儲存在rabbitmq的內建資料庫中,伺服器重啟時佇列也會恢復(注意:重啟後佇列內部的訊息不會恢復,怎麼實現訊息持久化以後會詳細介紹);

  exclusive:是否排外。設定為true時只有首次宣告該佇列的Connection可以訪問,其他Connection不能訪問該佇列;且在Connection斷開時,佇列會被刪除(即使durable設定為true也會被刪除);

  autoDelete:是否自動刪除。設定為true時,表示在最後一條使用該佇列的連線(Connection)斷開時,將自動刪除這個佇列;

  arguments:設定佇列的一些其它屬性,為Dictionary<string,object>型別,下表總結了arguments中可以設定的常用屬性。

引數名                                              作用 示例
Message TTL 設定佇列中訊息的有效時間 { "x-message-ttl",1000*8 },設定佇列中的所有訊息的有效期為8s;
Auto expire 自動刪除佇列。一定的時間內佇列沒有被使用,則自動刪除佇列 {"x-expires",1000*60 },設定佇列的過期時長為60s,如果60s沒有佇列被訪問,則刪除佇列;
Max length 佇列能儲存訊息的最大條數 {"x-max-length",100 },設定佇列最多儲存100條訊息;
Max length bytes 佇列中ready型別訊息的總位元組數  {"x-max-length-bytes",1000 }, 設定佇列中ready型別訊息總共不能超過1000位元組;
Overflow behaviour  當佇列訊息滿了時,再接收訊息時的處理方法。有兩種處理方案:預設為"drop-head"模式,表示從佇列頭部丟棄訊息;"reject-publish"表示不接收後續的訊息 {"x-overflow","reject-publish" },設定當佇列訊息滿了時,丟棄傳來後續訊息;
Dead letter exchange  用於儲存被丟棄的訊息的交換機名。Overflow behaviour 的兩種處理方案中丟棄的訊息都會發送到這個交換機 {"x-dead-letter-exchange","beiyongExchange" },設定丟棄的訊息傳送到名字位beiyongExchange的交換機;
Dead letter routing key  被丟棄的訊息傳送到Dead letter exchange時的使用的routing Key {"x-dead-letter-routing-key","deadKey" },設定丟棄的訊息傳送到beiyongExchange交換機時的RoutingKey值是"deadKey";
Maximum priority  設定佇列中訊息優先順序的最大等級,在publish訊息時可以設定單條訊息的優先順序等級 {"x-max-priority",10 },設定中訊息優先順序的最大等級為10;
Lazy mode  設定佇列的模式,如果設定為Lazy表示佇列中訊息儘可能存放在磁碟中,以減少記憶體佔用;不設定時訊息都存放在佇列中,用以儘可能快的處理訊息 {"x-queue-mode","lazy"},3.6以後版本可用,設定佇列中訊息儘可能存放在磁碟中,以減少記憶體佔用。在訊息擁堵時和訊息持久化配置使用可以減少記憶體佔用。

3 ExchangeDeclare方法詳解

   宣告交換機的方法 void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments) 可以設定交換機的特性,這裡簡單介紹一下這個方法的幾個引數:

 channel.ExchangeDeclare(exchange: "myexchange",
                         type: ExchangeType.Direct,
                         durable: true,
                         autoDelete: false,
               arguments: new Dictionary<string, object> { 
                        {"alternate-exchange","BeiyongExchange" }//如果訊息不能路由到該交換機,就把訊息路由到備用交換機BeiyongExchange上
               });

  exchange:交換機名字。

  type:交換機型別。exchange有direct、fanout、topic、header四種類型,在下一篇會詳細介紹;

  durable:是否持久化。設定為true時,交換機資訊儲存在rabbitmq的內建資料庫中,伺服器重啟時交換機資訊也會恢復;

  autoDelete:是否自動刪除。設定為true時,表示在最後一條使用該交換機的連線(Connection)斷開時,自動刪除這個佇列;

  arguments:其他的一些引數,型別為Dictionary<string,object> 。

小結

  本節主要介紹了RabbitMQ的基本概念,在Windows和Centos上的安裝方法,及RabbitMQ Control工具的基本使用,最後演示了一個C#驅動RabbitMQ的栗子,並詳細介紹了宣告queue和exchange的方法。通過這一節我們大概瞭解了RabbitMQ的基本使用。以後的章節會逐漸介紹RabbitMQ的四種exchange、兩種Consumer的特點和使用場景,以及訊息確認、優先順序、持久化等,最後搭建一個高可用的RabbitMQ叢集,如果文中有錯誤的話,希望大家可以指出,我會及時修改,謝謝!

 

參考文章

【1】 https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html

【2】https://www.cnblogs.com/operationhome/p/10483840.html