1. 程式人生 > >12.RabbitMQ多機集群

12.RabbitMQ多機集群

cal 刪除 exc () 虛擬主機 all 默認端口 mar iss

配置兩臺Linux CentOS 6.7虛擬主機

CentOS6.7下載地址 https://pan.baidu.com/s/1i5GPg9n 安裝視頻下載

https://pan.baidu.com/s/1qYSgohQ

rabbitmq2 技術分享圖片 rabbitmq1 技術分享圖片


1、分別在兩臺主機上修改/etc/hosts 192.168.169.100 rabbitmq1 192.168.169.110 rabbitmq2 2、從客戶端上傳RPM包 技術分享圖片
RPM包下載地址 https://pan.baidu.com/s/1dE1iaGx

3、下載阿裏雲Yum源

#wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repo

依次安裝Erlang,Rabbitmq #yum -y install openssl #yum -y install socat-1.7.2.4-1.el6.rf.x86_64.rpm #yum -y install erlang-17.4-1.el6.x86_64.rpm #yum -y install rabbitmq-server-3.6.3-1.noarch.rpm
4、啟動rabbitmq1,rabbitmq2上的RabbitMQ rabbitmq1 #service rabbitmq-server start rabbitmq2 #service rabbitmq-server start

5、從rabbitmq1主機上拷貝文件到rabbitmq2 scp /var/lib/rabbitmq/.erlang.cookie rabbitmq2:/var/lib/rabbitmq 6、在rabbitmq1,rabbitmq2上分別關閉防火墻 [root@rabbitmq1 ~]# service iptables stop [root@rabbitmq2 ~]# service iptables stop
7、在rabbitmq1,rabbitmq2上分別啟動RibbitMQ [root@rabbitmq1 ~]# service rabbitmq-server start
[root@rabbitmq2 ~]# service rabbitmq-server start 8、在rabbitmq2上執行 rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@rabbitmq1 rabbitmqctl start_app 9、查看各節點上的狀態 rabbitmqctl cluster_status 10、在rabbitmq1,rabbitmq2節點上分別添加用戶和設置控制臺插件 [root@rabbitmq1 ~]# rabbitmq-plugins enable rabbitmq_management [root@rabbitmq1 ~]# rabbitmqctl add_user admin admin [root@rabbitmq1 ~]# rabbitmqctl set_permissions admin ".*" ".*" ".*" [root@rabbitmq1 ~]# rabbitmqctl set_user_tags admin administrator [root@rabbitmq2 ~]# rabbitmq-plugins enable rabbitmq_management

11、在rabbitmq1節點上安裝haproxy

yum -y install haproxy 12、配置haproxy cp /etc/haproxy/haproxy.cfg /etc/haproxy/haproxy.cfg.bak
vi /etc/haproxy/haproxy.cfg 添加配置信息 技術分享圖片

listen rabbitmq_local_cluster 192.168.169.100:5670 mode tcp balance roundrobin server rabbit 192.168.169.100:5672 check inter 5000 rise 2 fall 3 server rabbit 192.168.169.110:5672 check inter 5000 rise 2 fall 3
listen private_monitoring :8100 mode http option httplog stats enable stats uri /stats stats refresh 60s

13、啟動haproxy

service haproxy start
14、查看haproxy控制臺 http://192.168.169.142:8100/stats 技術分享圖片
15、建立RabbitMQ策略 技術分享圖片

16、建立持久隊列 技術分享圖片
測試代碼 Producer.java package com.test.cluster; import com.rabbitmq.client.*; import java.io.IOException; import java.lang.String; import java.lang.System; import java.util.HashMap; import java.util.Map; import java.util.Scanner; public class Producer { public static void main(String[] args) throws Exception { //使用默認端口連接MQ ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setHost("192.168.169.100"); //使用默認端口5672 factory.setPort(5670); Connection conn = factory.newConnection(); //聲明一個連接 Channel channel = conn.createChannel(); //聲明消息通道 String exchangeName = "TestEXG";//交換機名稱 String routingKey = "RouteKey1";//RoutingKey關鍵字 channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機 String queueName = "ClusterQueue";//隊列名稱 Map arg = new HashMap(); arg.put("x-ha-policy", "all"); channel.queueDeclare(queueName, false, false, false, arg); channel.queueBind(queueName, exchangeName, routingKey);//定義聲明對象 byte[] messageBodyBytes = "Hello, world!".getBytes();//消息內容 channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//發布消息 //關閉通道和連接 channel.close(); conn.close(); } } Customer.java package com.test.cluster; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import java.io.IOException; import java.util.HashMap; import java.util.Map; //通過channel.basicAck向服務器發送回執,刪除服務上的消息 public class Consumer { public static void main(String[] args) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("admin"); factory.setPassword("admin"); factory.setHost("192.168.169.100"); //使用默認端口5672 factory.setPort(5670); Connection conn = factory.newConnection(); //聲明一個連接 Channel channel = conn.createChannel(); //聲明消息通道 String exchangeName = "TestEXG";//交換機名稱 String queueName = "ClusterQueue";//隊列名稱 channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機 channel.queueBind(queueName, exchangeName, "RouteKey1"); channel.basicQos(1); //server push消息時的隊列長度 //用來緩存服務器推送過來的消息 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); System.out.println("Received " + new String(delivery.getBody())); //回復ack包,如果不回復,消息不會在服務器刪除 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } 關閉掉其中一個RabbitMQ,測試群集效果

12.RabbitMQ多機集群