1. 程式人生 > >Kafka叢集部署指南

Kafka叢集部署指南

一、前言

1、Kafka簡介

Kafka是一個開源的分散式訊息引擎/訊息中介軟體,同時Kafka也是一個流處理平臺。Kakfa支援以釋出/訂閱的方式在應用間傳遞訊息,同時並基於訊息功能添加了Kafka Connect、Kafka Streams以支援連線其他系統的資料(Elasticsearch、Hadoop等)

Kafka最核心的最成熟的還是他的訊息引擎,所以Kafka大部分應用場景還是用來作為訊息佇列削峰平谷。另外,Kafka也是目前效能最好的訊息中介軟體。

2、Kafka架構

在Kafka叢集(Cluster)中,一個Kafka節點就是一個Broker,訊息由Topic來承載,可以儲存在1個或多個Partition中。釋出訊息的應用為Producer、消費訊息的應用為Consumer,多個Consumer可以促成Consumer Group共同消費一個Topic中的訊息。

概念/物件簡單說明
Broker Kafka節點
Topic 主題,用來承載訊息
Partition 分割槽,用於主題分片儲存
Producer 生產者,向主題釋出訊息的應用
Consumer 消費者,從主題訂閱訊息的應用
Consumer Group 消費者組,由多個消費者組成

3、準備工作

1、Kafka伺服器

準備3臺CentOS伺服器,並配置好靜態IP、主機名

伺服器名IP說明
kafka01 192.168.88.51 Kafka節點1
kafka02 192.168.88.52 Kafka節點2
kafka03 192.168.88.53 Kafka節點3

軟體版本說明

說明
Linux Server CentOS 7
Kafka 2.3.0

2、ZooKeeper叢集

Kakfa叢集需要依賴ZooKeeper儲存Broker、Topic等資訊,這裡我們部署三臺ZK

伺服器名IP說明
zk01 192.168.88.21 ZooKeeper節點
zk02 192.168.88.22 ZooKeeper節點
zk03 192.168.88.23 ZooKeeper節點

部署過程參考:https://ken.io/note/zookeeper...

二、部署過程

1、應用&資料目錄

#建立應用目錄
mkdir /usr/kafka

#建立Kafka資料目錄
mkdir /kafka
mkdir /kafka/logs
chmod 777 -R /kafka

2、下載&解壓

Kafka官方下載地址:https://kafka.apache.org/down...
這次我下載的是2.3.0版本

#建立並進入下載目錄
mkdir /home/downloads
cd /home/downloads

#下載安裝包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz 

#解壓到應用目錄
tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafka
kafka_2.12-2.3.0.tgz 其中2.12是Scala編譯器的版本,2.3.0才是Kafka的版本

3、Kafka節點配置

#進入應用目錄
cd /usr/kafka/kafka_2.12-2.3.0/

#修改配置檔案
vi config/server.properties

通用配置

配置日誌目錄、指定ZooKeeper伺服器

# A comma separated list of directories under which to store log files
log.dirs=/kafka/logs

# root directory for all kafka znodes.
zookeeper.connect=192.168.88.21:2181,192.168.88.22:2181,192.168.88.23:2181

分節點配置

  • Kafka01
broker.id=0

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.51:9092
  • Kafka02
broker.id=1

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.52:9092
  • Kafka03
broker.id=2

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.53:9092

4、防火牆配置

#開放埠
firewall-cmd --add-port=9092/tcp --permanent

#重新載入防火牆配置
firewall-cmd --reload

5、啟動Kafka

#進入kafka根目錄
cd /usr/kafka/kafka_2.12-2.3.0/
#啟動
/bin/kafka-server-start.sh config/server.properties &

#啟動成功輸出示例(最後幾行)
[2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,185] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

三、Kafka測試

1、建立Topic

在kafka01(Broker)上建立測試Tpoic:test-ken-io,這裡我們指定了3個副本、1個分割槽

bin/kafka-topics.sh --create --bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-io

Topic在kafka01上建立後也會同步到叢集中另外兩個Broker:kafka02、kafka03

2、檢視Topic

我們可以通過命令列出指定Broker的

bin/kafka-topics.sh --list --bootstrap-server 192.168.88.52:9092

3、傳送訊息

這裡我們向Broker(id=0)的Topic=test-ken-io傳送訊息

bin/kafka-console-producer.sh --broker-list  192.168.88.51:9092  --topic test-ken-io

#訊息內容
> test by ken.io

4、消費訊息

在Kafka02上消費Broker03的訊息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning

在Kafka03上消費Broker02的訊息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning

然後均能收到訊息

test by ken.io

這是因為這兩個消費訊息的命令是建立了兩個不同的Consumer
如果我們啟動Consumer指定Consumer Group Id就可以作為一個消費組協同工,1個訊息同時只會被一個Consumer消費到

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_ken

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken

四、備註

1、Kafka常用配置項說明

Kafka常用Broker配置說明:

配置項預設值/示例值說明
broker.id 0 Broker唯一標識
listeners PLAINTEXT://192.168.88.53:9092 監聽資訊,PLAINTEXT表示明文傳輸
log.dirs kafka/logs kafka資料存放地址,可以填寫多個。用","間隔
message.max.bytes message.max.bytes 單個訊息長度限制,單位是位元組
num.partitions 1 預設分割槽數
log.flush.interval.messages Long.MaxValue 在資料被寫入到硬碟和消費者可用前最大累積的訊息的數量
log.flush.interval.ms Long.MaxValue 在資料被寫入到硬碟前的最大時間
log.flush.scheduler.interval.ms Long.MaxValue 檢查資料是否要寫入到硬碟的時間間隔。
log.retention.hours 24 控制一個log保留時間,單位:小時
zookeeper.connect 192.168.88.21:2181 ZooKeeper伺服器地址,多臺用","間隔

2、附錄

  • https://kafka.apache.org/
  • https://zh.wikipedia.org/zh-c...

本文首發於我的獨立部落格:https://ken.io/note/kafka-cluster-deploy-g