搭建kafka叢集
一、環境準備
OS | hostname | IP |
---|---|---|
centos 7.2 | kafka01 | 192.168.99.233 |
centos 7.2 | kafka02 | 192.168.99.234 |
centos 7.2 | kafka03 | 192.168.99.235 |
1.1 配置hosts檔案
$ cat >> /etc/hosts << EOF 192.168.99.233 kafka01 192.168.99.234 kafka02 192.168.99.235 kafka03 EOF $ ssh-keygen $ for i in kafka0{1..3};do ssh-copy-id ${i};done $ for i in kafka0{1..3};do scp /etc/hosts ${i}:/etc/;done
二、部署zookeeper叢集
2.1 獲取kafka軟體包
$ cd /opt
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
$ tar zxf kafka_2.13-2.5.0.tgz
$ mv kafka_2.13-2.5.0 kafka
2.2 配置zookeeper
$ rm -rf /opt/kafka/bin/windows $ cat > /opt/kafka/config/zookeeper.properties << EOF dataDir=/data/zk clientPort=2181 maxClientCnxns=0 tickTime=2000 initLimit=10 syncLimit=5 quorumListenOnAllIPs=true server.1=kafka01:2888:3888 server.2=kafka02:2888:3888 server.3=kafka03:2888:3888 EOF $ mkdir /data/zk -pv $ echo 1 > /data/zk/myid
配置檔案解釋:
-
dataDir
:zookeeper資料存放目錄; -
clientPort
:client連線zookeeper需要指定的埠; -
maxClientCnxns
:表示允許客戶端最大連線數,如果設定為0,則表示不限制; -
tickTime
:表示zookeeper叢集之間的心跳檢測時間間隔(單位是毫秒); -
initLimit
:表示zookeeper叢集中的follower
在啟動時需要在多少個心跳檢測時間內從leader
同步資料(如果資料量過大,此值可適當調整); -
syncLimit
:表示超過多少個心跳時間收不到follower
的響應,leader
就認為follower
已經下線; -
quorumListenOnAllIPs
:該引數設定為true,配置為true可以避免入坑(尤其是多網絡卡主機),Zookeeper伺服器將監聽所有可用IP地址的連線。他會影響ZAB協議和快速Leader選舉協議。預設是false; -
server.x=IP:Port1:Port2
:指定zk節點列表;-
x:表示節點的編號,此編號需要寫入到zk資料存放目錄下,以myid命名的檔案。
-
IP:可以指定IP也可以是主機名。
-
Port1表示該Zookeeper叢集中的Follower節點與Leader節點通訊時使用的埠。作為Leader時監聽該埠。
-
Port2表示選舉新的Leader時,Zookeeper節點之間互相通訊的埠,比如當Leader掛掉時,其餘伺服器會互相通訊,選出新的Leader。Leader和Follower都會監聽該埠。
-
至此,第一臺主機的zk配置完成,但是現在先不啟動,我們先接著把第一臺主機的kafka也配置好。
三、配置kafka
$ cat > /opt/kafka/config/server.properties << EOF
broker.id=0
listeners=PLAINTEXT://:9092
auto.create.topics.enable=true
delete.topic.enable=true
num.network.threads=15
num.io.threads=30
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/log
num.partitions=3
num.recovery.threads.per.data.dir=3
default.replication.factor=2
num.replica.fetchers=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka01:2181:kafka02:2181,kafka03:2181
zookeeper.connection.timeout.ms=10000
group.initial.rebalance.delay.ms=0
EOF
$ mkdir -pv /data/kafka/log
配置檔案解釋:
以下只介紹比較常見又比較重要的引數:
-
auto.create.topics.enable
:該配置項預設值是true,但在生產環境最好設定為false。這樣可以控制建立Topic的人以及建立時間。 -
background.threads
:該配置項預設值是10,既整個Kafka在執行各種任務時會啟動的執行緒數。如果你的CPU很強勁,那麼可以將執行緒數設大一點。 -
delete.topic.enable
:該配置項預設值是false,可以根據實際需求改變,在生產環境還是建議保持預設值,這樣至少不會出現Topic被誤刪的情況。 -
log.flush.interval.messages
:該配置項最好保持預設值,把這個任務交給作業系統的檔案系統去處理。 -
log.retention.hours
:日誌檔案保留的時間預設是168小時,即7天。這個配置可以根據具體業務需求而定。 -
message.max.bytes
:每條Message或一批次Message的大小預設是1MB。這個配置也要根據具體需求而定,比如頻寬的情況。 -
min.insync.replicas
:該配置項的預設值是1,既在acks=all時,最少得有一個Replica進行確認回執。建議在生產環境配置為2,保證資料的完整性。 -
num.io.threads
:處理I/O操作的執行緒數,預設是8個執行緒。如果覺得在這個環節達到了瓶頸,那麼可以適當調整該引數。 -
num.network.threads
:處理網路請求和響應的執行緒數,預設是3個執行緒。如果覺得在這個環節達到了瓶頸,那麼可以適當調整該引數。 -
num.recovery.threads.per.data.dir
:每個資料目錄啟用幾個執行緒來處理,這裡的執行緒數和資料目錄數是乘積關係,並且只在Broker啟動或關閉時使用。預設值是1,根據實際情況配置資料目錄數,從而判斷該配置項應該如何設定。 -
num.replica.fetchers
:該配置項影響Replicas同步資料的速度,預設值是1,如果發現Replicas同步延遲較大,可以提升該配置項。 -
offsets.retention.minutes
:Offset保留的時間,預設值是1440,既24小時。在生產環境建議將該配置項設大一點,比如設定為1個月,保證消費資料的完整性。 -
unclean.leader.election.enable
:該配置項的作用是,指定是否可以將非ISR的Replicas選舉為Leader,預設值為false。在生產環境建議保持預設值,防止資料丟失。 -
zookeeper.session.timeout.ms
:Zookeeper會話超時時間,預設值為6000。按實際情況而定,通常情況下保持60秒即可。 -
default.replication.factor
:預設Replication Factor為1,建議設定為2或者3,以保證資料完整性和整個叢集的健壯性。 -
num.partitions
:Topic預設的Partition數,預設是1,建議設定為3或者6,以保證資料完整性和整個叢集的健壯性。 -
group.initial.rebalance.delay.ms
:當Consumer Group新增或減少Consumer時,重新分配Topic Partition的延遲時間。
至此第一個節點上的kafka也配置完成!為了簡單起見,將所需檔案拷貝到另外兩個節點上!
四、 複製所需檔案到其他節點
$ for i in kafka0{2..3};do rsync -avz /opt/kafka ${i}:/opt/;done
$ for i in kafka0{2..3};do rsync -avz /data ${i}:/;done
# kafka02
$ echo 2 > /data/zk/myid
$ sed -i 's#broker.id=0#broker.id=1#g' /opt/kafka/config/server.properties
# kafka03
$ echo 3 > /data/zk/myid
$ sed -i 's#broker.id=0#broker.id=2#g' /opt/kafka/config/server.properties
五、 啟動zookeeper
5.1 啟動zookeeper
每個節點都需執行以下操作:
$ cd /opt/kafka/bin
$ ./zookeeper-server-start.sh ../config/zookeeper.properties &
$ ss -lnpt | egrep '2181|3888|2888' # 只有leader才會監聽2888埠
LISTEN 0 50 :::2181 :::* users:(("java",pid=58948,fd=118))
LISTEN 0 50 :::2888 :::* users:(("java",pid=58948,fd=130))
LISTEN 0 50 :::3888 :::* users:(("java",pid=58948,fd=127))
5.2 驗證
我們通過Zookeeper Client連線到叢集來檢驗。我們選擇任意一臺伺服器,首先連線kafka01主機。
$ ./zookeeper-shell.sh kafka01:2181 # 連線到kafka01
create /my_zNode "some data" # 連線成功後,建立一個zNode
ls / # 檢視節點中所有znode
[my_zNode, zookeeper]
$ ./zookeeper-shell.sh kafka02:2181 # 再連線到kaka02
ls / # 同樣可以檢視到my_zNode
[my_zNode, zookeeper]
get /my_zNode # 檢視my_zNode中的資料
some data
$ ./zookeeper-shell.sh kafka03:2181 # 再連線到kafka03節點
ls / # 檢視
[my_zNode, zookeeper]
get /my_zNode # 獲取資料
some data
set /my_zNode "new data" # 修改my_zNode中的資料
get /my_zNode # 確認資料已修改
new data
$ ./zookeeper-shell.sh kafka01:2181 # 再連線到kafka1,確定資料修改
get /my_zNode
new data
上面的過程雖然比較繁瑣,但是充分說明了我們的Zookeeper叢集是搭建成功的。無論從哪個Zookeeper節點建立的zNode,都可以同步到叢集中的其他節點。無論從哪個Zookeeper節點修改的zNode中的資料,也可以同步到起群中的其他節點。
六、啟動kafka
6.1 啟動kafka
三個節點都需啟動kafka!
$ cd /opt/kafka/bin/
$ ./kafka-server-start.sh -daemon ../config/server.properties
$ ss -lnt | grep 9092 # 三臺kafka都啟動完成後,端口才會監聽
LISTEN 0 50 :::9092 :::*
6.2 kafka常用指令
當三個節點都啟動後,並且確認9092埠在監聽,那麼就可以執行下面的指令,來測試kafka是否正常了。
$ cd /opt/kafka
# 顯示topic列表
$ bin/kafka-topics.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --list
也可以從一個節點上檢視。下面簡寫檢視一個節點。
# 建立一個topic,並指定topic屬性(副本數、分割槽數等)
$ bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 3 --topic test
# --partitions(分割槽)應等於或大於消費者,--replication-factor(副本數)不能大於kafka叢集內主機節點
# 檢視某個topic的狀態
$ bin/kafka-topics.sh --zookeeper kafka1:2181 --topic test --describe
# 生產訊息
$ bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic test
# 消費訊息
$ bin/kafka-console-consumer.sh --bootstrap-server PLAINTEXT://kafka1:9092 --topic test
# 檢視實時訊息,如果從頭看可在後面加 --from-beginning
$ bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test --from-beginning
# 刪除topic
$ bin/kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test
七、安裝kafka manager
kafka有一個管理工具叫kafka-manager,它支援管理多個叢集、選擇副本、副本重新分配以及建立Topic。同時,這個管理工具也是一個非常好的可以快速瀏覽這個叢集的工具。
kafka manager有如下功能:
-
管理多個kafka叢集
-
便捷的檢查kafka叢集狀態(topics,brokers,備份分佈情況,分割槽分佈情況)
-
選擇你要執行的副本
-
基於當前分割槽狀況進行
-
可以選擇topic配置並建立topic(0.8.1.1和0.8.2的配置不同)
-
刪除topic(只支援0.8.2以上的版本並且要在broker配置中設定delete.topic.enable=true)
-
Topic list會指明哪些topic被刪除(在0.8.2以上版本適用)
-
為已存在的topic增加分割槽
-
為已存在的topic更新配置
-
在多個topic上批量重分割槽
-
在多個topic上批量重分割槽(可選partition broker位置)
強烈建議使用docker來部署這個kafka-manager工具,如下:
$ docker run -itd --rm -p 9000:9000 -e ZK_HOSTS="192.168.99.233:2181,192.168.99.234:2181,192.168.99.235:2181" -e APPLICATION_SECRET=letmein sheepkiller/kafka-manager
此專案託管在github上,可以參閱github。
docker啟動後,訪問9000埠,介面如下:
關於kafka-manager的配置使用,可以參閱詳細解析 kafka manager 的使用
八、kafka開啟jmx polling
在使用kafka-amnager時,有一個功能為Enable JMX Polling
,該部分直接影響部分 kafka broker 和 topic 監控指標指標的獲取,那麼如何開啟此功能呢?方法有兩種。如下:
方法1:
啟動kafka時增加JMX_PORT=9988。如下:
$ JMX_PORT=9988 ./kafka-server-start.sh -daemon ../config/server.properties
方法2:
修改kafka-run-class.sh指令碼,第一行增加JMX_PORT=9988即可。
不管是哪種方法,都只是定義了一個變數而已,剩下的事情交給程式去做就好。
附加:k8s部署kafka-manager
在k8s中部署kafka-manager的yml檔案如下(可根據實際情況來修改):
apiVersion: v1
kind: Service
metadata:
name: iov-sjjh-kafka-manager
namespace: iov-sjjh
labels:
name: iov-sjjh-kafka-manager
spec:
type: NodePort
ports:
- name: service
port: 9000
nodePort: 30200
targetPort: 9000
selector:
name: iov-sjjh-kafka-manager
---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
name: iov-sjjh-kafka-manager
namespace: iov-sjjh
spec:
replicas: 1
template:
metadata:
labels:
name: iov-sjjh-kafka-manager
spec:
containers:
- name: iov-sjjh-kafka-manager
image: sheepkiller/kafka-manager:latest
imagePullPolicy: IfNotPresent
ports:
- containerPort: 9000
resources:
limits:
memory: 1G
cpu: 2
requests:
memory: 1G
cpu: 2
env:
- name: "ZK_HOSTS"
value: "192.168.99.233:2181,192.168.99.234:2181,192.168.99.235:2181"
- name: "KAFKA_MANAGER_AUTH_ENABLED"
value: "true"
- name: "KAFKA_MANAGER_USERNAME"
value: "iov_sjjh"
- name: "KAFKA_MANAGER_PASSWORD"
value: "iov_sjjh123"