Configuring a Kafka Cluster with a ZooKeeper Cluster
A Kafka cluster can generally be configured in one of three ways:
(1) Single node – single broker;
(2) Single node – multiple brokers;
(3) Multiple nodes – multiple brokers.
The official site documents the first two configurations. Below, the first two are covered briefly; the focus is on the third.
Prerequisites:
Three 64-bit CentOS 6.4 virtual machines: 192.168.121.34 (hostname master), 192.168.121.35 (hostname datanode1), and 192.168.121.36 (hostname datanode2).
I. Single node – single broker cluster configuration
Note: the accompanying diagram is sourced from the web.
1. Extract the Kafka archive
[root@master kafkainstall]# tar -xzf kafka_2.10-0.8.2.2.tgz
[root@master kafkainstall]# cd kafka_2.10-0.8.2.2
Here a new kafkainstall directory was created to hold the extracted files; then change into the extracted kafka_2.10-0.8.2.2 directory.
2. Start the ZooKeeper service
The Kafka archive already ships with ZooKeeper, including a startup script (under kafka_2.10-0.8.2.2/bin) and a ZooKeeper configuration file (under kafka_2.10-0.8.2.2/config):
[root@master kafka_2.10-0.8.2.2]# bin/zookeeper-server-start.sh config/zookeeper.properties &
The key properties in the ZooKeeper configuration file zookeeper.properties:
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
By default, ZooKeeper stores its snapshot files under /tmp/zookeeper and listens on port 2181.
3. Start the Kafka broker service
Kafka also ships with a broker startup script (under kafka_2.10-0.8.2.2/bin), so the broker can be started directly:
[root@master kafka_2.10-0.8.2.2]# bin/kafka-server-start.sh config/server.properties &
The key properties in the broker configuration file server.properties:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# The port the socket server listens on
port=9092
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
4. Create a topic with a single partition
[root@master kafka_2.10-0.8.2.2]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest-topic
This creates a topic named mytest-topic.
5. Start a producer process to send messages
[root@master kafka_2.10-0.8.2.2]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest-topic
Here, (1) the broker-list parameter gives the address of the broker the producer pushes messages to, in <IP address:port> form; from the broker configuration above this is localhost:9092;
(2) the topic parameter specifies which topic the producer sends to.
Key properties of the producer configuration file:
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
# message encoder
serializer.class=kafka.serializer.DefaultEncoder
You can now type the messages you want to send to the consumer. (You can also start the consumer process first, so that the producer's messages are displayed immediately.)
6. Start a consumer process to consume messages
Open another terminal:
[root@master kafka_2.10-0.8.2.2]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytest-topic --from-beginning
Here, (1) the zookeeper parameter gives the ZooKeeper connection address, in <IP address:port> form;
(2) the topic parameter specifies which topic to pull messages from.
After running this command, you will see the messages produced by the producer printed on the console.
Key properties of the consumer configuration file consumer.properties:
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=localhost:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=60000
# consumer group id
group.id=test-consumer-group
II. Single node – multiple broker cluster configuration
Note: the accompanying diagram is sourced from the web.
1. Start the ZooKeeper service
Start it the same way as above.
2. Start the Kafka broker services
To start multiple brokers on a single node (here, three brokers), prepare one server.properties file per broker by copying kafka_2.10-0.8.2.2/config/server.properties, as follows:
[root@master config]# cp server.properties server-1.properties
[root@master config]# cp server.properties server-2.properties
Then edit server-1.properties and server-2.properties.
server-1.properties:
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1
server-2.properties:
broker.id=2
port=9094
log.dirs=/tmp/kafka-logs-2
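The two derived files can also be produced with a few sed one-liners. The sketch below is only illustrative: it builds a minimal stand-in server.properties in a scratch directory (the real file lives in kafka_2.10-0.8.2.2/config/ and contains many more properties), then rewrites the three lines that must differ per broker.

```shell
# Derive per-broker property files from a base server.properties.
# Demo setup: a scratch directory with a minimal stand-in file.
dir=$(mktemp -d)
cd "$dir"
printf 'broker.id=0\nport=9092\nlog.dirs=/tmp/kafka-logs\n' > server.properties

for n in 1 2; do
  cp server.properties "server-$n.properties"
  sed -i "s/^broker\.id=.*/broker.id=$n/"               "server-$n.properties"
  sed -i "s/^port=.*/port=$((9092 + n))/"               "server-$n.properties"
  sed -i "s|^log\.dirs=.*|log.dirs=/tmp/kafka-logs-$n|" "server-$n.properties"
done

cat server-2.properties   # broker.id=2, port=9094, log.dirs=/tmp/kafka-logs-2
```

On a real install you would run the same loop inside the config/ directory instead of a scratch directory.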
Then start one broker with each of these two configuration files:
[root@master kafka_2.10-0.8.2.2]# bin/kafka-server-start.sh config/server-1.properties &
[root@master kafka_2.10-0.8.2.2]# bin/kafka-server-start.sh config/server-2.properties &
Then start the original broker as well:
[root@master kafka_2.10-0.8.2.2]# bin/kafka-server-start.sh config/server.properties &
3. Create a topic with 1 partition and 3 replicas
[root@master kafka_2.10-0.8.2.2]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
4. Start a producer to send messages
To have one producer send to multiple brokers (here, three), the only change needed is to list the brokers to connect to in the broker-list parameter:
[root@master kafka_2.10-0.8.2.2]# bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic my-replicated-topic
5. Start a consumer to consume messages
[root@master kafka_2.10-0.8.2.2]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-replicated-topic --from-beginning
To have different producers send to different brokers, simply configure each producer's broker-list parameter accordingly.
III. Multiple node – multiple broker cluster configuration
Note: the accompanying diagram is sourced from the web.
Note: in that diagram each node runs two brokers; for simplicity, here each node runs a single broker (which is easy to extend, given the single-node multi-broker setup above).
1. Install Kafka on every node
Everything in sections I and II above was done on 192.168.121.34 (hostname master). To extend this to a multi-node multi-broker cluster, Kafka must also be installed on the other two machines, following step 1 of section I.
2. ZooKeeper cluster configuration
zookeeper-0 (i.e. the ZooKeeper on 192.168.121.34, hostname master):
Change the configuration to:
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the five lines below are added by the author
initLimit=5
syncLimit=2
server.0=192.168.121.34:2888:3888
server.1=192.168.121.35:2889:3889
server.2=192.168.121.36:2890:3890
Then write a myid file under the dataDir directory (/data/zookeeper/ in this setup):
echo 0 > myid
Note: this id identifies the ZooKeeper host and must be different on each machine: the second machine, 192.168.121.35 (hostname datanode1), uses 1, and the third, 192.168.121.36 (hostname datanode2), uses 2. In other words, the three ZooKeeper configuration files are identical except for myid.
Finally, start the ZooKeeper service on each of the three machines in turn.
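The myid step above can be sketched as a tiny script. The id written (0, 1, or 2) must match the server.N entry for the machine it runs on; in this demo a scratch directory stands in for the real dataDir.

```shell
# Write this host's ZooKeeper id into dataDir/myid.
# Demo: a scratch directory stands in for the real dataDir (/data/zookeeper here).
dataDir=$(mktemp -d)

myid=0        # 0 on master, 1 on datanode1, 2 on datanode2
echo "$myid" > "$dataDir/myid"

cat "$dataDir/myid"   # prints 0
```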
3. Configure the broker cluster
The broker configuration file (server.properties) follows the single-node multi-instance method, with one instance per node; the difference is that the ZooKeeper connection string must list the ZooKeeper instances on all nodes.
(1) Changes to the server.properties file under kafka_2.10-0.8.2.2/config/ on 192.168.121.34 (hostname master):
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=192.168.121.34
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-0
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.121.34:2181,192.168.121.35:2181,192.168.121.36:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=60000
Note: uncomment the host.name line and change it to the machine's own IP address, and make sure broker.id is unique on each node (e.g. 0, 1, and 2). zookeeper.connection.timeout.ms defaults to 6000; it is best to raise it somewhat, otherwise connections time out easily, but do not make it too large, since that hurts efficiency.
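These per-node edits can also be applied with sed. A minimal sketch, assuming the stock file carries a commented-out `#host.name=localhost` line as in Kafka 0.8.2; the demo works on a temporary stand-in file rather than the real config/server.properties.

```shell
# Apply the per-node server.properties edits (shown for the master node) with sed.
# Demo: a temporary stand-in file holding the three relevant stock lines.
f=$(mktemp)
printf '#host.name=localhost\nzookeeper.connect=localhost:2181\nzookeeper.connection.timeout.ms=6000\n' > "$f"

ip=192.168.121.34   # this node's own IP; use .35 / .36 on the other two nodes
zk=192.168.121.34:2181,192.168.121.35:2181,192.168.121.36:2181

sed -i "s/^#host\.name=.*/host.name=$ip/" "$f"
sed -i "s/^zookeeper\.connect=.*/zookeeper.connect=$zk/" "$f"
sed -i "s/^zookeeper\.connection\.timeout\.ms=.*/zookeeper.connection.timeout.ms=60000/" "$f"

cat "$f"
```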
(2) Changes to the server.properties file under kafka_2.10-0.8.2.2/config/ on 192.168.121.35 (hostname datanode1):
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=192.168.121.35
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
Everything else is the same as in (1) above.
(3) Changes to the server.properties file under kafka_2.10-0.8.2.2/config/ on 192.168.121.36 (hostname datanode2):
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=192.168.121.36
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
Everything else is the same as in (1) above.
4. Producer configuration file changes
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=192.168.121.34:9092,192.168.121.35:9092,192.168.121.36:9092
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=async
5. Consumer configuration file changes
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=192.168.121.34:2181,192.168.121.35:2181,192.168.121.36:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=60000
6. Producer sends messages
(1) First create a test-replicated-topic (on 192.168.121.34, hostname master):
[root@master kafka_2.10-0.8.2.2]# bin/kafka-topics.sh --create --zookeeper 192.168.121.34:2181 --replication-factor 3 --partitions 1 --topic test-replicated-topic
Then list the existing topics:
[root@master kafka_2.10-0.8.2.2]# bin/kafka-topics.sh --list --zookeeper 192.168.121.34:2181
This shows that test-replicated-topic has been created successfully. Next, look at what each broker is doing:
[root@master kafka_2.10-0.8.2.2]# bin/kafka-topics.sh --describe --zookeeper 192.168.121.34:2181 --topic test-replicated-topic
Here the leader is the node responsible for all reads and writes for the given partition; at this point the leader is node 0 (i.e. broker 0). See the official documentation for more details.
(2) The producer sends messages (on the 192.168.121.34 node, hostname master):
[root@master kafka_2.10-0.8.2.2]# bin/kafka-console-producer.sh --broker-list 192.168.121.34:9092,192.168.121.35:9092,192.168.121.36:9092 --topic test-replicated-topic
7. Consumers consume the messages (run on each of the three machines, consuming the messages sent above)
(1) On master:
[root@master kafka_2.10-0.8.2.2]# bin/kafka-console-consumer.sh --zookeeper 192.168.121.34:2181 --topic test-replicated-topic --from-beginning
(2) On datanode1 and (3) on datanode2: run the same console-consumer command (the --zookeeper parameter may point at any of the three ZooKeeper instances).
As you can see, the consumers on all three nodes receive the messages sent from one of the nodes, which shows that the Kafka cluster is basically deployed and working.
PS: In my actual run the ZooKeeper listen ports on the 3 nodes were not all 2181; a uniform port can also be used, and it makes no difference.