Kafka叢集部署指南
一、前言
1、Kafka簡介
Kafka是一個開源的分散式訊息引擎/訊息中介軟體,同時Kafka也是一個流處理平臺。Kakfa支援以釋出/訂閱的方式在應用間傳遞訊息,同時並基於訊息功能添加了Kafka Connect、Kafka Streams以支援連線其他系統的資料(Elasticsearch、Hadoop等)
Kafka最核心的最成熟的還是他的訊息引擎,所以Kafka大部分應用場景還是用來作為訊息佇列削峰平谷。另外,Kafka也是目前效能最好的訊息中介軟體。
2、Kafka架構
在Kafka叢集(Cluster)中,一個Kafka節點就是一個Broker,訊息由Topic來承載,可以儲存在1個或多個Partition中。釋出訊息的應用為Producer、消費訊息的應用為Consumer,多個Consumer可以促成Consumer Group共同消費一個Topic中的訊息。
概念/物件 | 簡單說明 |
---|---|
Broker | Kafka節點 |
Topic | 主題,用來承載訊息 |
Partition | 分割槽,用於主題分片儲存 |
Producer | 生產者,向主題釋出訊息的應用 |
Consumer | 消費者,從主題訂閱訊息的應用 |
Consumer Group | 消費者組,由多個消費者組成 |
3、準備工作
1、Kafka伺服器
準備3臺CentOS伺服器,並配置好靜態IP、主機名
伺服器名 | IP | 說明 |
---|---|---|
kafka01 | 192.168.88.51 | Kafka節點1 |
kafka02 | 192.168.88.52 | Kafka節點2 |
kafka03 | 192.168.88.53 | Kafka節點3 |
軟體版本說明
項 | 說明 |
---|---|
Linux Server | CentOS 7 |
Kafka | 2.3.0 |
2、ZooKeeper叢集
Kakfa叢集需要依賴ZooKeeper儲存Broker、Topic等資訊,這裡我們部署三臺ZK
伺服器名 | IP | 說明 |
---|---|---|
zk01 | 192.168.88.21 | ZooKeeper節點 |
zk02 | 192.168.88.22 | ZooKeeper節點 |
zk03 | 192.168.88.23 | ZooKeeper節點 |
部署過程參考:https://ken.io/note/zookeeper...
二、部署過程
1、應用&資料目錄
#建立應用目錄
mkdir /usr/kafka
#建立Kafka資料目錄
mkdir /kafka
mkdir /kafka/logs
chmod 777 -R /kafka
2、下載&解壓
Kafka官方下載地址:https://kafka.apache.org/down...
這次我下載的是2.3.0版本
#建立並進入下載目錄
mkdir /home/downloads
cd /home/downloads
#下載安裝包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
#解壓到應用目錄
tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafka
kafka_2.12-2.3.0.tgz 其中2.12是Scala編譯器的版本,2.3.0才是Kafka的版本
3、Kafka節點配置
#進入應用目錄
cd /usr/kafka/kafka_2.12-2.3.0/
#修改配置檔案
vi config/server.properties
通用配置
配置日誌目錄、指定ZooKeeper伺服器
# A comma separated list of directories under which to store log files
log.dirs=/kafka/logs
# root directory for all kafka znodes.
zookeeper.connect=192.168.88.21:2181,192.168.88.22:2181,192.168.88.23:2181
分節點配置
- Kafka01
broker.id=0
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.51:9092
- Kafka02
broker.id=1
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.52:9092
- Kafka03
broker.id=2
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.53:9092
4、防火牆配置
#開放埠
firewall-cmd --add-port=9092/tcp --permanent
#重新載入防火牆配置
firewall-cmd --reload
5、啟動Kafka
#進入kafka根目錄
cd /usr/kafka/kafka_2.12-2.3.0/
#啟動
/bin/kafka-server-start.sh config/server.properties &
#啟動成功輸出示例(最後幾行)
[2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,185] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
三、Kafka測試
1、建立Topic
在kafka01(Broker)上建立測試Tpoic:test-ken-io,這裡我們指定了3個副本、1個分割槽
bin/kafka-topics.sh --create --bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-io
Topic在kafka01上建立後也會同步到叢集中另外兩個Broker:kafka02、kafka03
2、檢視Topic
我們可以通過命令列出指定Broker的
bin/kafka-topics.sh --list --bootstrap-server 192.168.88.52:9092
3、傳送訊息
這裡我們向Broker(id=0)的Topic=test-ken-io傳送訊息
bin/kafka-console-producer.sh --broker-list 192.168.88.51:9092 --topic test-ken-io
#訊息內容
> test by ken.io
4、消費訊息
在Kafka02上消費Broker03的訊息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning
在Kafka03上消費Broker02的訊息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning
然後均能收到訊息
test by ken.io
這是因為這兩個消費訊息的命令是建立了兩個不同的Consumer
如果我們啟動Consumer指定Consumer Group Id就可以作為一個消費組協同工,1個訊息同時只會被一個Consumer消費到
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_ken
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken
四、備註
1、Kafka常用配置項說明
Kafka常用Broker配置說明:
配置項 | 預設值/示例值 | 說明 |
---|---|---|
broker.id | 0 | Broker唯一標識 |
listeners | PLAINTEXT://192.168.88.53:9092 | 監聽資訊,PLAINTEXT表示明文傳輸 |
log.dirs | kafka/logs | kafka資料存放地址,可以填寫多個。用","間隔 |
message.max.bytes | message.max.bytes | 單個訊息長度限制,單位是位元組 |
num.partitions | 1 | 預設分割槽數 |
log.flush.interval.messages | Long.MaxValue | 在資料被寫入到硬碟和消費者可用前最大累積的訊息的數量 |
log.flush.interval.ms | Long.MaxValue | 在資料被寫入到硬碟前的最大時間 |
log.flush.scheduler.interval.ms | Long.MaxValue | 檢查資料是否要寫入到硬碟的時間間隔。 |
log.retention.hours | 24 | 控制一個log保留時間,單位:小時 |
zookeeper.connect | 192.168.88.21:2181 | ZooKeeper伺服器地址,多臺用","間隔 |
2、附錄
- https://kafka.apache.org/
- https://zh.wikipedia.org/zh-c...
本文首發於我的獨立部落格:https://ken.io/note/kafka-cluster-deploy-g