1. 程式人生 > >zookeeper與卡夫卡叢集搭建

zookeeper與卡夫卡叢集搭建

首先這片部落格沒有任何理論性的東西,只是詳細說明kafka與zookeeper叢集的搭建過程,需要三臺linux伺服器。

  • java環境變數設定
  • zookeeper叢集搭建
  • kafka叢集搭建

java環境變數設定

在每臺伺服器上都有設定java環境變數

這裡使用java原始碼安裝的方式:

下載原始碼包解壓,放入到/usr/local/資料夾下,修改名目錄名字為jdk!接下就是把java的命令引數加入到linux的環境變數中。

[[email protected] jdk]# cat /etc/profile.d/java.sh 
JAVA_HOME=/usr/local/jdk
JAVA_BIN
=/usr/local/jdk/bin JRE_HOME=/usr/local/jdk/jre PATH=$PATH:/usr/local/jdk/bin:/usr/local/jdk/jre/bin CLASSPATH=/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib/charsets.jar [[email protected] jdk]#

#然後執行source命令,載入新加入的java.sh指令碼!
[[email protected] bin]# source /etc/profile.d/java.sh

#驗證java環境變數是否設定成功
[[email protected]
bin]# java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
[[email protected] bin]# echo $JAVA_HOME #檢視JAVA_HOME變數是否是上面設定的路徑
/usr/local/jdk/jdk
[[email protected] bin]#

若是以上均反饋正常,則說明java環境已經設定完畢!

zookeeper叢集搭建

zookeeper叢集被稱為群組。zookeeper使用的是一致性協議,所以建議每個群組裡應該包含奇數個節點,因為只有當群組中大多數節點處於可用狀態,zookeeper才能處理外部請求。也就是說,如果一個包含3個節點的群組,那麼它允許一個節點失效。如果包含5個節點,那麼它允許2個節點失效。【不建議zookeeper群組超過7個節點,因為zookeeper使用了一致性協議,節點過多會降低群組的效能】

群組中的每個伺服器需要在資料目錄中建立一個myid檔案,用於指明自己的ID。

這裡zookeeper採用原始碼的方式安裝:

[[email protected] src]# tar zxvf zookeeper-3.4.6.tar.gz -C ../         #解壓
[[email protected] src]# cd ../
[[email protected] local]# mv zookeeper-3.4.6 zookeeper                 #修改名字
[[email protected] local]# cd zookeeper
[[email protected] zookeeper]# ls
bin        CHANGES.txt  contrib     docs             ivy.xml  LICENSE.txt  README_packaging.txt  recipes  zookeeper-3.4.6.jar      zookeeper-3.4.6.jar.md5   zookeeper.out
build.xml  conf         dist-maven  ivysettings.xml  lib      NOTICE.txt   README.txt            src      zookeeper-3.4.6.jar.asc  zookeeper-3.4.6.jar.sha1
[[email protected] zookeeper]# mkdir -p /data/zookeeper/data                 #建立資料目錄
[[email protected] zookeeper]# mkdir -p /data/zookeeper/logs                 #建立日誌檔案目錄
[[email protected] zookeeper]# cd conf [[email protected] conf]#
ls configuration.xsl log4j.properties zoo_sample.cfg [[email protected] conf]# mv zoo_sample.cfg zoo.cfg #修改配置檔案的名字,建議把原來的配置檔案做備份

zookeeper的配置檔案如下:

# The number of milliseconds of each tick
tickTime=2000 #單位為毫秒
# The number of ticks that the initial # synchronization phase can take initLimit
=10
#表示用於在從節點與主節點之間建立初始化連線的上限。
# The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5
#表示允許從節點與主節點處於不同步轉檯的時間上限。
#initLimit與syncLimit這兩個值都是tickTime的倍數,表示的時間是tickTime乘以這兩個數值的值。 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/data/zookeeper/data #指定資料目錄檔案 dataLogDir=/data/zookeeper/logs #指定日誌目錄檔案 # the port at which the clients will connect clientPort=2181 #zookeeper監聽的埠 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1

#zookeeper的群組設定
server.1=10.0.102.179:2888:3888 server.2=10.0.102.204:2888:3888 server.3=10.0.102.214:2888:3888

#群組設定的格式為: server.X=hostname:peerPort:leader:Port
X:伺服器的ID,必須是一個整數,不過不一定要從0開始,也不要求是連續的。
peerPort:用於節點間通訊的TCP埠。
leaderPort:用於leader選舉的TCP埠。

客戶端只需要通過clientport就能連線到群組,而群組節點間的通訊則需要同時用到這三個埠。

 

【上面大操作,只是修改了配置檔案,建立了資料目錄和日誌檔案目錄,還有就是解壓了原始碼包!

下面要做的就是把上面操作在另外兩臺機器上做一遍,也可以把上面的檔案直接通過scp命令複製到其餘兩臺機器對應的位置上。】

同MySQL叢集一樣,叢集每臺伺服器都需要一個唯一的標識,zookeeper也一樣,需要一個myid檔案,來標識每一臺伺服器!

[[email protected] data]# pwd
/data/zookeeper/data
[[email protected] data]# cat myid 
1

#在每臺叢集的資料檔案目錄下面建立myid檔案,每個檔案中寫入一個整數,這個整數用來指明當前伺服器的ID。
#這個ID應該是需要和上面配置檔案中的那個server.X中的X一致的。【遇到過不一致的情況,但是啟動失敗,修改為一致,就啟動成功】

另外兩臺的myid檔案如下:

[[email protected] data]# pwd
/data/zookeeper/data
[[email protected] data]# cat myid 
2

[[email protected] data]# pwd
/data/zookeeper/data
[[email protected] data]# cat myid
3

然後啟動zookeeper叢集:

#三臺伺服器都這樣啟動,會在當前目錄執行路徑下面生成nohup.out檔案,若是報錯的話,可以檢視檔案內容!
[[email protected] bin]# pwd
/usr/local/zookeeper/bin
[[email protected] bin]# ./zkServer.sh start JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED

#需要注意的是,啟動完成之後一定要檢視是否有程序,有些時候不會報錯,但是沒有啟動成功!
[[email protected] bin]# netstat -alntp | grep 2181 #檢視監聽埠是否存在
tcp        0      0 :::2181                     :::*                        LISTEN      27962/java

三臺伺服器全部啟動完成之後,進行如下驗證:

[[email protected] bin]# ./zkCli.sh --server10.0.102.204
Connecting to localhost:2181
2018-12-21 10:54:38,286 [myid:] - INFO  [main:[email protected]100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2018-12-21 10:54:38,289 [myid:] - INFO  [main:[email protected]100] - Client environment:host.name=test3
2018-12-21 10:54:38,289 [myid:] - INFO  [main:[email protected]100] - Client environment:java.version=1.7.0_79
2018-12-21 10:54:38,291 [myid:] - INFO  [main:[email protected]100] - Client environment:java.vendor=Oracle Corporation
2018-12-21 10:54:38,291 [myid:] - INFO  [main:[email protected]100] - Client environment:java.home=/usr/local/jdk/jdk1.7.0_79/jre
2018-12-21 10:54:38,291 [myid:] - INFO  [main:[email protected]] - Client environment:java.class.path=/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/zookeeper/bin/../lib/netty-3.7.0.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-0.9.94.jar:/usr/local/zookeeper/bin/../zookeeper-3.4.6.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf:

2018-12-21 10:54:38,291 [myid:] - INFO  [main:[email protected]100] - Client environment:java.library.path=/usr/local/mysql/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2018-12-21 10:54:38,291 [myid:] - INFO  [main:[email protected]100] - Client environment:java.io.tmpdir=/tmp
2018-12-21 10:54:38,292 [myid:] - INFO  [main:[email protected]100] - Client environment:java.compiler=<NA>
2018-12-21 10:54:38,292 [myid:] - INFO  [main:[email protected]100] - Client environment:os.name=Linux
2018-12-21 10:54:38,292 [myid:] - INFO  [main:[email protected]100] - Client environment:os.arch=amd64
2018-12-21 10:54:38,292 [myid:] - INFO  [main:[email protected]100] - Client environment:os.version=2.6.32-504.el6.x86_64
2018-12-21 10:54:38,292 [myid:] - INFO  [main:[email protected]100] - Client environment:user.name=root
2018-12-21 10:54:38,292 [myid:] - INFO  [main:Environ[email protected]100] - Client environment:user.home=/root
2018-12-21 10:54:38,292 [myid:] - INFO  [main:[email protected]100] - Client environment:user.dir=/usr/local/zookeeper/bin
2018-12-21 10:54:38,293 [myid:] - INFO  [main:[email protected]438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=[email protected]
Welcome to ZooKeeper!
2018-12-21 10:54:38,315 [myid:] - INFO  [main-SendThread(localhost:2181):[email protected]975] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2018-12-21 10:54:38,320 [myid:] - INFO  [main-SendThread(localhost:2181):[email protected]852] - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-12-21 10:54:38,330 [myid:] - INFO  [main-SendThread(localhost:2181):[email protected]1235] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x367ce7db1fa0003, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]


#若有如上顯示,則zookeeper叢集搭建成功

搭建卡夫卡叢集

仍然採用原始碼的方式安裝。安裝過程和zookeeper差不多,解壓,修改配置檔案,建立對應訊息目錄。

[[email protected] src]# tar -zxvf kafka_2.10-0.10.0.0.tgz -C ../         #解壓
[[email protected] src]# cd ..
[[email protected] local]# mv kafka_2.10-0.10.0.0 kafka                   #修改名字
[[email protected] local]# cd kafka/
[[email protected] kafka]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[[email protected] kafka]# cd config/                                      #
[[email protected] config]# ls                                             #kafka的配置檔案有很多,但是目前我們只需要關注 server.properties
connect-console-sink.properties    connect-file-sink.properties    connect-standalone.properties  producer.properties     zookeeper.properties
connect-console-source.properties  connect-file-source.properties  consumer.properties            server.properties
connect-distributed.properties     connect-log4j.properties        log4j.properties               tools-log4j.properties

配置檔案如下:把其中的註釋刪掉了!

broker.id=3                            #在每個kafka伺服器中唯一的,是一個整數
port=9092
host.name=10.0.102.179
auto.create.topics.enable = false      
delete.topic.enable = true
num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600


log.dirs=/data/kafka/logs                 

num.partitions=1

num.recovery.threads.per.data.dir=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=10.0.102.179:2181,10.0.102.204:2181,10.0.102.214:2181
zookeeper.connection.timeout.ms=6000

配置檔案的引數的說明:

  • broker.id: 每一個broker都需要一個識別符號,使用broker.id來表示。預設是0,可以被設定為任意整數,在kafka叢集中必須是唯一的。
  • port:預設埠是9092,配置kafka監聽的埠。
  • zookeeper.connect:用於儲存broker元資料的zookeeper地址是通過zookeeper.connect來指定的。新式就是上面那樣,主機和埠號,多個地址用逗號隔開。上面的三個是一個zookeeper叢集。若是當前連線的zookeeper宕機,broker可以連線到zookeeper群組中的其他伺服器上。
  • log.dirs: kafka把所有訊息都儲存在磁碟上,存放這些日子片段的目錄是通過log.dirs指定的。它是一組用逗號分隔的本地檔案系統路徑。如果指定了多個路徑,那麼broker會根據“最少使用”原則,把同一個分割槽的日誌片段儲存到同一個路徑下。需要注意,borker會往擁有最少數目分割槽的路徑新增分割槽,而不是往擁有最小磁碟空間的路徑新增分割槽。
  • num.recovery.threads.per.data.dir,對於如下3種情況,kafka會使用多少個執行緒來處理日誌片段。
    •   伺服器正常啟動,用於開啟每個分割槽的日誌片段。
    •        伺服器崩潰後重啟,用於檢查和截短每個分割槽的日誌片段。
    •        伺服器正常關閉,用於關閉日誌片段。

          預設情況下,每個日誌目錄只使用一個執行緒。因為這些執行緒只是在伺服器啟動和關閉時會用到,所以完全可以設定大量的執行緒來達到並行操作的目的。特別是對於包含大量分割槽的伺服器來說,一旦發生崩潰,在進行恢復時使用並行操作可能會省下很多時間。所配置的數字對應的是log.dirs指定的單個日誌目錄。譬如,這個值被設定為8,並且log.dirx指定了3個路徑,那麼久需要24個執行緒。

  • auto.create.topics.enable 預設情況下,kafka會在以下幾種情況下建立主題。
    •   當一個生產者開始往主題寫入訊息時。
    •        當一個消費者開始從主題讀取訊息時。
    •        當任意一個客戶端向主題傳送元資料請求是。

    這個引數有點迷糊,儘量設定為false吧!

 

上面指定了kafa的日誌目錄,需要建立訊息目錄:

[[email protected] ~]#  mkdir -p /data/kafka/logs

和zookeeper一樣,把上面解壓後的檔案和建立的日誌目錄,分別拷貝到其餘兩臺伺服器上,注意修改其餘兩臺伺服器的broker.id。

然後啟動kafka:

[[email protected] bin]# ls
connect-distributed.sh     kafka-consumer-groups.sh             kafka-reassign-partitions.sh   kafka-simple-consumer-shell.sh   zookeeper-server-start.sh
connect-standalone.sh      kafka-consumer-offset-checker.sh     kafka-replay-log-producer.sh   kafka-topics.sh                  zookeeper-server-stop.sh
kafka-acls.sh              kafka-consumer-perf-test.sh          kafka-replica-verification.sh  kafka-verifiable-consumer.sh     zookeeper-shell.sh
kafka-configs.sh           kafka-mirror-maker.sh                kafka-run-class.sh             kafka-verifiable-producer.sh
kafka-console-consumer.sh  kafka-preferred-replica-election.sh  kafka-server-start.sh          windows
kafka-console-producer.sh  kafka-producer-perf-test.sh          kafka-server-stop.sh           zookeeper-security-migration.sh

#bin目錄下面有很多指令碼,kafka自帶的有zookeeper啟動,但是建議搭建獨立的zookeeper叢集。

[[email protected] bin]# ./kafka-server-start.sh /usr/local/kafka/config/server.properties 1>/dev/null 2>&1 &
[1] 8757
[[email protected] bin]#

#kafka預設標準輸出到螢幕,上面的命令把kafka掛起在後臺執行。

#啟動之後檢視是否監聽了9092埠。
[[email protected] bin]# netstat -lntp |grep 9092
tcp        0      0 ::ffff:10.0.102.204:9092    :::*                        LISTEN      8757/java           
[[email protected] bin]#

三臺伺服器全部啟動完畢,可以進行如下測試:

[[email protected] bin]# ./kafka-console-producer.sh --broker-list 10.0.102.179:9092 --topci science       #建立一個主題
topci is not a recognized option
[[email protected] bin]#

#建立一個生產者,傳送訊息
[[email protected] bin]# ./kafka-console-producer.sh --broker-list 10.0.102.179:9092 --topic science
test message
first
#創意一個消費者物件,接收生產者傳送的訊息
[[email protected] bin]# ./kafka-console-consumer.sh --zookeeper 10.0.102.179:2181 --topic science --from-beginning
test message

first

#可以看到卡夫卡是一個先進先出的訊息佇列

若是以上的佇列訊息驗證成功,則說明叢集搭建成功、

[[email protected] bin]# ./kafka-topics.sh --zookeeper 10.0.102.179:2181 --describe --topic science         #檢視主題的資訊
Topic:science    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: science    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
[[email protected] bin]# 

 

一個異常說明:在測試的時候,剛開始總報如下錯誤:

[[email protected] bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic lianxi
test1
[2018-12-20 17:33:27,112] ERROR Error when sending message to topic lianxi with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

這個問題搞了好久,最好把java的版本從1.8降到了1.7之後,然後就成功了!

搭建成功的各個元件的版本資訊如下:

[[email protected] ~]# java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)

#kafka沒有類似的version命令,需要檢視kafka的使用包資訊
[[email protected] libs]# pwd
/usr/local/kafka/libs
[[email protected] libs]# ls kafka_2.10-0.10.0.0.jar
kafka_2.10-0.10.0.0.jar

#2.10是scala的版本,後面的0.10.0.0是kafka的版本

#在zookeeper的安裝目錄下面有對應版本的一些包
[[email protected] zookeeper]# ls
bin        CHANGES.txt  contrib     docs             ivy.xml  LICENSE.txt  README_packaging.txt  recipes  zookeeper-3.4.6.jar      zookeeper-3.4.6.jar.md5
build.xml  conf         dist-maven  ivysettings.xml  lib      NOTICE.txt   README.txt            src      zookeeper-3.4.6.jar.asc  zookeeper-3.4.6.jar.sha1
[[email protected] zookeeper]#
#zookeeper的版本是3.4.6