kafka系列二:多節點分散式叢集搭建
上一篇分享了單節點偽分散式叢集搭建方法,本篇來分享一下多節點分散式叢集搭建方法。多節點分散式叢集結構如下圖所示:
為了方便查閱,本篇將和上一篇一樣從零開始一步一步進行叢集搭建。
一、安裝Jdk
具體安裝步驟可參考 linux安裝jdk。
二、安裝與配置zookeeper
下載地址:https://www-us.apache.org/dist/zookeeper/stable/
下載二進位制壓縮包 zookeeper-3.4.14.tar.gz,然後上傳到linux伺服器指定目錄下,本次上傳目錄為 /software ,然後執行如下命令安裝:
cd /software
tar -zxvf zookeeper-3.4.14.tar.gz
mv zookeeper-3.4.14 /usr/local/zookeeper
cd /usr/local/zookeeper/conf
mv zoo_sample.cfg zoo1.cfg
編輯 zoo1.cfg ,配置相關引數如下:
tickTime=2000 initLimit=5 syncLimit=2 dataDir=/usr/local/zookeeper/data/zookeeper1 clientPort=2181 server.1=192.168.184.128:2888:3888 server.2=192.168.184.128:2889:3889 server.3=192.168.184.128:2890:3890
其中:
tickTime :Zookeeper最小的時間單位,用於丈量心跳和超時時間,一般設定預設值2秒;
initLimit :指定follower節點初始時連線leader節點的最大tick此處,設定為5,表示follower必須在 5xtickTime 即10秒內連線上leader,否則視為超時;
syncLimit :設定follower節點與leader節點進行同步的最大時間,設定為2,表示最大時間為 2xtickTime 即4秒時間;
dataDir :Zookeeper會在記憶體中儲存系統快照,並定期寫入該路徑指定的資料夾中,生產環境需要特別注意該資料夾的磁碟佔用情況;
clientPort :Zookeeper監聽客戶端連線的埠號,預設為2181,同一伺服器上不同例項之間應該有所區別;
server.X=host:port1:port2 :此處X的取值範圍在1~255之間,必須是全域性唯一的且和myid檔案中的數字對應(myid檔案後面說明),host是各個節點的主機名,port1通常是2888,用於使follower節點連線leader節點,port2通常是3888,用於leader選舉,zookeeper在不同伺服器上的時候,不同zookeeper伺服器的埠號可以重複,在同一臺伺服器上的時候需要有所區別。
1.配置zoo.cfg檔案
單節點安裝zookeeper的時候,僅有一份zoo.cfg檔案,多節點安裝的時候,每個zookeeper伺服器就應該有一個zoo.cfg配置檔案。如果在一臺伺服器安裝zookeeper多例項叢集,則需要在conf目錄下分別配置每個例項的zoo.cfg,同時建立每個zookeeper例項自己的資料儲存目錄。本次在一臺伺服器上配置多個zookeeper例項,執行如下命令建立資料儲存目錄並複製配置檔案:
mkdir -p /usr/local/zookeeper/data/zookeeper1 mkdir -p /usr/local/zookeeper/data/zookeeper2 mkdir -p /usr/local/zookeeper/data/zookeeper3 cd /usr/local/zookeeper/conf/ cp zoo1.cfg zoo2.cfg cp zoo1.cfg zoo3.cfg
複製後分別修改 zoo2.cfg , zoo3.cfg 中的配置,修改後的配置如下:
zoo1.cfg的配置如下:
zoo2.cfg的配置如下:
zoo3.cfg中的配置如下:
2.myid檔案建立與配置
前面提到zoo.cfg檔案中的server.X中的X應該與myid中的數字相對應。除此之外,myid檔案必須存放在每個zookeeper例項的data目錄下,對應本次安裝應該位於 /usr/local/zookeeper/data/zookeeper1,2,3 目錄下,執行如下命令進行配置:
echo '1' > /usr/local/zookeeper/data/zookeeper1/myid
echo '2' > /usr/local/zookeeper/data/zookeeper2/myid
echo '3' > /usr/local/zookeeper/data/zookeeper3/myid
3.啟動zookeeper伺服器
使用如下命令啟動zookeeper叢集:
cd /usr/local/zookeeper/bin/
./zkServer.sh start ../conf/zoo1.cfg
./zkServer.sh start ../conf/zoo2.cfg
./zkServer.sh start ../conf/zoo3.cfg
啟動後,使用如下命令檢視叢集狀態:
cd /usr/local/zookeeper/bin/
./zkServer.sh status ../conf/zoo1.cfg ./zkServer.sh status ../conf/zoo2.cfg ./zkServer.sh status ../conf/zoo3.cfg
回顯資訊如下:
可以看到有兩個follower節點,一個leader節點。
三、安裝與配置kafka叢集
下載地址:http://kafka.apache.org/downloads.html
1.資料目錄和配置檔案建立
目前最新版本是2.2.0,本次下載2.1.1版本的安裝包,然後上傳壓縮包到伺服器指定目錄,本次上傳目錄為 /software ,然後執行以下命令進行安裝:
tar -zxvf kafka_2.12-2.1.1.tgz
mv kafka_2.12-2.1.1 /usr/local/kafka
mkdir -p /usr/local/kafka/logs/kafka1
mkdir -p /usr/local/kafka/logs/kafka2
mkdir -p /usr/local/kafka/logs/kafka3
cd /usr/local/kafka/config/
mv server.properties server1.properties
cp server1.properties server2.properties
cp server1.properties server3.properties
通過執行上面的命令,我們在 /usr/local/kafka/logs 資料夾中建立了 kafka1,kafka2,kafka3 三個資料夾用於存放三個kafka例項的資料,同時在 /usr/local/kafka/config/ 資料夾下複製了三份 server.properties 檔案分別用於配置各個kafka例項。
2.配置屬性檔案
接下來分別配置三個 server.properties 檔案,主要配置引數如下:
broker.id=1 :設定kafka broker的id,本次分別為1,2,3;
delete.topic.enable=true :開啟刪除topic的開關;
listeners=PLAINTEXT://192.168.184.128:9092 :設定kafka的監聽地址和埠號,本次分別設定為9092,9093,9094;
log.dirs=/usr/local/kafka/logs/kafka1 :設定kafka日誌資料儲存路徑;
zookeeper.connect=192.168.184.128:2181,192.168.184.128:2182,192.168.184.128:2183 :設定kafka連線的zookeeper訪問地址,叢集環境需要配置所有zookeeper的訪問地址;
unclean.leader.election.enable=false :為true則代表允許選用非isr列表的副本作為leader,那麼此時就意味著資料可能丟失,為false的話,則表示不允許,直接丟擲NoReplicaOnlineException異常,造成leader副本選舉失敗。
zookeeper.connection.timeout.ms=6000 :設定連線zookeeper伺服器超時時間為6秒。
配置完成後,各個配置檔案中配置如下:
server1.properties配置:
server2.properties配置:
server3.properties配置:
3.啟動kafka
通過如下命令啟動kafka叢集:
cd /usr/local/kafka/bin/
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
./kafka-server-start.sh -daemon ../config/server3.properties
使用 java的命令jps來檢視kafka程序:jps |grep -i kafka
說明kafak啟動正常,至此kafka叢集搭建完成。本次使用一臺伺服器作為演示,如果需要在多個伺服器上配置叢集,配置方法和以上類似,只是不需要像上面那樣配置多個數據目錄和配置檔案,每臺伺服器的配置保持相同,並且注意在防火牆配置埠號即可。
四、測試
1.topic建立與刪除
首先建立一個測試topic,名為testTopic,為了充分利用3個例項(伺服器節點),建立3個分割槽,每個分割槽都分配3個副本,命令如下:
cd /usr/local/kafka/bin/
./kafka-topics.sh --zookeeper 192.168.184.128:2181 192.168.184.128:2182 192.168.184.128:2183 --create --topic testTopic --partitions 3 --replication-factor 3
回顯 Created topic "testTopic". 則表明testTopic建立成功。執行如下命令進行驗證並檢視testTopic的資訊:
./kafka-topics.sh --zookeeper 192.168.184.128:2181 192.168.184.128:2182 192.168.184.128:2183 --list testTopic
./kafka-topics.sh --zookeeper 192.168.184.128:2181 192.168.184.128:2182 192.168.184.128:2183 --describe --topic testTopic
以上幾條命令回顯資訊如下:
接下來測試topic刪除,使用如下命令進行刪除:
./kafka-topics.sh --zookeeper 192.168.184.128:2181 192.168.184.128:2182 192.168.184.128:2183 --delete --topic testTopic
執行該條命令後,回顯資訊如下:
可以看到,testTopic已經被標記為刪除,同時第二行提示表明當配置了 delete.topic.enable 屬性為 true 的時候topic才會刪除,否則將不會被刪除,本次安裝的時候該屬性設定的值為 true 。
2.測試訊息傳送與消費
首先使用第一步topic建立命令,先建立testTopic這個topic,然後進行訊息傳送與消費測試。
控制檯測試訊息傳送與消費需要使用kafka的安裝目錄 /usr/local/kafka/bin 下的 kafka-console-producer.sh 來發送訊息,使用 kafka-console-consumer.sh 來消費訊息。因此本次開啟兩個控制檯,一個用於執行 kafka-console-producer.sh來發送訊息,另一個用於執行 kafka-console-consumer.sh 來消費訊息。
訊息傳送端命令:
cd /usr/local/kafka/bin
./kafka-console-producer.sh --broker-list 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --topic testTopic
訊息接收端命令:
cd /usr/local/kafka/bin
./kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --topic testTopic --from-beginning
當傳送端和接收端都登入後,在傳送端輸入需要傳送的訊息並回車,在接收端可以看到剛才傳送的訊息:
傳送端:
接收端:
以上就是簡單地生產訊息與消費訊息的測試,在測試消費訊息的時候時候,命令裡邊加了個引數 --from-beginning 表示接收該topic從建立開始的所有訊息。
3.生產者吞吐量測試
對於任何一個訊息引擎而言,吞吐量是一個至關重要的效能指標。對於Kafka而言,它的吞吐量指每秒能夠處理的訊息數或者位元組數。kafka為了提高吞吐量,採用追加寫入方式將訊息寫入作業系統的頁快取,讀取的時候從頁快取讀取,因此它不直接參與物理I/O操作,同時使用以sendfile為代表的零拷貝技術進行資料傳輸提高效率。
kafka提供了 kafka-producer-perf-test.sh 指令碼用於測試生產者吞吐量,使用如下命令啟動測試:
cd /usr/local/kafka/bin
./kafka-producer-perf-test.sh --topic testTopic --num-records 50000 --record-size 200 --throughput -1 --producer-props bootstrap.servers=192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 acks=-1
以上回顯資訊表明這臺伺服器上每個producer每秒能傳送6518個訊息,平均吞吐量是1.24MB/s,平均延遲2.035秒,最大延遲3.205秒,平均有50%的訊息傳送需要2.257秒,95%的訊息傳送需要3.076秒,99%的訊息傳送需要3.171秒,99.9%的訊息傳送需要3.205秒。
4.消費者吞吐量測試
與生產者吞吐量測試類似,kafka提供了 kafka-consumer-perf-test.sh 指令碼用於消費者吞吐量測試,可以執行以下命令進行測試:
cd /usr/local/kafka/bin
./kafka-consumer-perf-test.sh --broker-list 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --messages 50000 --topic testTopic
以上是測試50萬條訊息的consumer吞吐量,結果表明該consumer在1秒總共消費了9.5366MB訊息。
以上就是kafka叢集的搭建以及測試,如有錯誤之處,煩請指正。
參考資料:《Apache kafka實戰》
&n