12.RabbitMQ多機集群
阿新 • • 發佈:2018-01-01
cal 刪除 exc () 虛擬主機 all 默認端口 mar iss
1、分別在兩臺主機上修改/etc/hosts 192.168.169.100 rabbitmq1 192.168.169.110 rabbitmq2 2、從客戶端上傳RPM包
RPM包下載地址 https://pan.baidu.com/s/1dE1iaGx
依次安裝Erlang,Rabbitmq
#yum
-y install openssl
#yum -y install socat-1.7.2.4-1.el6.rf.x86_64.rpm
#yum -y install erlang-17.4-1.el6.x86_64.rpm
#yum -y install rabbitmq-server-3.6.3-1.noarch.rpm
4、啟動rabbitmq1,rabbitmq2上的RabbitMQ rabbitmq1 #service rabbitmq-server start rabbitmq2 #service rabbitmq-server start
5、從rabbitmq1主機上拷貝文件到rabbitmq2 scp /var/lib/rabbitmq/.erlang.cookie rabbitmq2:/var/lib/rabbitmq 6、在rabbitmq1,rabbitmq2上分別關閉防火墻 [root@rabbitmq1 ~]# service iptables stop [root@rabbitmq2 ~]# service iptables stop
7、在rabbitmq1,rabbitmq2上分別啟動RibbitMQ [root@rabbitmq1 ~]# service rabbitmq-server start
[root@rabbitmq2 ~]# service rabbitmq-server
start
8、在rabbitmq2上執行
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app
9、查看各節點上的狀態
rabbitmqctl cluster_status
10、在rabbitmq1,rabbitmq2節點上分別添加用戶和設置控制臺插件
[root@rabbitmq1 ~]# rabbitmq-plugins
enable rabbitmq_management
[root@rabbitmq1 ~]# rabbitmqctl add_user
admin admin
[root@rabbitmq1 ~]# rabbitmqctl
set_permissions admin ".*" ".*" ".*"
[root@rabbitmq1 ~]# rabbitmqctl
set_user_tags admin administrator
[root@rabbitmq2 ~]# rabbitmq-plugins
enable rabbitmq_management
vi /etc/haproxy/haproxy.cfg 添加配置信息
listen rabbitmq_local_cluster 192.168.169.100:5670 mode tcp balance roundrobin server rabbit 192.168.169.100:5672 check inter 5000 rise 2 fall 3 server rabbit 192.168.169.110:5672 check inter 5000 rise 2 fall 3
listen private_monitoring :8100 mode http option httplog stats enable stats uri /stats stats refresh 60s
14、查看haproxy控制臺 http://192.168.169.142:8100/stats
15、建立RabbitMQ策略
16、建立持久隊列
測試代碼 Producer.java package com.test.cluster; import com.rabbitmq.client.*; import java.io.IOException; import java.lang.String; import java.lang.System; import java.util.HashMap; import java.util.Map; import java.util.Scanner; public class Producer { public static void main(String[] args) throws Exception { //使用默認端口連接MQ ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setHost("192.168.169.100"); //使用默認端口5672 factory.setPort(5670); Connection conn = factory.newConnection(); //聲明一個連接 Channel channel = conn.createChannel(); //聲明消息通道 String exchangeName = "TestEXG";//交換機名稱 String routingKey = "RouteKey1";//RoutingKey關鍵字 channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機 String queueName = "ClusterQueue";//隊列名稱 Map arg = new HashMap(); arg.put("x-ha-policy", "all"); channel.queueDeclare(queueName, false, false, false, arg); channel.queueBind(queueName, exchangeName, routingKey);//定義聲明對象 byte[] messageBodyBytes = "Hello, world!".getBytes();//消息內容 channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//發布消息 //關閉通道和連接 channel.close(); conn.close(); } } Customer.java package com.test.cluster; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import java.io.IOException; import java.util.HashMap; import java.util.Map; //通過channel.basicAck向服務器發送回執,刪除服務上的消息 public class Consumer { public static void main(String[] args) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setHost("192.168.169.100"); //使用默認端口5672 factory.setPort(5670); Connection conn = factory.newConnection(); //聲明一個連接 Channel channel = conn.createChannel(); //聲明消息通道 String exchangeName = "TestEXG";//交換機名稱 String queueName = "ClusterQueue";//隊列名稱 channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機 channel.queueBind(queueName, exchangeName, "RouteKey1"); channel.basicQos(1); //server push消息時的隊列長度 //用來緩存服務器推送過來的消息 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); System.out.println("Received " + new String(delivery.getBody())); //回復ack包,如果不回復,消息不會在服務器刪除 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } 關閉掉其中一個RabbitMQ,測試群集效果
配置兩臺Linux CentOS 6.7虛擬主機
CentOS6.7下載地址 https://pan.baidu.com/s/1i5GPg9n 安裝視頻下載https://pan.baidu.com/s/1qYSgohQ
1、分別在兩臺主機上修改/etc/hosts 192.168.169.100 rabbitmq1 192.168.169.110 rabbitmq2 2、從客戶端上傳RPM包
RPM包下載地址 https://pan.baidu.com/s/1dE1iaGx
3、下載阿裏雲Yum源
#wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo
4、啟動rabbitmq1,rabbitmq2上的RabbitMQ rabbitmq1 #service rabbitmq-server start rabbitmq2 #service rabbitmq-server start
5、從rabbitmq1主機上拷貝文件到rabbitmq2 scp /var/lib/rabbitmq/.erlang.cookie rabbitmq2:/var/lib/rabbitmq 6、在rabbitmq1,rabbitmq2上分別關閉防火墻 [root@rabbitmq1 ~]# service iptables stop [root@rabbitmq2 ~]# service iptables stop
7、在rabbitmq1,rabbitmq2上分別啟動RibbitMQ [root@rabbitmq1 ~]# service rabbitmq-server start
11、在rabbitmq1節點上安裝haproxy
yum -y install haproxy 12、配置haproxy cp /etc/haproxy/haproxy.cfg /etc/haproxy/haproxy.cfg.bakvi /etc/haproxy/haproxy.cfg 添加配置信息
listen rabbitmq_local_cluster 192.168.169.100:5670 mode tcp balance roundrobin server rabbit 192.168.169.100:5672 check inter 5000 rise 2 fall 3 server rabbit 192.168.169.110:5672 check inter 5000 rise 2 fall 3
listen private_monitoring :8100 mode http option httplog stats enable stats uri /stats stats refresh 60s
13、啟動haproxy
service haproxy start14、查看haproxy控制臺 http://192.168.169.142:8100/stats
15、建立RabbitMQ策略
16、建立持久隊列
測試代碼 Producer.java package com.test.cluster; import com.rabbitmq.client.*; import java.io.IOException; import java.lang.String; import java.lang.System; import java.util.HashMap; import java.util.Map; import java.util.Scanner; public class Producer { public static void main(String[] args) throws Exception { //使用默認端口連接MQ ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setHost("192.168.169.100"); //使用默認端口5672 factory.setPort(5670); Connection conn = factory.newConnection(); //聲明一個連接 Channel channel = conn.createChannel(); //聲明消息通道 String exchangeName = "TestEXG";//交換機名稱 String routingKey = "RouteKey1";//RoutingKey關鍵字 channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機 String queueName = "ClusterQueue";//隊列名稱 Map arg = new HashMap(); arg.put("x-ha-policy", "all"); channel.queueDeclare(queueName, false, false, false, arg); channel.queueBind(queueName, exchangeName, routingKey);//定義聲明對象 byte[] messageBodyBytes = "Hello, world!".getBytes();//消息內容 channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//發布消息 //關閉通道和連接 channel.close(); conn.close(); } } Customer.java package com.test.cluster; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import java.io.IOException; import java.util.HashMap; import java.util.Map; //通過channel.basicAck向服務器發送回執,刪除服務上的消息 public class Consumer { public static void main(String[] args) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setHost("192.168.169.100"); //使用默認端口5672 factory.setPort(5670); Connection conn = factory.newConnection(); //聲明一個連接 Channel channel = conn.createChannel(); //聲明消息通道 String exchangeName = "TestEXG";//交換機名稱 String queueName = "ClusterQueue";//隊列名稱 channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機 channel.queueBind(queueName, exchangeName, "RouteKey1"); channel.basicQos(1); //server push消息時的隊列長度 //用來緩存服務器推送過來的消息 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); System.out.println("Received " + new String(delivery.getBody())); //回復ack包,如果不回復,消息不會在服務器刪除 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } 關閉掉其中一個RabbitMQ,測試群集效果
12.RabbitMQ多機集群