1. 程式人生 > >Docker實戰之Kafka叢集

Docker實戰之Kafka叢集

# 1. 概述 Apache Kafka 是一個快速、可擴充套件的、高吞吐、可容錯的分散式釋出訂閱訊息系統。其具有高吞吐量、內建分割槽、支援資料副本和容錯的特性,適合在大規模訊息處理場景中使用。 筆者之前在物聯網公司工作,其中 Kafka 作為物聯網 MQ 選型的事實標準,這裡優先給大家搭建 Kafka 叢集環境。由於 Kafka 的安裝需要依賴 Zookeeper,對 Zookeeper 還不瞭解的小夥伴可以在 [這裡](https://mp.weixin.qq.com/s/aNpn59gHD_WOhtZkceMwug) 先認識下 Zookeeper。 Kafka 能解決什麼問題呢?先說一下訊息佇列常見的使用場景吧,其實場景有很多,但是比較核心的有 3 個:解耦、非同步、削峰。 # 2. Kafka 基本概念 Kafka 部分名詞解釋如下: - Broker:訊息中介軟體處理結點,一個 Kafka 節點就是一個 broker,多個 broker 可以組成一個 Kafka 叢集。 - Topic:一類訊息,例如 page view 日誌、click 日誌等都可以以 topic 的形式存在,Kafka 叢集能夠同時負責多個 topic 的分發。 - Partition:topic 物理上的分組,一個 topic 可以分為多個 partition,每個 partition 是一個有序的佇列。 - Segment:partition 物理上由多個 segment 組成,下面有詳細說明。 - offset:每個 partition 都由一系列有序的、不可變的訊息組成,這些訊息被連續的追加到 partition 中。partition 中的每個訊息都有一個連續的序列號叫做 offset,用於 partition 唯一標識一條訊息.每個 partition 中的訊息都由 offset=0 開始記錄訊息。 # 3. Docker 環境搭建 配合上一節的 Zookeeper 環境,計劃搭建一個 3 節點的叢集。宿主機 IP 為 `192.168.124.5`。 **docker-compose-kafka-cluster.yml** ```yaml version: '3.7' networks: docker_net: external: true services: kafka1: image: wurstmeister/kafka restart: unless-stopped container_name: kafka1 ports: - "9093:9092" external_links: - zoo1 - zoo2 - zoo3 environment: KAFKA_BROKER_ID: 1 KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主機IP KAFKA_ADVERTISED_PORT: 9093 ## 修改:宿主機對映port KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9093 ## 繫結釋出訂閱的埠。修改:宿主機IP KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181" volumes: - "./kafka/kafka1/docker.sock:/var/run/docker.sock" - "./kafka/kafka1/data/:/kafka" networks: - docker_net kafka2: image: wurstmeister/kafka restart: unless-stopped container_name: kafka2 ports: - "9094:9092" external_links: - zoo1 - zoo2 - zoo3 environment: KAFKA_BROKER_ID: 2 KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主機IP KAFKA_ADVERTISED_PORT: 9094 ## 修改:宿主機對映port KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9094 ## 修改:宿主機IP KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181" volumes: - "./kafka/kafka2/docker.sock:/var/run/docker.sock" - "./kafka/kafka2/data/:/kafka" networks: - docker_net kafka3: image: wurstmeister/kafka restart: unless-stopped container_name: kafka3 ports: - "9095:9092" external_links: - zoo1 - zoo2 - zoo3 environment: KAFKA_BROKER_ID: 3 KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主機IP KAFKA_ADVERTISED_PORT: 9095 ## 修改:宿主機對映port KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9095 ## 修改:宿主機IP KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181" volumes: - "./kafka/kafka3/docker.sock:/var/run/docker.sock" - "./kafka/kafka3/data/:/kafka" networks: - docker_net kafka-manager: image: sheepkiller/kafka-manager:latest restart: unless-stopped container_name: kafka-manager hostname: kafka-manager ports: - "9000:9000" links: # 連線本compose檔案建立的container - kafka1 - kafka2 - kafka3 external_links: # 連線本compose檔案以外的container - zoo1 - zoo2 - zoo3 environment: ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181 ## 修改:宿主機IP TZ: CST-8 networks: - docker_net ``` 執行以下命令啟動 ```bash docker-compose -f docker-compose-kafka-cluster.yml up -d ``` 可以看到 kafka 叢集已經啟動成功。 # 4. Kafka 初認識 ## 4.1 視覺化管理 細心的小夥伴發現上邊的配置除了 kafka 外還有一個 kafka-manager 模組。它是 kafka 的視覺化管理模組。因為 kafka 的元資料、配置資訊由 Zookeeper 管理,這裡我們在 UI 頁面做下相關配置。 _1._ 訪問 [http:localhost:9000](http:localhost:9000),按圖示新增相關配置 ![](https://gitee.com/idea360/oss/raw/master/images/kafka-manage-config-cluster.png) _2._ 配置後我們可以看到預設有一個 topic(\_\_consumer_offsets),3 個 brokers。該 topic 分 50 個 partition,用於記錄 kafka 的消費偏移量。 ![](https://gitee.com/idea360/oss/raw/master/images/kafka-cluster-default-topic.png) ## 4.2 Zookeeper 在 kafka 環境中做了什麼 _1._ 首先觀察下根目錄 kafka 基於 zookeeper,kafka 啟動會將元資料儲存在 zookeeper 中。檢視 zookeeper 節點目錄,會發現多了很多和 kafka 相關的目錄。結果如下: ```docker ➜ docker zkCli -server 127.0.0.1:2183 Connecting to 127.0.0.1:2183 Welcome to ZooKeeper! JLine support is enabled WATCHER:: WatchedEvent state:SyncConnected type:None path:null [zk: 127.0.0.1:2183(CONNECTED) 0] ls / [cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, controller_epoch, zk-test0000000000, kafka-manager, consumers, latest_producer_id_block, config] ``` _2._ 檢視我們對映的 kafka 目錄,新版本的 kafka 偏移量不再儲存在 zk 中,而是在 kafka 自己的環境中。 我們節選了部分目錄(包含 2 個 partition) ```text ├── kafka1 │   ├── data │   │   └── kafka-logs-c4e2e9edc235 │   │   ├── __consumer_offsets-1 │   │   │   ├── 00000000000000000000.index // segment索引檔案 │   │   │   ├── 00000000000000000000.log // 資料檔案 │   │   │   ├── 00000000000000000000.timeindex // 訊息時間戳索引檔案 │   │   │   └── leader-epoch-checkpoint ... │   │   ├── __consumer_offsets-7 │   │   │   ├── 00000000000000000000.index │   │   │   ├── 00000000000000000000.log │   │   │   ├── 00000000000000000000.timeindex │   │   │   └── leader-epoch-checkpoint │   │   ├── cleaner-offset-checkpoint │   │   ├── log-start-offset-checkpoint │   │   ├── meta.properties │   │   ├── recovery-point-offset-checkpoint │   │   └── replication-offset-checkpoint │   └── docker.sock ``` 結果與 Kafka-Manage 顯示結果一致。圖示的檔案是一個 Segment,00000000000000000000.log 表示 offset 從 0 開始,隨著資料不斷的增加,會有多個 Segment 檔案。 # 5. 生產與消費 ## 5.1 建立主題 ```bash ➜ docker docker exec -it kafka1 /bin/bash # 進入容器 bash-4.4# cd /opt/kafka/ # 進入安裝目錄 bash-4.4# ./bin/kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 # 檢視主題列表 __consumer_offsets bash-4.4# ./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 2 --partitions 3 --topic test # 新建主題 Created topic test. ``` > 說明: > --replication-factor 副本數; > --partitions 分割槽數; > replication<=broker(一定); > 有效消費者數<=partitions 分割槽數(一定); 新建主題後, 再次檢視對映目錄, 由圖可見,partition 在 3 個 broker 上均勻分佈。 ![](https://gitee.com/idea360/oss/raw/master/images/kafka-cluster-topic-test-partition-show.png) ## 5.2 生產訊息 ```bash bash-4.4# ./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test >msg1 >msg2 >msg3 >msg4 >msg5 >msg6 ``` ## 5.3 消費訊息 ```bash bash-4.4# ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning msg1 msg3 msg2 msg4 msg6 msg5 ``` > --from-beginning 代表從頭開始消費 ## 5.4 消費詳情 _檢視消費者組_ ```bash bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list KafkaManagerOffsetCache console-consumer-86137 ``` _消費組偏移量_ ```bash bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --describe --group KafkaManagerOffsetCache ``` _檢視 topic 詳情_ ```bash bash-4.4# ./bin/kafka-topics.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --describe --topic test Topic: test PartitionCount: 3 ReplicationFactor: 2 Configs: Topic: test Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1 Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: test Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3 ``` _檢視.log 資料檔案_ ```bash bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log --print-data-log Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log Starting offset: 0 baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1583317546421 size: 72 magic: 2 compresscodec: NONE crc: 1454276831 isvalid: true | offset: 0 CreateTime: 1583317546421 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg2 baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 72 CreateTime: 1583317550369 size: 72 magic: 2 compresscodec: NONE crc: 3578672322 isvalid: true | offset: 1 CreateTime: 1583317550369 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg4 baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 144 CreateTime: 1583317554831 size: 72 magic: 2 compresscodec: NONE crc: 2727139808 isvalid: true | offset: 2 CreateTime: 1583317554831 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg6 ``` > 這裡需要看下自己的檔案路徑是什麼,別直接 copy 我的哦 _檢視.index 索引檔案_ ```bash bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index offset: 0 position: 0 ``` _檢視.timeindex 索引檔案_ ```bash bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex --verify-index-only Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex Found timestamp mismatch in :/kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex Index timestamp: 0, log timestamp: 1583317546421 ``` # 6. SpringBoot 整合 筆者 SpringBoot 版本是 `2.2.2.RELEASE` pom.xml 新增依賴 ```xml ``` 生產者配置 ```java @Configuration public class KafkaProducerConfig { /** * producer配置 * @return */ pu