1. 程式人生 > 實用技巧 >RabbitMQ (十四) 普通叢集

RabbitMQ (十四) 普通叢集

上篇文章把單機叢集搭建好了,可以開始驗證普通叢集的相關功能了.

我們首先在管理後臺(15672,15673 都可以)新增一個使用者,並用新使用者登入,新增一個虛擬主機

由於是在一臺機器上模擬叢集,所以我們把建立連線的工具類小改一下,將埠號作為入參.

    public static class ConnectionHelper
    {
        public static IConnection GetConnection(int port)
        {
            //定義一個連線工廠
            ConnectionFactory factory = new ConnectionFactory
            {  
                HostName = "127.0.0.1",//設定伺服器地址
                Port = port,  //設定埠號
                VirtualHost = "/vhost_wjire",//設定虛擬主機
                UserName = "wjire",//設定使用者名稱
                Password = "******"//設定密碼
            };
            //factory.AutomaticRecoveryEnabled  //自動恢復連線,預設就是true
            //factory.NetworkRecoveryInterval //自動恢復連線失敗,預設每 5 秒重試一次

            //連線恢復後才會進行拓撲恢復
            //factory.TopologyRecoveryEnabled //預設也是true
            return factory.CreateConnection();
        }
    }

一.非持久化佇列驗證

1.生產者連線到 node1 (5672) 宣告佇列,傳送訊息,消費者連線到 node2 (5673) 接收訊息.

生產者

    public class Producer
    {
        private const string QueueName = "test_queue";
        public static void Send()
        {
            using (IConnection connection = ConnectionHelper.GetConnection(5672))
            using (IModel channel = connection.CreateModel())
            {
                var msg = "hello world";
                channel.QueueDeclare(QueueName, false, false, false, null);
                channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
                Console.WriteLine($"send {msg}");
            }
        }
    }

消費者

    public class Consumer
    {
        private const string QueueName = "test_queue";
        public static void Receive()
        {
            IConnection connection = ConnectionHelper.GetConnection(5673);
            IModel channel = connection.CreateModel();
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (s, e) =>
            {
                string str = Encoding.Default.GetString(e.Body);
                Console.WriteLine("consumer receive : " + str);
            };
            channel.BasicConsume(queue: QueueName, autoAck: true, consumer: consumer);
        }
    }

執行結果如下,並且在管理後臺我們可以看到該佇列的節點.

2.生產者連線到 node2 ,將訊息傳送到上面程式碼在 node1 宣告的佇列,而消費者則連線到 node2 接收訊息.

測試結果一切正常,就不上圖了.

二.持久化佇列及訊息驗證

生產者部分程式碼

                channel.QueueDeclare(QueueName, true, false, false, null);//佇列持久化
                var pros = channel.CreateBasicProperties();
                pros.Persistent = true;//將訊息設定為持久化
                channel.BasicPublish("", QueueName, pros, Encoding.Default.GetBytes(msg));

訊息傳送後

現在,我們關閉 node1 的RabbitMQ.然後再看管理後臺,當然,15672 肯定訪問不了,只能訪問 15673

下面,我們嘗試讓生產者在 node2 也就是連線到 5673 重新宣告一個叫 "test_queue"的佇列.

結果直接異常了,下面是異常的部分截圖:

現在我們重新啟動 node1 ,過了幾秒後,佇列恢復到了 node1 關閉之前的狀態.

三.叢集節點介紹

RabbitMQ叢集中的節點分記憶體節點(RAM)和磁碟節點(disc).

  • 記憶體節點:將所有的佇列,交換機,繫結,使用者,許可權,vhost的元資料都儲存在記憶體中;
  • 磁碟節點:將資料存放在磁碟上.磁碟節點需要儲存叢集的配置資訊

如果傳送的是持久化訊息,那麼即使是記憶體節點,資料還是會放在磁碟中.記憶體節點的效能只能體現在資源管理上,比如增加或刪除佇列,虛擬主機,交換機等,但傳送和接受訊息速度同磁碟節點一樣.

一個叢集至少有一個節點是磁碟節點,其他節點可以都是記憶體節點,當節點加入或者離開叢集時都要將變更通知到至少一個磁碟節點.實際使用時至少要兩個磁碟節點,原因很簡單,如果只有一個磁碟節點,恰巧磁碟節點掛了,那麼RabbitMQ將不能建立佇列,建立交換機,建立繫結,新增使用者,更改許可權,新增或刪除節點等操作,但是可以正常的釋出和消費訊息. 在實際使用中必須將叢集的配置放到磁碟節點上來儲存.

一個叢集中的節點可以共享user,vhost,exchange等,所有的資料和狀態都會在所有節點上覆制.在叢集模式下只要有任何一個節點可以工作,RabbitMQ叢集對外就能提供服務.

單機叢集只允許磁碟節點,否則每次重啟所有資料將會丟失.