1. 程式人生 > >快速掌握RabbitMQ(五)——搭建高可用的RabbitMQ叢集

快速掌握RabbitMQ(五)——搭建高可用的RabbitMQ叢集

  RabbitMQ的叢集是依賴erlang叢集的,而erlang叢集是通過.erlang.cookie檔案進行通訊認證的,所以我們使用RabbitMQ叢集時只需要配置一下.erlang.cookie檔案即可。下邊簡單演示一下RabbitMQ高可用叢集的搭建,附帶一個簡單使用C#驅動RabbtiMQ叢集的小栗子。

1 搭建RabbitMQ高可用叢集

  首先準備三臺裝置,這裡採用的三臺Centos7的虛擬機器,測試一下各個虛擬機器能不能相互ping通,如果可以相互ping通的話,在每臺虛擬機器上分別安裝RabbitMQ,可以參考第一篇的安裝方法。

第1步 修改主機配置

   為了方便機器間的相互訪問,三臺centos都執行  vim /etc/hosts ,新增下邊的配置(注意修改成自己裝置的IP):

192.168.70.129 rabbitmq1
192.168.70.131 rabbitmq2
192.168.70.133 rabbitmq3

  一般情況,hosts檔案中內容如下:

第2步:修改.erlang.cookie檔案

  修改三臺裝置的.erlang.cookie中的key一致。如果使用的是前邊的安裝方法,.erlang.cookie的位置為 /var/lib/rabbitmq/.erlang.cookie (採用其他安裝方式找不到檔案的話,可以使用命令  find / -name '.erlang.cookie' 找到檔案位置)。這裡三臺虛擬機器的key都採用192.168.70.129的key(值為CRRQPKHDXEEIUJUOGYKN),在另外兩臺裝置上 執行命令: vim /var/lib/rabbitmq/.erlang.cookie ,修改檔案內容為CRRQPKHDXEEIUJUOGYKN。在修改時如果遇到許可權問題,可執行命令 chmod 600 /var/lib/rabbitmq/.erlang.cookie 修改檔案的許可權為可寫,修改內容完成後,執行命令 chmod 400 /var/lib/rabbitmq/.erlang.cookie 把檔案再次改成只讀的。

  完成上邊的兩步後,erlang叢集就搭建好了,重啟所有的裝置即可。在rabbitmq1節點的虛擬機器上執行命令 rabbitmqctl cluster_status 檢視叢集狀態:

 第3步:新增/刪除節點

新增節點

  把rabbitmq2節點新增到叢集中去,在rabbitmq2節點執行以下命令:

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbit1
rabbitmqctl start_app

  執行完成後,檢視叢集狀態,看到rabbitmq2已經在叢集中了,如下:

  重複上邊的步驟,把rabbitmq3也新增到叢集中,在rabbitmq3節點執行下邊命令:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbit2
rabbitmqctl start_app
  檢視叢集狀態,rabbitmq3也在叢集中了,如下:  

  這時我們開啟任意一個節點的Web管理介面,顯示如下,看到叢集已經配置完成了:

刪除節點
  把某一節點從叢集中刪除很簡單,reset一下節點即可。如刪除rabbitmq3節點,在rabbitmq3上執行以下命令:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

  執行完成後,檢視叢集狀態如下:

   現在搭建的叢集是預設的普通叢集,普通叢集中節點可以共享叢集中的exchange,routingKey和queue,但是queue中的訊息只儲存在首次宣告queue的節點中。任意節點的消費者都可以消費其他節點的訊息,比如消費者連線rabbitmq1節點的消費者(程式碼中建立Connection時,使用的rabbitmq1的IP)可以消費節點rabbitmq2的佇列myqueue2中的訊息,訊息傳輸過程是:rabbitmq2把myqueue2中的訊息傳輸給rabbtimq1,然後rabbitmq1節點把訊息傳送給consumer。因為queue中的訊息只儲存在首次宣告queue的節點中,這樣就有一個問題:如果某一個node節點掛掉了,那麼只能等待該節點重新連線才能繼續處理該節點內的訊息(如果沒有設定持久化的話,節點掛掉後訊息會直接丟失)。如下圖,rabbitmq1節點掛掉後,myqueue佇列就down掉了,不能被訪問。

  針對上邊的問題,我們可能會想到:如果可以讓rabbitmq中的節點像redis叢集的節點一樣,每一個節點都儲存所有的訊息,比如讓rabbitmq1不僅僅儲存自己佇列myqueue的訊息,還儲存其他節點的佇列myqueue2和myqueue3中的訊息,rabbitmq2和rabbitmq3節點也一樣,這樣就不用擔心宕機的問題了。rabbitmq也提供了這樣的功能:映象佇列。映象佇列由一個master和多個slave組成,使用映象佇列訊息會自動在映象節點間同步,而不是在consumer取資料時臨時拉取。

第4步:配置映象佇列

  rabbitmq配置映象佇列十分簡單,我們在任意一個node節點下執行下邊的命令就可以完成映象佇列的配置(當然也可以在Web管理介面上新增policy):

rabbitmqctl set_policy ha-all "^my" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
# ha-all:為策略名稱;
# ^my:為匹配符,只有一個^代表匹配所有,^abc為匹配名稱以abc開頭的queue或exchange;
# ha-mode:為同步模式,一共3種模式:
#    ①all-所有(所有的節點都同步訊息),
#    ②exctly-指定節點的數目(需配置ha-params引數,此引數為int型別比如2,在叢集中隨機抽取2個節點同步訊息)
#    ③nodes-指定具體節點(需配置ha-params引數,此引數為陣列型別比如["rabbit@rabbitmq1","rabbit@rabbitmq2"],明確指定在這兩個節點上同步訊息)。

  開啟Web管理介面,如果效果如下表示映象佇列已經配置完成了。當前myqueue的master節點為rabbitmq1:

  如果首次宣告queue的節點(master)掛了,其他節點會自動變成master,如上圖myqueue的master為rabbitmq1,停掉rabbtmq1後,結果如下:rabbitmq2成為了master。

 

  我們發現rabbitmq1節點掛了後,rabbitmq2自動成為了myqueue的master,myqueue不會down掉,可以正常的新增/刪除/獲取訊息,這就解決了普通叢集宕機的問題。使用映象佇列,因為各個節點要同步訊息,所以比較耗費資源,一般在可靠性比較高的場景使用映象佇列。

還可以配置其他策略的映象佇列,也是一行命令即可完成配置,一些其它同步模式的栗子:

#策略名為ha-twe,匹配以“my”開頭的queue或exchange,在叢集中隨機挑選映象節點,同步的節點為2個
rabbitmqctl set_policy ha-two "^my"    '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
#策略名為ha-nodes,匹配以“my”開頭的queue或exchange,指定rabbit@rabbitmq2和rabbit@rabbitmq3為同步節點
rabbitmqctl set_policy ha-nodes "^my" \ '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmq2", "rabbit@rabbitmq3"]}'

第5步:C#驅動RabbitMQ叢集

  C#驅動RabbitMQ叢集與C#驅動單機RabbtiMQ的方式基本一樣,區別在於使用叢集時,建立Connection指定的是一個host集合。看一個簡單的栗子:

生產者程式碼:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                UserName = "wyy",//使用者名稱
                Password = "123456",//密碼
                AutomaticRecoveryEnabled = true,//Connection斷了,自動重新連線
            };
            //叢集中的三個rabbitmq節點
            List<string> hosts = new List<string>() { "192.168.70.129", "192.168.70.131", "192.168.70.1233" };
            //隨機連線一個rabbitmq節點
            using (var connection = factory.CreateConnection(hosts))
            {
                //建立通道channel
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine("生產者準備就緒....");
                    #region 釋出100條訊息
                    for (int i = 0; i < 100; i++)
                    {
                        channel.BasicPublish(exchange: "myexchange",
                                             routingKey: "mykey",
                                             basicProperties: null,
                                             body: Encoding.UTF8.GetBytes($"第{i}條訊息"));
                    }
                    #endregion
                }
            }
            Console.ReadKey();
        }

消費者程式碼:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                UserName = "wyy",//使用者名稱
                Password = "123456",//密碼
            };
            //叢集中的三個rabbitmq節點
            List<string> hosts = new List<string>() { "192.168.70.129", "192.168.70.131", "192.168.70.1233" };
            //隨機連線一個rabbitmq節點
            using (var connection = factory.CreateConnection(hosts))
            {
                using (var channel = connection.CreateModel())
                {
                    //使用Qos,每次接收1條訊息
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false);
                    Console.WriteLine("消費者準備就緒....");
                    #region EventingBasicConsumer

                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (sender, ea) =>
                    {
                        //處理一條訊息需要10s時間
                        Thread.Sleep(1000);
                        //顯示確認訊息
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        Console.WriteLine($"處理訊息【{Encoding.UTF8.GetString(ea.Body)}】完成!");
                    };
                    //處理訊息
                    channel.BasicConsume(queue: "myqueue",
                                           autoAck: false,
                                           consumer: consumer);
                    Console.ReadKey();
                    #endregion
                }
            }
        }

   執行這兩個應用程式,結果如下:

 

   到這裡RabbtMQ的叢集搭建就告一段落了,有一個小問題:RabbitMQ的叢集預設不支援負載均衡的。我們可以根據裝置的效能,使用Qos給各個消費者指定合適的最大發送條數,這樣可以在一定程度上實現負載均衡。也有園友通過Haproxy實現RabbitMQ叢集的負載均衡,有興趣的小夥伴可以研究一下,為什麼使用Haprpxy而不用Ngnix呢?這是因為Haproxy支援四層(tcp,udp,http等)和七層(http,https,email等)的負載均衡,而Nginx只支援七層的負載均衡,而Rabbitmq是通過tcp傳輸的。本節也是RabbitMQ系列的最後一篇,如果文中有錯誤的話,希望大家可以指出,我會及時修改,謝謝。