Docker實戰之Kafka叢集
阿新 • • 發佈:2020-03-04
# 1. 概述
Apache Kafka 是一個快速、可擴充套件的、高吞吐、可容錯的分散式釋出訂閱訊息系統。其具有高吞吐量、內建分割槽、支援資料副本和容錯的特性,適合在大規模訊息處理場景中使用。
筆者之前在物聯網公司工作,其中 Kafka 作為物聯網 MQ 選型的事實標準,這裡優先給大家搭建 Kafka 叢集環境。由於 Kafka 的安裝需要依賴 Zookeeper,對 Zookeeper 還不瞭解的小夥伴可以在 [這裡](https://mp.weixin.qq.com/s/aNpn59gHD_WOhtZkceMwug) 先認識下 Zookeeper。
Kafka 能解決什麼問題呢?先說一下訊息佇列常見的使用場景吧,其實場景有很多,但是比較核心的有 3 個:解耦、非同步、削峰。
# 2. Kafka 基本概念
Kafka 部分名詞解釋如下:
- Broker:訊息中介軟體處理結點,一個 Kafka 節點就是一個 broker,多個 broker 可以組成一個 Kafka 叢集。
- Topic:一類訊息,例如 page view 日誌、click 日誌等都可以以 topic 的形式存在,Kafka 叢集能夠同時負責多個 topic 的分發。
- Partition:topic 物理上的分組,一個 topic 可以分為多個 partition,每個 partition 是一個有序的佇列。
- Segment:partition 物理上由多個 segment 組成,下面有詳細說明。
- offset:每個 partition 都由一系列有序的、不可變的訊息組成,這些訊息被連續的追加到 partition 中。partition 中的每個訊息都有一個連續的序列號叫做 offset,用於 partition 唯一標識一條訊息.每個 partition 中的訊息都由 offset=0 開始記錄訊息。
# 3. Docker 環境搭建
配合上一節的 Zookeeper 環境,計劃搭建一個 3 節點的叢集。宿主機 IP 為 `192.168.124.5`。
**docker-compose-kafka-cluster.yml**
```yaml
version: '3.7'
networks:
docker_net:
external: true
services:
kafka1:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka1
ports:
- "9093:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主機IP
KAFKA_ADVERTISED_PORT: 9093 ## 修改:宿主機對映port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9093 ## 繫結釋出訂閱的埠。修改:宿主機IP
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
volumes:
- "./kafka/kafka1/docker.sock:/var/run/docker.sock"
- "./kafka/kafka1/data/:/kafka"
networks:
- docker_net
kafka2:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka2
ports:
- "9094:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主機IP
KAFKA_ADVERTISED_PORT: 9094 ## 修改:宿主機對映port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9094 ## 修改:宿主機IP
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
volumes:
- "./kafka/kafka2/docker.sock:/var/run/docker.sock"
- "./kafka/kafka2/data/:/kafka"
networks:
- docker_net
kafka3:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka3
ports:
- "9095:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 3
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主機IP
KAFKA_ADVERTISED_PORT: 9095 ## 修改:宿主機對映port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9095 ## 修改:宿主機IP
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
volumes:
- "./kafka/kafka3/docker.sock:/var/run/docker.sock"
- "./kafka/kafka3/data/:/kafka"
networks:
- docker_net
kafka-manager:
image: sheepkiller/kafka-manager:latest
restart: unless-stopped
container_name: kafka-manager
hostname: kafka-manager
ports:
- "9000:9000"
links: # 連線本compose檔案建立的container
- kafka1
- kafka2
- kafka3
external_links: # 連線本compose檔案以外的container
- zoo1
- zoo2
- zoo3
environment:
ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181 ## 修改:宿主機IP
TZ: CST-8
networks:
- docker_net
```
執行以下命令啟動
```bash
docker-compose -f docker-compose-kafka-cluster.yml up -d
```
可以看到 kafka 叢集已經啟動成功。
# 4. Kafka 初認識
## 4.1 視覺化管理
細心的小夥伴發現上邊的配置除了 kafka 外還有一個 kafka-manager 模組。它是 kafka 的視覺化管理模組。因為 kafka 的元資料、配置資訊由 Zookeeper 管理,這裡我們在 UI 頁面做下相關配置。
_1._ 訪問 [http:localhost:9000](http:localhost:9000),按圖示新增相關配置
![](https://gitee.com/idea360/oss/raw/master/images/kafka-manage-config-cluster.png)
_2._ 配置後我們可以看到預設有一個 topic(\_\_consumer_offsets),3 個 brokers。該 topic 分 50 個 partition,用於記錄 kafka 的消費偏移量。
![](https://gitee.com/idea360/oss/raw/master/images/kafka-cluster-default-topic.png)
## 4.2 Zookeeper 在 kafka 環境中做了什麼
_1._ 首先觀察下根目錄
kafka 基於 zookeeper,kafka 啟動會將元資料儲存在 zookeeper 中。檢視 zookeeper 節點目錄,會發現多了很多和 kafka 相關的目錄。結果如下:
```docker
➜ docker zkCli -server 127.0.0.1:2183
Connecting to 127.0.0.1:2183
Welcome to ZooKeeper!
JLine support is enabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2183(CONNECTED) 0] ls /
[cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, controller_epoch, zk-test0000000000, kafka-manager, consumers, latest_producer_id_block, config]
```
_2._ 檢視我們對映的 kafka 目錄,新版本的 kafka 偏移量不再儲存在 zk 中,而是在 kafka 自己的環境中。
我們節選了部分目錄(包含 2 個 partition)
```text
├── kafka1
│ ├── data
│ │ └── kafka-logs-c4e2e9edc235
│ │ ├── __consumer_offsets-1
│ │ │ ├── 00000000000000000000.index // segment索引檔案
│ │ │ ├── 00000000000000000000.log // 資料檔案
│ │ │ ├── 00000000000000000000.timeindex // 訊息時間戳索引檔案
│ │ │ └── leader-epoch-checkpoint
...
│ │ ├── __consumer_offsets-7
│ │ │ ├── 00000000000000000000.index
│ │ │ ├── 00000000000000000000.log
│ │ │ ├── 00000000000000000000.timeindex
│ │ │ └── leader-epoch-checkpoint
│ │ ├── cleaner-offset-checkpoint
│ │ ├── log-start-offset-checkpoint
│ │ ├── meta.properties
│ │ ├── recovery-point-offset-checkpoint
│ │ └── replication-offset-checkpoint
│ └── docker.sock
```
結果與 Kafka-Manage 顯示結果一致。圖示的檔案是一個 Segment,00000000000000000000.log 表示 offset 從 0 開始,隨著資料不斷的增加,會有多個 Segment 檔案。
# 5. 生產與消費
## 5.1 建立主題
```bash
➜ docker docker exec -it kafka1 /bin/bash # 進入容器
bash-4.4# cd /opt/kafka/ # 進入安裝目錄
bash-4.4# ./bin/kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 # 檢視主題列表
__consumer_offsets
bash-4.4# ./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 2 --partitions 3 --topic test # 新建主題
Created topic test.
```
> 說明:
> --replication-factor 副本數;
> --partitions 分割槽數;
> replication<=broker(一定);
> 有效消費者數<=partitions 分割槽數(一定);
新建主題後, 再次檢視對映目錄, 由圖可見,partition 在 3 個 broker 上均勻分佈。
![](https://gitee.com/idea360/oss/raw/master/images/kafka-cluster-topic-test-partition-show.png)
## 5.2 生產訊息
```bash
bash-4.4# ./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test
>msg1
>msg2
>msg3
>msg4
>msg5
>msg6
```
## 5.3 消費訊息
```bash
bash-4.4# ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning
msg1
msg3
msg2
msg4
msg6
msg5
```
> --from-beginning 代表從頭開始消費
## 5.4 消費詳情
_檢視消費者組_
```bash
bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list
KafkaManagerOffsetCache
console-consumer-86137
```
_消費組偏移量_
```bash
bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --describe --group KafkaManagerOffsetCache
```
_檢視 topic 詳情_
```bash
bash-4.4# ./bin/kafka-topics.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --describe --topic test
Topic: test PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
```
_檢視.log 資料檔案_
```bash
bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log --print-data-log
Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1583317546421 size: 72 magic: 2 compresscodec: NONE crc: 1454276831 isvalid: true
| offset: 0 CreateTime: 1583317546421 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg2
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 72 CreateTime: 1583317550369 size: 72 magic: 2 compresscodec: NONE crc: 3578672322 isvalid: true
| offset: 1 CreateTime: 1583317550369 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg4
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 144 CreateTime: 1583317554831 size: 72 magic: 2 compresscodec: NONE crc: 2727139808 isvalid: true
| offset: 2 CreateTime: 1583317554831 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg6
```
> 這裡需要看下自己的檔案路徑是什麼,別直接 copy 我的哦
_檢視.index 索引檔案_
```bash
bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index
Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index
offset: 0 position: 0
```
_檢視.timeindex 索引檔案_
```bash
bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex --verify-index-only
Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex
Found timestamp mismatch in :/kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex
Index timestamp: 0, log timestamp: 1583317546421
```
# 6. SpringBoot 整合
筆者 SpringBoot 版本是 `2.2.2.RELEASE`
pom.xml 新增依賴
```xml
```
生產者配置
```java
@Configuration
public class KafkaProducerConfig {
/**
* producer配置
* @return
*/
pu