11.RabbitMQ單機集群
RabbitMQ集群設計用於完成兩個目標:允許消費者和生產者在RabbitMQ節點崩潰的情況下繼續運行,以及通過添加更多的節點來擴展消息通信的吞吐量。
RabbitMQ會始終記錄以下四種類型的內部元數據:
1. 隊列元數據-隊列的名稱和它們的屬性(是否持久化,是否自動刪除)
2. 交換器元數據-交換器類型、名稱和屬性(可持久化等)
3. 綁定元數據-一張簡單的表格展示了如何將消息路由到隊列
4. vhost元數據-為vhost內的隊列、交換器和綁定提供命名空間和安全屬性
在單一節點內,RabbitMQ會將所有這些信息存儲在內存中,同時將那些標記為可持久化的隊列和交換器(以及它們的綁定)存儲到硬盤上。當你引入集群時,RabbitMQ需要追蹤新的元數據類型:集群節點位置,以及節點與已記錄的其他類型元數據的關系。集群提供了選擇:將元數據存儲到磁盤上,或者存儲在內存中。
Erlang Cookie
Erlang Cookie是保證不同節點可以相互通信的密鑰,要保證集群中的不同節點相互通信必須共享相同的Erlang Cookie。具體的目錄存放在/var/lib/rabbitmq/.erlang.cookie。
說明: 這就要從rabbitmqctl命令的工作原理說起,RabbitMQ底層是通過Erlang架構來實現的,所以rabbitmqctl會啟動Erlang節點,並基於Erlang節點來使用Erlang系統連接RabbitMQ節點,在連接過程中需要正確的Erlang Cookie和節點名稱,Erlang節點通過交換Erlang Cookie以獲得認證。
鏡像隊列
功能和原理
RabbitMQ的Cluster集群模式一般分為兩種,普通模式和鏡像模式。
-
普通模式:默認的集群模式,以兩個節點(rabbit01、rabbit02)為例來進行說明。對於Queue來說,消息實體只存在於其中一個節點rabbit01(或者rabbit02),rabbit01和rabbit02兩個節點僅有相同的元數據,即隊列的結構。當消息進入rabbit01節點的Queue後,consumer從rabbit02節點消費時,RabbitMQ會臨時在rabbit01、rabbit02間進行消息傳輸,把A中的消息實體取出並經過B發送給consumer。所以consumer應盡量連接每一個節點,從中取消息。即對於同一個邏輯隊列,要在多個節點建立物理Queue。否則無論consumer連rabbit01或rabbit02,出口總在rabbit01,會產生瓶頸。當rabbit01節點故障後,rabbit02節點無法取到rabbit01節點中還未消費的消息實體。如果做了消息持久化,那麽得等rabbit01節點恢復,然後才可被消費;如果沒有持久化的話,就會產生消息丟失的現象。
-
鏡像模式:將需要消費的隊列變為鏡像隊列,存在於多個節點,這樣就可以實現RabbitMQ的HA高可用性。作用就是消息實體會主動在鏡像節點之間實現同步,而不是像普通模式那樣,在consumer消費數據時臨時讀取。缺點就是,集群內部的同步通訊會占用大量的網絡帶寬。
內存節點和磁盤節點
每個RabbitMQ節點,要麽是內存節點(ram node),要麽是磁盤節點(disk node)。內存節點將所有的隊列、交換器、綁定、用戶、權限和vhost的元數據定義都僅存在內存中。而磁盤節點則將元數據存儲在磁盤中。
內存節點的效率更高,內存節點唯一存儲到磁盤上的是磁盤節點的地址。
RabbitMQ要求集群中至少有一個磁盤節點。當節點加入或者離開集群時,它們必須要將該變更通知到至少一個磁盤節點。如果只有一個磁盤節點,而且不湊巧的是它又崩潰了,那麽集群可以繼續路由消息,但是不能做以下操作了:
1. 創建隊列
2. 創建交換器
3. 創建綁定
4. 添加用戶
5. 更改權限
單機環境搭建多節點群集
1、禁用管理後臺插件rabbitmq-plugins disable rabbitmq_management
2、創建三個Shell文件
rabbitmq1.sh
#!/bin/bash
export RABBITMQ_NODE_PORT=5672
export RABBITMQ_NODENAME=rabbit
rabbitmq-server
rabbitmq2.sh
#!/bin/bash
export RABBITMQ_NODE_PORT=5673
export RABBITMQ_NODENAME=rabbit2
rabbitmq-server
rabbitmq3.sh
#!/bin/bash
export RABBITMQ_NODE_PORT=5674
export RABBITMQ_NODENAME=rabbit3
rabbitmq-server
3、停止在Erlang節點上運行的節點2和節點3 RabbitMQ Server 並清空(重置)它們的元數據
rabbitmqctl -n rabbit1@localhost stop_app
rabbitmqctl -n rabbit2@localhost stop_app
rabbitmqctl -n rabbit1@localhost reset
rabbitmqctl -n rabbit2@localhost reset
4、將節點2作為磁盤節點加入集群並啟動應用
rabbitmqctl -n rabbit1@localhost join_cluster rabbit@localhost
rabbitmqctl -n rabbit1@localhost start_app
5、將節點3作為內存節點加入集群並啟動應用
rabbitmqctl -n rabbit2@localhost join_cluster --ram rabbit@localhost
rabbitmqctl -n rabbit2@localhost start_app
6、運行命令rabbitmqctl cluster_status查看集群狀態
Cluster status of node rabbit@localhost ...
[{nodes,[{disc,[rabbit1@localhost,rabbit@localhost]},
{ram,[rabbit2@localhost]}]},
{running_nodes,[rabbit2@localhost,rabbit1@localhost,rabbit@localhost]},
{cluster_name,<<"rabbit@localhost">>},
{partitions,[]},
{alarms,[{rabbit2@localhost,[]},
{rabbit1@localhost,[]},
{rabbit@localhost,[]}]}]
集群安裝成功,這時候java客戶端可以連接任何一個RabbitMQ Server的端口來訪問集群了。
7、鏡像隊列
在聲明隊列時,可以通過參數"x-ha-policy"設置為"all"來把消息發送到集群的所有節點上。
Map arg = new HashMap();
arg.put("x-ha-policy", "all");
channel.queueDeclare(queueName, false, false, false, arg);
客戶端發送代碼
package com.test.cluster;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.lang.String;
import java.lang.System;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
public class Producer {
public static void main(String[] args) throws Exception {
//使用默認端口連接MQ
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.142"); //使用默認端口5672
Connection conn = factory.newConnection(); //聲明一個連接
Channel channel = conn.createChannel(); //聲明消息通道
String exchangeName = "TestEXG";//交換機名稱
String routingKey = "RouteKey1";//RoutingKey關鍵字
channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機
String queueName = "ClusterQueue";//隊列名稱
Map arg = new HashMap();
arg.put("x-ha-policy", "all");
channel.queueDeclare(queueName, false, false, false, arg);
channel.queueBind(queueName, exchangeName, routingKey);//定義聲明對象
byte[] messageBodyBytes = "Hello, world!".getBytes();//消息內容
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//發布消息
//關閉通道和連接
channel.close();
conn.close();
}
}
消費者代碼
package com.test.cluster;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
//通過channel.basicAck向服務器發送回執,刪除服務上的消息
public class Consumer {
public static void main(String[] args) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.142"); //使用默認端口5672
factory.setPort(5672);
Connection conn = factory.newConnection(); //聲明一個連接
Channel channel = conn.createChannel(); //聲明消息通道
String exchangeName = "TestEXG";//交換機名稱
String queueName = "ClusterQueue";//隊列名稱
channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機
channel.queueBind(queueName, exchangeName, "RouteKey1");
channel.basicQos(1); //server push消息時的隊列長度
//用來緩存服務器推送過來的消息
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("Received " + new String(delivery.getBody()));
//回復ack包,如果不回復,消息不會在服務器刪除
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
11.RabbitMQ單機集群