1. 程式人生 > >11.RabbitMQ單機集群

11.RabbitMQ單機集群

policy 要求 重置 建立 多節點 應用 消費者 架構 top

RabbitMQ集群設計用於完成兩個目標:允許消費者和生產者在RabbitMQ節點崩潰的情況下繼續運行,以及通過添加更多的節點來擴展消息通信的吞吐量。

RabbitMQ會始終記錄以下四種類型的內部元數據:

1. 隊列元數據-隊列的名稱和它們的屬性(是否持久化,是否自動刪除)

2. 交換器元數據-交換器類型、名稱和屬性(可持久化等)

3. 綁定元數據-一張簡單的表格展示了如何將消息路由到隊列

4. vhost元數據-為vhost內的隊列、交換器和綁定提供命名空間和安全屬性

在單一節點內,RabbitMQ會將所有這些信息存儲在內存中,同時將那些標記為可持久化的隊列和交換器(以及它們的綁定)存儲到硬盤上。當你引入集群時,RabbitMQ需要追蹤新的元數據類型:集群節點位置,以及節點與已記錄的其他類型元數據的關系。集群提供了選擇:將元數據存儲到磁盤上,或者存儲在內存中。

Erlang Cookie

Erlang Cookie是保證不同節點可以相互通信的密鑰,要保證集群中的不同節點相互通信必須共享相同的Erlang Cookie。具體的目錄存放在/var/lib/rabbitmq/.erlang.cookie。

說明: 這就要從rabbitmqctl命令的工作原理說起,RabbitMQ底層是通過Erlang架構來實現的,所以rabbitmqctl會啟動Erlang節點,並基於Erlang節點來使用Erlang系統連接RabbitMQ節點,在連接過程中需要正確的Erlang Cookie和節點名稱,Erlang節點通過交換Erlang Cookie以獲得認證。

鏡像隊列

功能和原理
RabbitMQ的Cluster集群模式一般分為兩種,普通模式和鏡像模式。

  • 普通模式:默認的集群模式,以兩個節點(rabbit01、rabbit02)為例來進行說明。對於Queue來說,消息實體只存在於其中一個節點rabbit01(或者rabbit02),rabbit01和rabbit02兩個節點僅有相同的元數據,即隊列的結構。當消息進入rabbit01節點的Queue後,consumer從rabbit02節點消費時,RabbitMQ會臨時在rabbit01、rabbit02間進行消息傳輸,把A中的消息實體取出並經過B發送給consumer。所以consumer應盡量連接每一個節點,從中取消息。即對於同一個邏輯隊列,要在多個節點建立物理Queue。否則無論consumer連rabbit01或rabbit02,出口總在rabbit01,會產生瓶頸。當rabbit01節點故障後,rabbit02節點無法取到rabbit01節點中還未消費的消息實體。如果做了消息持久化,那麽得等rabbit01節點恢復,然後才可被消費;如果沒有持久化的話,就會產生消息丟失的現象。

  • 鏡像模式:將需要消費的隊列變為鏡像隊列,存在於多個節點,這樣就可以實現RabbitMQ的HA高可用性。作用就是消息實體會主動在鏡像節點之間實現同步,而不是像普通模式那樣,在consumer消費數據時臨時讀取。缺點就是,集群內部的同步通訊會占用大量的網絡帶寬。

內存節點和磁盤節點

每個RabbitMQ節點,要麽是內存節點(ram node),要麽是磁盤節點(disk node)。內存節點將所有的隊列、交換器、綁定、用戶、權限和vhost的元數據定義都僅存在內存中。而磁盤節點則將元數據存儲在磁盤中。

內存節點的效率更高,內存節點唯一存儲到磁盤上的是磁盤節點的地址。

RabbitMQ要求集群中至少有一個磁盤節點。當節點加入或者離開集群時,它們必須要將該變更通知到至少一個磁盤節點。如果只有一個磁盤節點,而且不湊巧的是它又崩潰了,那麽集群可以繼續路由消息,但是不能做以下操作了:

1. 創建隊列

2. 創建交換器

3. 創建綁定

4. 添加用戶

5. 更改權限

單機環境搭建多節點群集

1、禁用管理後臺插件rabbitmq-plugins disable rabbitmq_management

2、創建三個Shell文件


rabbitmq1.sh

#!/bin/bash

export RABBITMQ_NODE_PORT=5672

export RABBITMQ_NODENAME=rabbit

rabbitmq-server

rabbitmq2.sh

#!/bin/bash

export RABBITMQ_NODE_PORT=5673

export RABBITMQ_NODENAME=rabbit2

rabbitmq-server

rabbitmq3.sh

#!/bin/bash

export RABBITMQ_NODE_PORT=5674

export RABBITMQ_NODENAME=rabbit3

rabbitmq-server

3、停止在Erlang節點上運行的節點2和節點3 RabbitMQ Server 並清空(重置)它們的元數據

rabbitmqctl -n rabbit1@localhost stop_app

rabbitmqctl -n rabbit2@localhost stop_app

rabbitmqctl -n rabbit1@localhost reset

rabbitmqctl -n rabbit2@localhost reset

4、將節點2作為磁盤節點加入集群並啟動應用

rabbitmqctl -n rabbit1@localhost join_cluster rabbit@localhost

rabbitmqctl -n rabbit1@localhost start_app

5、將節點3作為內存節點加入集群並啟動應用

rabbitmqctl -n rabbit2@localhost join_cluster --ram rabbit@localhost

rabbitmqctl -n rabbit2@localhost start_app

6、運行命令rabbitmqctl cluster_status查看集群狀態

Cluster status of node rabbit@localhost ...

[{nodes,[{disc,[rabbit1@localhost,rabbit@localhost]},

{ram,[rabbit2@localhost]}]},

{running_nodes,[rabbit2@localhost,rabbit1@localhost,rabbit@localhost]},

{cluster_name,<<"rabbit@localhost">>},

{partitions,[]},

{alarms,[{rabbit2@localhost,[]},

{rabbit1@localhost,[]},

{rabbit@localhost,[]}]}]

集群安裝成功,這時候java客戶端可以連接任何一個RabbitMQ Server的端口來訪問集群了。

7、鏡像隊列

在聲明隊列時,可以通過參數"x-ha-policy"設置為"all"來把消息發送到集群的所有節點上。

Map arg = new HashMap();

arg.put("x-ha-policy", "all");

channel.queueDeclare(queueName, false, false, false, arg);

客戶端發送代碼

package com.test.cluster;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.lang.String;

import java.lang.System;

import java.util.HashMap;

import java.util.Map;

import java.util.Scanner;

public class Producer {

public static void main(String[] args) throws Exception {

//使用默認端口連接MQ

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("admin");

factory.setPassword("admin");

factory.setHost("192.168.169.142"); //使用默認端口5672

Connection conn = factory.newConnection(); //聲明一個連接

Channel channel = conn.createChannel(); //聲明消息通道

String exchangeName = "TestEXG";//交換機名稱

String routingKey = "RouteKey1";//RoutingKey關鍵字

channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機

String queueName = "ClusterQueue";//隊列名稱

Map arg = new HashMap();

arg.put("x-ha-policy", "all");

channel.queueDeclare(queueName, false, false, false, arg);

channel.queueBind(queueName, exchangeName, routingKey);//定義聲明對象

byte[] messageBodyBytes = "Hello, world!".getBytes();//消息內容

channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//發布消息

//關閉通道和連接

channel.close();

conn.close();

}

}

消費者代碼

package com.test.cluster;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

//通過channel.basicAck向服務器發送回執,刪除服務上的消息

public class Consumer {

public static void main(String[] args) throws IOException, InterruptedException {

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("admin");

factory.setPassword("admin");

factory.setHost("192.168.169.142"); //使用默認端口5672

factory.setPort(5672);

Connection conn = factory.newConnection(); //聲明一個連接

Channel channel = conn.createChannel(); //聲明消息通道

String exchangeName = "TestEXG";//交換機名稱

String queueName = "ClusterQueue";//隊列名稱

channel.exchangeDeclare(exchangeName, "direct", true);//定義聲明交換機

channel.queueBind(queueName, exchangeName, "RouteKey1");

channel.basicQos(1); //server push消息時的隊列長度

//用來緩存服務器推送過來的消息

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume(queueName, false, consumer);

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

System.out.println("Received " + new String(delivery.getBody()));

//回復ack包,如果不回復,消息不會在服務器刪除

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

11.RabbitMQ單機集群