zookeeper與卡夫卡叢集搭建
首先這片部落格沒有任何理論性的東西,只是詳細說明kafka與zookeeper叢集的搭建過程,需要三臺linux伺服器。
- java環境變數設定
- zookeeper叢集搭建
- kafka叢集搭建
java環境變數設定
在每臺伺服器上都有設定java環境變數
這裡使用java原始碼安裝的方式:
下載原始碼包解壓,放入到/usr/local/資料夾下,修改名目錄名字為jdk!接下就是把java的命令引數加入到linux的環境變數中。
[[email protected] jdk]# cat /etc/profile.d/java.sh JAVA_HOME=/usr/local/jdk JAVA_BIN=/usr/local/jdk/bin JRE_HOME=/usr/local/jdk/jre PATH=$PATH:/usr/local/jdk/bin:/usr/local/jdk/jre/bin CLASSPATH=/usr/local/jdk/jre/lib:/usr/local/jdk/lib:/usr/local/jdk/jre/lib/charsets.jar [[email protected] jdk]#
#然後執行source命令,載入新加入的java.sh指令碼!
[[email protected] bin]# source /etc/profile.d/java.sh
#驗證java環境變數是否設定成功
[[email protected]bin]# java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
[[email protected] bin]# echo $JAVA_HOME #檢視JAVA_HOME變數是否是上面設定的路徑
/usr/local/jdk/jdk
[[email protected] bin]#
若是以上均反饋正常,則說明java環境已經設定完畢!
zookeeper叢集搭建
zookeeper叢集被稱為群組。zookeeper使用的是一致性協議,所以建議每個群組裡應該包含奇數個節點,因為只有當群組中大多數節點處於可用狀態,zookeeper才能處理外部請求。也就是說,如果一個包含3個節點的群組,那麼它允許一個節點失效。如果包含5個節點,那麼它允許2個節點失效。【不建議zookeeper群組超過7個節點,因為zookeeper使用了一致性協議,節點過多會降低群組的效能】
群組中的每個伺服器需要在資料目錄中建立一個myid檔案,用於指明自己的ID。
這裡zookeeper採用原始碼的方式安裝:
[[email protected] src]# tar zxvf zookeeper-3.4.6.tar.gz -C ../ #解壓 [[email protected] src]# cd ../ [[email protected] local]# mv zookeeper-3.4.6 zookeeper #修改名字 [[email protected] local]# cd zookeeper [[email protected] zookeeper]# ls bin CHANGES.txt contrib docs ivy.xml LICENSE.txt README_packaging.txt recipes zookeeper-3.4.6.jar zookeeper-3.4.6.jar.md5 zookeeper.out build.xml conf dist-maven ivysettings.xml lib NOTICE.txt README.txt src zookeeper-3.4.6.jar.asc zookeeper-3.4.6.jar.sha1 [[email protected] zookeeper]# mkdir -p /data/zookeeper/data #建立資料目錄 [[email protected] zookeeper]# mkdir -p /data/zookeeper/logs #建立日誌檔案目錄
[[email protected] zookeeper]# cd conf [[email protected] conf]# ls configuration.xsl log4j.properties zoo_sample.cfg [[email protected] conf]# mv zoo_sample.cfg zoo.cfg #修改配置檔案的名字,建議把原來的配置檔案做備份
zookeeper的配置檔案如下:
# The number of milliseconds of each tick
tickTime=2000 #單位為毫秒
# The number of ticks that the initial # synchronization phase can take initLimit=10
#表示用於在從節點與主節點之間建立初始化連線的上限。
# The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5
#表示允許從節點與主節點處於不同步轉檯的時間上限。
#initLimit與syncLimit這兩個值都是tickTime的倍數,表示的時間是tickTime乘以這兩個數值的值。 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/data/zookeeper/data #指定資料目錄檔案 dataLogDir=/data/zookeeper/logs #指定日誌目錄檔案 # the port at which the clients will connect clientPort=2181 #zookeeper監聽的埠 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1
#zookeeper的群組設定 server.1=10.0.102.179:2888:3888 server.2=10.0.102.204:2888:3888 server.3=10.0.102.214:2888:3888
#群組設定的格式為: server.X=hostname:peerPort:leader:Port
X:伺服器的ID,必須是一個整數,不過不一定要從0開始,也不要求是連續的。
peerPort:用於節點間通訊的TCP埠。
leaderPort:用於leader選舉的TCP埠。
客戶端只需要通過clientport就能連線到群組,而群組節點間的通訊則需要同時用到這三個埠。
【上面大操作,只是修改了配置檔案,建立了資料目錄和日誌檔案目錄,還有就是解壓了原始碼包!
下面要做的就是把上面操作在另外兩臺機器上做一遍,也可以把上面的檔案直接通過scp命令複製到其餘兩臺機器對應的位置上。】
同MySQL叢集一樣,叢集每臺伺服器都需要一個唯一的標識,zookeeper也一樣,需要一個myid檔案,來標識每一臺伺服器!
[[email protected] data]# pwd /data/zookeeper/data [[email protected] data]# cat myid 1
#在每臺叢集的資料檔案目錄下面建立myid檔案,每個檔案中寫入一個整數,這個整數用來指明當前伺服器的ID。
#這個ID應該是需要和上面配置檔案中的那個server.X中的X一致的。【遇到過不一致的情況,但是啟動失敗,修改為一致,就啟動成功】
另外兩臺的myid檔案如下:
[[email protected] data]# pwd /data/zookeeper/data [[email protected] data]# cat myid 2
[[email protected] data]# pwd
/data/zookeeper/data
[[email protected] data]# cat myid
3
然後啟動zookeeper叢集:
#三臺伺服器都這樣啟動,會在當前目錄執行路徑下面生成nohup.out檔案,若是報錯的話,可以檢視檔案內容!
[[email protected] bin]# pwd
/usr/local/zookeeper/bin
[[email protected] bin]# ./zkServer.sh start JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
#需要注意的是,啟動完成之後一定要檢視是否有程序,有些時候不會報錯,但是沒有啟動成功!
[[email protected] bin]# netstat -alntp | grep 2181 #檢視監聽埠是否存在
tcp 0 0 :::2181 :::* LISTEN 27962/java
三臺伺服器全部啟動完成之後,進行如下驗證:
[[email protected] bin]# ./zkCli.sh --server10.0.102.204 Connecting to localhost:2181 2018-12-21 10:54:38,286 [myid:] - INFO [main:[email protected]100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT 2018-12-21 10:54:38,289 [myid:] - INFO [main:[email protected]100] - Client environment:host.name=test3 2018-12-21 10:54:38,289 [myid:] - INFO [main:[email protected]100] - Client environment:java.version=1.7.0_79 2018-12-21 10:54:38,291 [myid:] - INFO [main:[email protected]100] - Client environment:java.vendor=Oracle Corporation 2018-12-21 10:54:38,291 [myid:] - INFO [main:[email protected]100] - Client environment:java.home=/usr/local/jdk/jdk1.7.0_79/jre 2018-12-21 10:54:38,291 [myid:] - INFO [main:[email protected]] - Client environment:java.class.path=/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/zookeeper/bin/../lib/netty-3.7.0.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-0.9.94.jar:/usr/local/zookeeper/bin/../zookeeper-3.4.6.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf: 2018-12-21 10:54:38,291 [myid:] - INFO [main:[email protected]100] - Client environment:java.library.path=/usr/local/mysql/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 2018-12-21 10:54:38,291 [myid:] - INFO [main:[email protected]100] - Client environment:java.io.tmpdir=/tmp 2018-12-21 10:54:38,292 [myid:] - INFO [main:[email protected]100] - Client environment:java.compiler=<NA> 2018-12-21 10:54:38,292 [myid:] - INFO [main:[email protected]100] - Client environment:os.name=Linux 2018-12-21 10:54:38,292 [myid:] - INFO [main:[email protected]100] - Client environment:os.arch=amd64 2018-12-21 10:54:38,292 [myid:] - INFO [main:[email protected]100] - Client environment:os.version=2.6.32-504.el6.x86_64 2018-12-21 10:54:38,292 [myid:] - INFO [main:[email protected]100] - Client environment:user.name=root 2018-12-21 10:54:38,292 [myid:] - INFO [main:Environ[email protected]100] - Client environment:user.home=/root 2018-12-21 10:54:38,292 [myid:] - INFO [main:[email protected]100] - Client environment:user.dir=/usr/local/zookeeper/bin 2018-12-21 10:54:38,293 [myid:] - INFO [main:[email protected]438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=[email protected] Welcome to ZooKeeper! 2018-12-21 10:54:38,315 [myid:] - INFO [main-SendThread(localhost:2181):[email protected]975] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) JLine support is enabled 2018-12-21 10:54:38,320 [myid:] - INFO [main-SendThread(localhost:2181):[email protected]852] - Socket connection established to localhost/127.0.0.1:2181, initiating session 2018-12-21 10:54:38,330 [myid:] - INFO [main-SendThread(localhost:2181):[email protected]1235] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x367ce7db1fa0003, negotiated timeout = 30000 WATCHER:: WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTED) 0]
#若有如上顯示,則zookeeper叢集搭建成功
搭建卡夫卡叢集
仍然採用原始碼的方式安裝。安裝過程和zookeeper差不多,解壓,修改配置檔案,建立對應訊息目錄。
[[email protected] src]# tar -zxvf kafka_2.10-0.10.0.0.tgz -C ../ #解壓 [[email protected] src]# cd .. [[email protected] local]# mv kafka_2.10-0.10.0.0 kafka #修改名字 [[email protected] local]# cd kafka/ [[email protected] kafka]# ls bin config libs LICENSE logs NOTICE site-docs [[email protected] kafka]# cd config/ # [[email protected] config]# ls #kafka的配置檔案有很多,但是目前我們只需要關注 server.properties connect-console-sink.properties connect-file-sink.properties connect-standalone.properties producer.properties zookeeper.properties connect-console-source.properties connect-file-source.properties consumer.properties server.properties connect-distributed.properties connect-log4j.properties log4j.properties tools-log4j.properties
配置檔案如下:把其中的註釋刪掉了!
broker.id=3 #在每個kafka伺服器中唯一的,是一個整數 port=9092 host.name=10.0.102.179 auto.create.topics.enable = false delete.topic.enable = true num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafka/logs num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=10.0.102.179:2181,10.0.102.204:2181,10.0.102.214:2181 zookeeper.connection.timeout.ms=6000
配置檔案的引數的說明:
- broker.id: 每一個broker都需要一個識別符號,使用broker.id來表示。預設是0,可以被設定為任意整數,在kafka叢集中必須是唯一的。
- port:預設埠是9092,配置kafka監聽的埠。
- zookeeper.connect:用於儲存broker元資料的zookeeper地址是通過zookeeper.connect來指定的。新式就是上面那樣,主機和埠號,多個地址用逗號隔開。上面的三個是一個zookeeper叢集。若是當前連線的zookeeper宕機,broker可以連線到zookeeper群組中的其他伺服器上。
- log.dirs: kafka把所有訊息都儲存在磁碟上,存放這些日子片段的目錄是通過log.dirs指定的。它是一組用逗號分隔的本地檔案系統路徑。如果指定了多個路徑,那麼broker會根據“最少使用”原則,把同一個分割槽的日誌片段儲存到同一個路徑下。需要注意,borker會往擁有最少數目分割槽的路徑新增分割槽,而不是往擁有最小磁碟空間的路徑新增分割槽。
- num.recovery.threads.per.data.dir,對於如下3種情況,kafka會使用多少個執行緒來處理日誌片段。
- 伺服器正常啟動,用於開啟每個分割槽的日誌片段。
- 伺服器崩潰後重啟,用於檢查和截短每個分割槽的日誌片段。
- 伺服器正常關閉,用於關閉日誌片段。
預設情況下,每個日誌目錄只使用一個執行緒。因為這些執行緒只是在伺服器啟動和關閉時會用到,所以完全可以設定大量的執行緒來達到並行操作的目的。特別是對於包含大量分割槽的伺服器來說,一旦發生崩潰,在進行恢復時使用並行操作可能會省下很多時間。所配置的數字對應的是log.dirs指定的單個日誌目錄。譬如,這個值被設定為8,並且log.dirx指定了3個路徑,那麼久需要24個執行緒。
- auto.create.topics.enable 預設情況下,kafka會在以下幾種情況下建立主題。
- 當一個生產者開始往主題寫入訊息時。
- 當一個消費者開始從主題讀取訊息時。
- 當任意一個客戶端向主題傳送元資料請求是。
這個引數有點迷糊,儘量設定為false吧!
上面指定了kafa的日誌目錄,需要建立訊息目錄:
[[email protected] ~]# mkdir -p /data/kafka/logs
和zookeeper一樣,把上面解壓後的檔案和建立的日誌目錄,分別拷貝到其餘兩臺伺服器上,注意修改其餘兩臺伺服器的broker.id。
然後啟動kafka:
[[email protected] bin]# ls connect-distributed.sh kafka-consumer-groups.sh kafka-reassign-partitions.sh kafka-simple-consumer-shell.sh zookeeper-server-start.sh connect-standalone.sh kafka-consumer-offset-checker.sh kafka-replay-log-producer.sh kafka-topics.sh zookeeper-server-stop.sh kafka-acls.sh kafka-consumer-perf-test.sh kafka-replica-verification.sh kafka-verifiable-consumer.sh zookeeper-shell.sh kafka-configs.sh kafka-mirror-maker.sh kafka-run-class.sh kafka-verifiable-producer.sh kafka-console-consumer.sh kafka-preferred-replica-election.sh kafka-server-start.sh windows kafka-console-producer.sh kafka-producer-perf-test.sh kafka-server-stop.sh zookeeper-security-migration.sh
#bin目錄下面有很多指令碼,kafka自帶的有zookeeper啟動,但是建議搭建獨立的zookeeper叢集。
[[email protected] bin]# ./kafka-server-start.sh /usr/local/kafka/config/server.properties 1>/dev/null 2>&1 &
[1] 8757
[[email protected] bin]#
#kafka預設標準輸出到螢幕,上面的命令把kafka掛起在後臺執行。
#啟動之後檢視是否監聽了9092埠。
[[email protected] bin]# netstat -lntp |grep 9092
tcp 0 0 ::ffff:10.0.102.204:9092 :::* LISTEN 8757/java
[[email protected] bin]#
三臺伺服器全部啟動完畢,可以進行如下測試:
[[email protected] bin]# ./kafka-console-producer.sh --broker-list 10.0.102.179:9092 --topci science #建立一個主題 topci is not a recognized option [[email protected] bin]#
#建立一個生產者,傳送訊息
[[email protected] bin]# ./kafka-console-producer.sh --broker-list 10.0.102.179:9092 --topic science
test message
first
#創意一個消費者物件,接收生產者傳送的訊息
[[email protected] bin]# ./kafka-console-consumer.sh --zookeeper 10.0.102.179:2181 --topic science --from-beginning
test message
first
#可以看到卡夫卡是一個先進先出的訊息佇列
若是以上的佇列訊息驗證成功,則說明叢集搭建成功、
[[email protected] bin]# ./kafka-topics.sh --zookeeper 10.0.102.179:2181 --describe --topic science #檢視主題的資訊 Topic:science PartitionCount:1 ReplicationFactor:1 Configs: Topic: science Partition: 0 Leader: 2 Replicas: 2 Isr: 2 [[email protected] bin]#
一個異常說明:在測試的時候,剛開始總報如下錯誤:
[[email protected] bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic lianxi test1 [2018-12-20 17:33:27,112] ERROR Error when sending message to topic lianxi with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
這個問題搞了好久,最好把java的版本從1.8降到了1.7之後,然後就成功了!
搭建成功的各個元件的版本資訊如下:
[[email protected] ~]# java -version java version "1.7.0_79" Java(TM) SE Runtime Environment (build 1.7.0_79-b15) Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
#kafka沒有類似的version命令,需要檢視kafka的使用包資訊
[[email protected] libs]# pwd
/usr/local/kafka/libs
[[email protected] libs]# ls kafka_2.10-0.10.0.0.jar
kafka_2.10-0.10.0.0.jar
#2.10是scala的版本,後面的0.10.0.0是kafka的版本
#在zookeeper的安裝目錄下面有對應版本的一些包
[[email protected] zookeeper]# ls
bin CHANGES.txt contrib docs ivy.xml LICENSE.txt README_packaging.txt recipes zookeeper-3.4.6.jar zookeeper-3.4.6.jar.md5
build.xml conf dist-maven ivysettings.xml lib NOTICE.txt README.txt src zookeeper-3.4.6.jar.asc zookeeper-3.4.6.jar.sha1
[[email protected] zookeeper]#
#zookeeper的版本是3.4.6