1. 程式人生 > 實用技巧 >搭建kafka叢集

搭建kafka叢集

一、環境準備

OS hostname IP
centos 7.2 kafka01 192.168.99.233
centos 7.2 kafka02 192.168.99.234
centos 7.2 kafka03 192.168.99.235

1.1 配置hosts檔案

$ cat >> /etc/hosts << EOF
192.168.99.233 kafka01
192.168.99.234 kafka02
192.168.99.235 kafka03
EOF

$ ssh-keygen 
$ for i in kafka0{1..3};do ssh-copy-id ${i};done
$ for i in kafka0{1..3};do scp /etc/hosts ${i}:/etc/;done

二、部署zookeeper叢集

2.1 獲取kafka軟體包

$ cd /opt	
$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
$ tar zxf kafka_2.13-2.5.0.tgz 
$ mv kafka_2.13-2.5.0 kafka

2.2 配置zookeeper

$ rm -rf /opt/kafka/bin/windows
$ cat > /opt/kafka/config/zookeeper.properties << EOF
dataDir=/data/zk
clientPort=2181
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
quorumListenOnAllIPs=true
server.1=kafka01:2888:3888
server.2=kafka02:2888:3888
server.3=kafka03:2888:3888
EOF
$ mkdir /data/zk -pv
$ echo 1 > /data/zk/myid

配置檔案解釋:

  • dataDir:zookeeper資料存放目錄;

  • clientPort:client連線zookeeper需要指定的埠;

  • maxClientCnxns:表示允許客戶端最大連線數,如果設定為0,則表示不限制;

  • tickTime:表示zookeeper叢集之間的心跳檢測時間間隔(單位是毫秒);

  • initLimit:表示zookeeper叢集中的follower在啟動時需要在多少個心跳檢測時間內從leader同步資料(如果資料量過大,此值可適當調整);

  • syncLimit:表示超過多少個心跳時間收不到follower的響應,leader就認為follower已經下線;

  • quorumListenOnAllIPs:該引數設定為true,配置為true可以避免入坑(尤其是多網絡卡主機),Zookeeper伺服器將監聽所有可用IP地址的連線。他會影響ZAB協議和快速Leader選舉協議。預設是false;

  • server.x=IP:Port1:Port2:指定zk節點列表;

    • x:表示節點的編號,此編號需要寫入到zk資料存放目錄下,以myid命名的檔案。

    • IP:可以指定IP也可以是主機名。

    • Port1表示該Zookeeper叢集中的Follower節點與Leader節點通訊時使用的埠。作為Leader時監聽該埠。

    • Port2表示選舉新的Leader時,Zookeeper節點之間互相通訊的埠,比如當Leader掛掉時,其餘伺服器會互相通訊,選出新的Leader。Leader和Follower都會監聽該埠。

至此,第一臺主機的zk配置完成,但是現在先不啟動,我們先接著把第一臺主機的kafka也配置好。

三、配置kafka

$ cat > /opt/kafka/config/server.properties << EOF
broker.id=0
listeners=PLAINTEXT://:9092
auto.create.topics.enable=true
delete.topic.enable=true
num.network.threads=15
num.io.threads=30
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/log
num.partitions=3
num.recovery.threads.per.data.dir=3
default.replication.factor=2
num.replica.fetchers=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka01:2181:kafka02:2181,kafka03:2181
zookeeper.connection.timeout.ms=10000
group.initial.rebalance.delay.ms=0
EOF
$ mkdir -pv /data/kafka/log 

配置檔案解釋:

以下只介紹比較常見又比較重要的引數:

  • auto.create.topics.enable:該配置項預設值是true,但在生產環境最好設定為false。這樣可以控制建立Topic的人以及建立時間。

  • background.threads:該配置項預設值是10,既整個Kafka在執行各種任務時會啟動的執行緒數。如果你的CPU很強勁,那麼可以將執行緒數設大一點。

  • delete.topic.enable:該配置項預設值是false,可以根據實際需求改變,在生產環境還是建議保持預設值,這樣至少不會出現Topic被誤刪的情況。

  • log.flush.interval.messages:該配置項最好保持預設值,把這個任務交給作業系統的檔案系統去處理。

  • log.retention.hours:日誌檔案保留的時間預設是168小時,即7天。這個配置可以根據具體業務需求而定。

  • message.max.bytes:每條Message或一批次Message的大小預設是1MB。這個配置也要根據具體需求而定,比如頻寬的情況。

  • min.insync.replicas:該配置項的預設值是1,既在acks=all時,最少得有一個Replica進行確認回執。建議在生產環境配置為2,保證資料的完整性。

  • num.io.threads:處理I/O操作的執行緒數,預設是8個執行緒。如果覺得在這個環節達到了瓶頸,那麼可以適當調整該引數。

  • num.network.threads:處理網路請求和響應的執行緒數,預設是3個執行緒。如果覺得在這個環節達到了瓶頸,那麼可以適當調整該引數。

  • num.recovery.threads.per.data.dir:每個資料目錄啟用幾個執行緒來處理,這裡的執行緒數和資料目錄數是乘積關係,並且只在Broker啟動或關閉時使用。預設值是1,根據實際情況配置資料目錄數,從而判斷該配置項應該如何設定。

  • num.replica.fetchers:該配置項影響Replicas同步資料的速度,預設值是1,如果發現Replicas同步延遲較大,可以提升該配置項。

  • offsets.retention.minutes:Offset保留的時間,預設值是1440,既24小時。在生產環境建議將該配置項設大一點,比如設定為1個月,保證消費資料的完整性。

  • unclean.leader.election.enable:該配置項的作用是,指定是否可以將非ISR的Replicas選舉為Leader,預設值為false。在生產環境建議保持預設值,防止資料丟失。

  • zookeeper.session.timeout.ms:Zookeeper會話超時時間,預設值為6000。按實際情況而定,通常情況下保持60秒即可。

  • default.replication.factor:預設Replication Factor為1,建議設定為2或者3,以保證資料完整性和整個叢集的健壯性。

  • num.partitions:Topic預設的Partition數,預設是1,建議設定為3或者6,以保證資料完整性和整個叢集的健壯性。

  • group.initial.rebalance.delay.ms:當Consumer Group新增或減少Consumer時,重新分配Topic Partition的延遲時間。

至此第一個節點上的kafka也配置完成!為了簡單起見,將所需檔案拷貝到另外兩個節點上!

四、 複製所需檔案到其他節點

$ for i in kafka0{2..3};do rsync -avz /opt/kafka ${i}:/opt/;done
$ for i in kafka0{2..3};do rsync -avz /data ${i}:/;done

# kafka02
$ echo 2 > /data/zk/myid
$ sed -i 's#broker.id=0#broker.id=1#g' /opt/kafka/config/server.properties

# kafka03
$ echo 3 > /data/zk/myid
$ sed -i 's#broker.id=0#broker.id=2#g' /opt/kafka/config/server.properties

五、 啟動zookeeper

5.1 啟動zookeeper

每個節點都需執行以下操作:

$ cd /opt/kafka/bin
$ ./zookeeper-server-start.sh ../config/zookeeper.properties &

$ ss -lnpt | egrep '2181|3888|2888'      # 只有leader才會監聽2888埠
LISTEN     0      50          :::2181                    :::*                   users:(("java",pid=58948,fd=118))
LISTEN     0      50          :::2888                    :::*                   users:(("java",pid=58948,fd=130))
LISTEN     0      50          :::3888                    :::*                   users:(("java",pid=58948,fd=127))

5.2 驗證

我們通過Zookeeper Client連線到叢集來檢驗。我們選擇任意一臺伺服器,首先連線kafka01主機。

$ ./zookeeper-shell.sh kafka01:2181      # 連線到kafka01
create /my_zNode "some data"       # 連線成功後,建立一個zNode
ls /                   # 檢視節點中所有znode
[my_zNode, zookeeper]


$ ./zookeeper-shell.sh kafka02:2181       # 再連線到kaka02
ls /          # 同樣可以檢視到my_zNode
[my_zNode, zookeeper]
get /my_zNode     # 檢視my_zNode中的資料
some data


$ ./zookeeper-shell.sh kafka03:2181      # 再連線到kafka03節點
ls /       # 檢視
[my_zNode, zookeeper]
get /my_zNode             # 獲取資料
some data
set /my_zNode "new data"          # 修改my_zNode中的資料
get /my_zNode      # 確認資料已修改
new data


$ ./zookeeper-shell.sh kafka01:2181      # 再連線到kafka1,確定資料修改

get /my_zNode
new data

上面的過程雖然比較繁瑣,但是充分說明了我們的Zookeeper叢集是搭建成功的。無論從哪個Zookeeper節點建立的zNode,都可以同步到叢集中的其他節點。無論從哪個Zookeeper節點修改的zNode中的資料,也可以同步到起群中的其他節點。

六、啟動kafka

6.1 啟動kafka

三個節點都需啟動kafka!

$ cd /opt/kafka/bin/
$ ./kafka-server-start.sh -daemon ../config/server.properties
$ ss -lnt | grep 9092           # 三臺kafka都啟動完成後,端口才會監聽
LISTEN     0      50          :::9092                    :::*                  

6.2 kafka常用指令

當三個節點都啟動後,並且確認9092埠在監聽,那麼就可以執行下面的指令,來測試kafka是否正常了。

$ cd /opt/kafka
# 顯示topic列表
$ bin/kafka-topics.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181  --list  
也可以從一個節點上檢視。下面簡寫檢視一個節點。

# 建立一個topic,並指定topic屬性(副本數、分割槽數等)
$ bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 3 --topic test
# --partitions(分割槽)應等於或大於消費者,--replication-factor(副本數)不能大於kafka叢集內主機節點

# 檢視某個topic的狀態
$ bin/kafka-topics.sh   --zookeeper   kafka1:2181    --topic   test   --describe

# 生產訊息
$ bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic  test    

# 消費訊息
$ bin/kafka-console-consumer.sh --bootstrap-server PLAINTEXT://kafka1:9092 --topic  test

# 檢視實時訊息,如果從頭看可在後面加   --from-beginning  
$ bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test --from-beginning

# 刪除topic
$ bin/kafka-topics.sh --delete --zookeeper kafka1:2181  --topic   test

七、安裝kafka manager

kafka有一個管理工具叫kafka-manager,它支援管理多個叢集、選擇副本、副本重新分配以及建立Topic。同時,這個管理工具也是一個非常好的可以快速瀏覽這個叢集的工具。

kafka manager有如下功能:

  • 管理多個kafka叢集

  • 便捷的檢查kafka叢集狀態(topics,brokers,備份分佈情況,分割槽分佈情況)

  • 選擇你要執行的副本

  • 基於當前分割槽狀況進行

  • 可以選擇topic配置並建立topic(0.8.1.1和0.8.2的配置不同)

  • 刪除topic(只支援0.8.2以上的版本並且要在broker配置中設定delete.topic.enable=true)

  • Topic list會指明哪些topic被刪除(在0.8.2以上版本適用)

  • 為已存在的topic增加分割槽

  • 為已存在的topic更新配置

  • 在多個topic上批量重分割槽

  • 在多個topic上批量重分割槽(可選partition broker位置)

強烈建議使用docker來部署這個kafka-manager工具,如下:

$ docker run -itd --rm  -p 9000:9000 -e ZK_HOSTS="192.168.99.233:2181,192.168.99.234:2181,192.168.99.235:2181" -e APPLICATION_SECRET=letmein sheepkiller/kafka-manager

此專案託管在github上,可以參閱github

docker啟動後,訪問9000埠,介面如下:

關於kafka-manager的配置使用,可以參閱詳細解析 kafka manager 的使用

八、kafka開啟jmx polling

在使用kafka-amnager時,有一個功能為Enable JMX Polling,該部分直接影響部分 kafka broker 和 topic 監控指標指標的獲取,那麼如何開啟此功能呢?方法有兩種。如下:

方法1:

啟動kafka時增加JMX_PORT=9988。如下:

$ JMX_PORT=9988 ./kafka-server-start.sh -daemon ../config/server.properties

方法2:

修改kafka-run-class.sh指令碼,第一行增加JMX_PORT=9988即可。

不管是哪種方法,都只是定義了一個變數而已,剩下的事情交給程式去做就好。

附加:k8s部署kafka-manager

在k8s中部署kafka-manager的yml檔案如下(可根據實際情況來修改):

apiVersion: v1
kind: Service
metadata:
  name: iov-sjjh-kafka-manager
  namespace: iov-sjjh
  labels:
    name: iov-sjjh-kafka-manager
spec:
  type: NodePort
  ports:
  - name: service
    port: 9000
    nodePort: 30200
    targetPort: 9000
  selector:
    name: iov-sjjh-kafka-manager
---
apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: iov-sjjh-kafka-manager
  namespace: iov-sjjh
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: iov-sjjh-kafka-manager
    spec:
      containers:
      - name: iov-sjjh-kafka-manager
        image: sheepkiller/kafka-manager:latest 
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9000
        resources:
          limits:
            memory: 1G
            cpu: 2
          requests:
            memory: 1G
            cpu: 2
        env:
        - name: "ZK_HOSTS"
          value: "192.168.99.233:2181,192.168.99.234:2181,192.168.99.235:2181"
        - name: "KAFKA_MANAGER_AUTH_ENABLED"
          value: "true"
        - name: "KAFKA_MANAGER_USERNAME"
          value: "iov_sjjh"
        - name: "KAFKA_MANAGER_PASSWORD"
          value: "iov_sjjh123"