RabbitMQ (十四) 普通叢集
上篇文章把單機叢集搭建好了,可以開始驗證普通叢集的相關功能了.
我們首先在管理後臺(15672,15673 都可以)新增一個使用者,並用新使用者登入,新增一個虛擬主機
由於是在一臺機器上模擬叢集,所以我們把建立連線的工具類小改一下,將埠號作為入參.
public static class ConnectionHelper { public static IConnection GetConnection(int port) { //定義一個連線工廠 ConnectionFactory factory = new ConnectionFactory { HostName = "127.0.0.1",//設定伺服器地址 Port = port, //設定埠號 VirtualHost = "/vhost_wjire",//設定虛擬主機 UserName = "wjire",//設定使用者名稱 Password = "******"//設定密碼 }; //factory.AutomaticRecoveryEnabled //自動恢復連線,預設就是true //factory.NetworkRecoveryInterval //自動恢復連線失敗,預設每 5 秒重試一次 //連線恢復後才會進行拓撲恢復 //factory.TopologyRecoveryEnabled //預設也是true return factory.CreateConnection(); } }
一.非持久化佇列驗證
1.生產者連線到 node1 (5672) 宣告佇列,傳送訊息,消費者連線到 node2 (5673) 接收訊息.
生產者
public class Producer { private const string QueueName = "test_queue"; public static void Send() { using (IConnection connection = ConnectionHelper.GetConnection(5672)) using (IModel channel = connection.CreateModel()) { var msg = "hello world"; channel.QueueDeclare(QueueName, false, false, false, null); channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"send {msg}"); } } }
消費者
public class Consumer { private const string QueueName = "test_queue"; public static void Receive() { IConnection connection = ConnectionHelper.GetConnection(5673); IModel channel = connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (s, e) => { string str = Encoding.Default.GetString(e.Body); Console.WriteLine("consumer receive : " + str); }; channel.BasicConsume(queue: QueueName, autoAck: true, consumer: consumer); } }
執行結果如下,並且在管理後臺我們可以看到該佇列的節點.
2.生產者連線到 node2 ,將訊息傳送到上面程式碼在 node1 宣告的佇列,而消費者則連線到 node2 接收訊息.
測試結果一切正常,就不上圖了.
二.持久化佇列及訊息驗證
生產者部分程式碼
channel.QueueDeclare(QueueName, true, false, false, null);//佇列持久化 var pros = channel.CreateBasicProperties(); pros.Persistent = true;//將訊息設定為持久化 channel.BasicPublish("", QueueName, pros, Encoding.Default.GetBytes(msg));
訊息傳送後
現在,我們關閉 node1 的RabbitMQ.然後再看管理後臺,當然,15672 肯定訪問不了,只能訪問 15673
下面,我們嘗試讓生產者在 node2 也就是連線到 5673 重新宣告一個叫 "test_queue"的佇列.
結果直接異常了,下面是異常的部分截圖:
現在我們重新啟動 node1 ,過了幾秒後,佇列恢復到了 node1 關閉之前的狀態.
三.叢集節點介紹
RabbitMQ叢集中的節點分記憶體節點(RAM)和磁碟節點(disc).
- 記憶體節點:將所有的佇列,交換機,繫結,使用者,許可權,vhost的元資料都儲存在記憶體中;
- 磁碟節點:將資料存放在磁碟上.磁碟節點需要儲存叢集的配置資訊
如果傳送的是持久化訊息,那麼即使是記憶體節點,資料還是會放在磁碟中.記憶體節點的效能只能體現在資源管理上,比如增加或刪除佇列,虛擬主機,交換機等,但傳送和接受訊息速度同磁碟節點一樣.
一個叢集至少有一個節點是磁碟節點,其他節點可以都是記憶體節點,當節點加入或者離開叢集時都要將變更通知到至少一個磁碟節點.實際使用時至少要兩個磁碟節點,原因很簡單,如果只有一個磁碟節點,恰巧磁碟節點掛了,那麼RabbitMQ將不能建立佇列,建立交換機,建立繫結,新增使用者,更改許可權,新增或刪除節點等操作,但是可以正常的釋出和消費訊息. 在實際使用中必須將叢集的配置放到磁碟節點上來儲存.
一個叢集中的節點可以共享user,vhost,exchange等,所有的資料和狀態都會在所有節點上覆制.在叢集模式下只要有任何一個節點可以工作,RabbitMQ叢集對外就能提供服務.
單機叢集只允許磁碟節點,否則每次重啟所有資料將會丟失.