1. 程式人生 > >Zookeeper+ActiveMQ叢集搭建

Zookeeper+ActiveMQ叢集搭建

1 前言

      使用ZooKeeper實現的Master-Slave實現方式,是對ActiveMQ進行高可用的一種有效的解決方案,高可用的原理:使用ZooKeeper(叢集)註冊所有的ActiveMQ Broker。只有其中的一個Broker可以對外提供服務(也就是Master節點),其 他的Broker處於待機狀態,被視為Slave。如果Master因故障而不能提供服務, 則利用ZooKeeper的內部選舉機制會從Slave中選舉出一個Broker充當Master節 點,繼續對外提供服務。

原理圖:

這裡寫圖片描述

2 部署方案,ZooKeeper、ActiveMQ叢集環境準備

這裡需要下載apache-activemq-5.11.1bin.tar.gz、zookeeper-3.3.6.tar.gz檔案,可以到官網去下載。

apache-activemq-5.11.1bin.tar.gz、zookeeper-3.3.6.tar.gz百度雲連結:連結:https://pan.baidu.com/s/1ge5htFP 密碼:lkb6

2、1 Zookeeper方案
主機IP 訊息埠 通訊埠 節點目錄/opt/下
192.168.0.107 2181 2888:3888 zookeeper-3.3.6
192.168.0.108 2181 2888:3888 zookeeper-3.3.6
192.168.0.109 2181 2888:3888 zookeeper-3.3.6
2、2 ActiveMQ方案
主機IP 叢集通訊埠 訊息埠 控制檯埠 節點目錄/opt/下
192.168.0.107 62621 51511 8161 activemq-cluster/node1/
192.168.0.107 62621 51512 8162 activemq-cluster/node1/
192.168.0.107 62621 51513 8163 activemq-cluster/node1/

ActiveMQ叢集我部署在一臺機器上而已,你感興趣的話看完這篇文章,不妨你把ActiveMQ搭建到每一臺機器上。

2、3 Zookeeper叢集搭建

將zookeeper-3.3.6.tar.gz上傳到/opt目錄下

下面三臺機器都是使用同一個操作,如有不同我會另作說明

解壓

shell>tar -zxvf zookeeper-3.3.6.tar.gz

進入conf目錄

shell>cd zookeeper-3.3.6/conf

將zoo_sample.cfg檔名改為zoo.cfg

shell>mv zoo_sample.cfg zoo.cfg

編輯zoo.cfg

shell>vim zoo.cfg

修改dataDir路徑

dataDir=/opt/zookeeper-3.3.6/data

加入內容

server.0=192.168.0.109:2888:3888
server.1=192.168.0.107:2888:3888
server.2=192.168.0.108:2888:3888

我也不懂為什麼上傳後的圖片變模糊了,估計是csdn.....

這裡幾個引數我說明一下:

tickTime: zookeeper中使用的基本時間單位, 毫秒值.
dataDir: 資料目錄. 可以是任意目錄.
dataLogDir: log目錄, 同樣可以是任意目錄. 如果沒有設定該引數, 將使用和dataDir相同的設定.(這裡我不配置)
clientPort: 監聽client連線的埠號.

還需修改最後一個檔案zookeeper叢集就算搭建完成了

進入data目錄

shell>cd zookeeper-3.3.6/data

編輯myid每臺機器只需要加入你在zoo.cfg中加入的server配置對應得數字即可。

shell>vim myid

192.168.0.109內容如下

0

192.168.0.108內容如下

1

192.168.0.107內容如下

2

好到這就算配置完成了啟動zookeeper試一下,是否成功:

啟動服務端:
這裡寫圖片描述

啟動客戶端:

這裡寫圖片描述

好zookeeper叢集的搭建到這就完成了。

2、4 ActiveMQ叢集搭建

將apache-activemq-5.11.1bin.tar.gz上傳到/opt目錄下

在192.168.0.107節點下,建立/opt/activemqcluster資料夾,解壓apache-activemq-5.11.1bin.tar.gz檔案,然後對解壓好的檔案改名,操作如下:

shell>mkdir /opt/activemq-cluster 
shell>cd opt/ 
shell>tar -zxvf apache-activemq-5.11.1-bin.tar.gz -C /opt/activemq-cluster/ 
shell>cd /opt/activemq-cluster/ 
shell>mv apache-activemq-5.11.1/ node1

如此操作,再次反覆解壓apache-activemq-5.11.1bin.tar.gz檔案到/opt/activemq-cluster/下,建立node2和node3資料夾,如下:

那我們現在已經解壓好了三個ActiveMQ節點也就是node1、node2、node3,下面 我們要做的事情就是更改每個節點不同的配置和埠(由於是在一臺機器上實現叢集)

1 修改控制檯埠(預設為8161)

在mq安裝路徑下的conf/jetty.xml進行修改即可。(三個節點都要修改,並且埠都不同)

shell>cd /opt/activemq-cluster/node1/conf/ 

shell>vim jetty.xml

這裡寫圖片描述

三個節點都需要修改為8161、8162、8163!!!

2 叢集配置檔案修改

我們在mq安裝路徑下的conf/activemq.xml進行修改其中的持久化介面卡,修改其中的bind、zkAddress、hostname、zkPath。 然後也需要修改mq的brokerName,並且每個節點名稱都必須相同。

命令:vim /opt/activemq-cluster/node1/conf/activemq.xml

第一處修改:brokerName=”activemq-cluster”(三個節點都需要修改)
這裡寫圖片描述

第二處修改:先註釋掉介面卡(persistenceAdapter標籤)中的kahadb,新增新的leveldb配置如下(三個節點都需要修改),內容如下:
Node1:

    <persistenceAdapter>
             <!--kahaDB directory="${activemq.data}/kahadb"/ -->
             <replicatedLevelDB
                    directory="${activemq.data}/leveldb"
                    replicas="3" 
                    bind="tcp://0.0.0.0:62621"
                    zkAddress="192.168.0.107:2181,192.168.0.108:2181,192.168.0.109:2181"
                    hostname="hfbin107"
                    zkPath="/activemq/leveldb-stores"/>
    </persistenceAdapter>

Node2:

        <persistenceAdapter>
                 <!--kahaDB directory="${activemq.data}/kahadb"/ -->
                 <replicatedLevelDB
                        directory="${activemq.data}/leveldb"
                        replicas="3" 
                        bind="tcp://0.0.0.0:62622"
                        zkAddress="192.168.0.107:2181,192.168.0.108:2181,192.168.0.109:2181"
                        hostname="hfbin107"
                        zkPath="/activemq/leveldb-stores"/>
        </persistenceAdapter>

Node3:

        <persistenceAdapter>
                 <!--kahaDB directory="${activemq.data}/kahadb"/ -->
                 <replicatedLevelDB 
                        directory="${activemq.data}/leveldb" 
                        replicas="3" 
                        bind="tcp://0.0.0.0:62623" 
                        zkAddress="192.168.0.107:2181,192.168.0.108:2181,192.168.0.109:2181" 
                        hostname="hfbin107" 
                        zkPath="/activemq/leveldb-stores"/>
        </persistenceAdapter>

node1圖片

第四處修改

修改通訊的埠,避免衝突,大概在120行:

shell>vim /opt/activemq-cluster/node1/conf/activemq.xml 

這裡寫圖片描述

修改這個檔案的通訊埠號,三個節點都需要修改(51511,51512,51513)

Ok,到此為止,我們的activemq叢集環境已經搭建完畢!

3 測試啟動activemq叢集

第一步:啟動zookeeper叢集,命令:zkServer.sh start

第二步:啟動mq叢集:順序啟動mq:

shell>/opt/activemq-cluster/node1/bin/activemq start(關閉stop) 
shell>/opt/activemq-cluster/node2/bin/activemq start(關閉stop) 
shell>/opt/activemq-cluster/node3/bin/activemq start(關閉stop) 

這裡寫圖片描述
第三步:檢視日誌資訊:

shell>tail -f /opt/activemq-cluster/node1/data/activemq.log 
shell>tail -f /opt/activemq-cluster/node2/data/activemq.log 
shell>tail -f /opt/activemq-cluster/node3/data/activemq.log 

如果不報錯,我們的叢集啟動成功,可以使用控制檯檢視!

這裡有個小案例你可以試一下,執行結果需要你自己去動手了我這裡截一兩張圖說明補了什麼。具體操作我可以說一下你可以按照我說操作來看看結果如何。

生產端程式碼:

public class Sender {

    public static void main(String[] args) {
        try {
            //第一步:建立ConnectionFactory工廠物件,需要填入使用者名稱、密碼、以及要連線的地址,均使用預設即可,預設埠為"tcp://localhost:61616"
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_USER, 
                    ActiveMQConnectionFactory.DEFAULT_PASSWORD, 
                    "failover:(tcp://192.168.0.107:51511,tcp://192.168.0.108:51512,tcp://192.168.0.109:51513)?Randomize=false");

            //第二步:通過ConnectionFactory工廠物件我們建立一個Connection連線,並且呼叫Connection的start方法開啟連線,Connection預設是關閉的。
            Connection connection = connectionFactory.createConnection();
            connection.start();

            //第三步:通過Connection物件建立Session會話(上下文環境物件),用於接收訊息,引數配置1為是否啟用是事務,引數配置2為簽收模式,一般我們設定自動簽收。
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            //第四步:通過Session建立Destination物件,指的是一個客戶端用來指定生產訊息目標和消費訊息來源的物件,在PTP模式中,Destination被稱作Queue即佇列;在Pub/Sub模式,Destination被稱作Topic即主題。在程式中可以使用多個Queue和Topic。
            Destination destination = session.createQueue("first");

            //第五步:我們需要通過Session物件建立訊息的傳送和接收物件(生產者和消費者)MessageProducer/MessageConsumer。
            MessageProducer producer = session.createProducer(null);

            //第六步:我們可以使用MessageProducer的setDeliveryMode方法為其設定持久化特性和非持久化特性(DeliveryMode),我們稍後詳細介紹。
            //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //第七步:最後我們使用JMS規範的TextMessage形式建立資料(通過Session物件),並用MessageProducer的send方法傳送資料。同理客戶端使用receive方法進行接收資料。最後不要忘記關閉Connection連線。

            for(int i = 0 ; i < 500000 ; i ++){
                TextMessage msg = session.createTextMessage("我是訊息內容" + i);
                // 第一個引數目標地址
                // 第二個引數 具體的資料資訊
                // 第三個引數 傳送資料的模式
                // 第四個引數 優先順序
                // 第五個引數 訊息的過期時間
                producer.send(destination, msg, DeliveryMode.NON_PERSISTENT, 0 , 1000L);
                System.out.println("傳送訊息:" + msg.getText());
                Thread.sleep(1000);

            }

            if(connection != null){
                connection.close();
            }           
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

消費端:

public class Receiver {

    public static void main(String[] args)  {
        try {
            //第一步:建立ConnectionFactory工廠物件,需要填入使用者名稱、密碼、以及要連線的地址,均使用預設即可,預設埠為"tcp://localhost:61616"
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_USER, 
                    ActiveMQConnectionFactory.DEFAULT_PASSWORD, 
                    "failover:(tcp://192.168.0.107:51511,tcp://192.168.0.108:51512,tcp://192.168.0.109:51513)?Randomize=false");

            //第二步:通過ConnectionFactory工廠物件我們建立一個Connection連線,並且呼叫Connection的start方法開啟連線,Connection預設是關閉的。
            Connection connection = connectionFactory.createConnection();
            connection.start();

            //第三步:通過Connection物件建立Session會話(上下文環境物件),用於接收訊息,引數配置1為是否啟用是事務,引數配置2為簽收模式,一般我們設定自動簽收。
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            //第四步:通過Session建立Destination物件,指的是一個客戶端用來指定生產訊息目標和消費訊息來源的物件,在PTP模式中,Destination被稱作Queue即佇列;在Pub/Sub模式,Destination被稱作Topic即主題。在程式中可以使用多個Queue和Topic。
            Destination destination = session.createQueue("first");
            //第五步:通過Session建立MessageConsumer
            MessageConsumer consumer = session.createConsumer(destination);

            while(true){
                TextMessage msg = (TextMessage)consumer.receive();
                if(msg == null) break;
                System.out.println("收到的內容:" + msg.getText());
            }           
        } catch (Exception e) {
            e.printStackTrace();
        }




    }
}

對程式碼不瞭解的可以去看我以前的文章。

      這裡你先啟動消費端程式碼,讓消費端一直在等,然後再啟動生產端(生產端生產五十萬條數字),生產到上百條時候我模擬Node1(Master)掛掉瞭然後看看Node2(Slave)或Node3(Slave)有沒有一個節點成為Master,這個只是我說的之一更多模擬可以自己搞。

說明:如何知道Node1是Master,就看你建立ConnectionFactory工廠物件的連線地址第一個地址預設為Master,你要是想確認是不是這裡需要輔助的一些工具了。在這我就不做說明了,要是人想了解可以給我留言。