一站式Kafka平臺解決方案——KafkaCenter
阿新 • • 發佈:2020-05-18
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090013783-104857051.png)
## KafkaCenter是什麼
KafkaCenter是一個針對Kafka的一站式,解決方案。用於Kafka叢集的維護與管理,生產者和消費者的監控,以及Kafka部分生態元件的使用。
對於Kafka的平臺化,一直缺少一個成熟的解決方案,之前比較流行的kafka監控方案,如kafka-manager提供了叢集管理與topic管理等等功能。但是對於生產者、消費者的監控,以及Kafka的新生態,如Connect,KSQL還缺少響應的支援。Confluent Control Center功能要完整一些,但卻是非開源收費的。
對於Kafka的使用,一直都是一個讓人頭疼的問題,由於實時系統的強運維特性,我們不得不投入大量的時間用於叢集的維護,kafka的運維,比如:
- 人工建立topic,特別費力
- 相關kafka運維,監控孤島化
- 現有消費監控工具監控不準確
- 無法拿到Kafka 叢集的summay資訊
- 無法快速知曉叢集健康狀態
- 無法知曉業務對team kafka使用情況
- kafka管理,監控工具稀少,沒有一個好的工具我們直接可以使用
- 無法快速查詢topic訊息
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090032946-1806846818.png)
功能模組介紹
- Home-> 檢視平臺管理的Kafka Cluster叢集資訊及監控資訊
- Topic-> 使用者可以在此模組檢視自己的Topic,發起申請新建Topic,同時可以對Topic進行生產消費測試。
- Monitor-> 使用者可以在此模組中可以檢視Topic的生產以及消費情況,同時可以針對消費延遲情況設定預警資訊。
- Connect-> 實現使用者快速建立自己的Connect Job,並對自己的Connect進行維護。
- KSQL-> 實現使用者快速建立自己的KSQL Job,並對自己的Job進行維護。
- Approve-> 此模組主要用於當普通使用者申請建立Topic,管理員進行審批操作。
- Setting-> 此模組主要功能為管理員維護User、Team以及kafka cluster資訊
- Kafka Manager-> 此模組用於管理員對叢集的正常維護操作。
系統截圖:
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090042645-1315686035.png)
## 安裝與入門
安裝需要依賴 mysql es email server
| 元件 | 是否必須 | 功能 |
| ------------------- | -------- | ------------------------------------- |
| mysql | 必須 | 配置資訊存在mysql |
| elasticsearch(7.0+) | 可選 | 各種監控資訊的儲存 |
| email server | 可選 | Apply, approval, warning e-mail alert |
#### 1、初始化
在MySQL中執行sql建表
```
-- Dumping database structure for kafka_center
CREATE DATABASE IF NOT EXISTS `kafka_center` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;
USE `kafka_center`;
-- Dumping structure for table kafka_center.alert_group
CREATE TABLE IF NOT EXISTS `alert_group` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`cluster_id` int(11) NOT NULL,
`topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`threshold` int(11) DEFAULT NULL,
`dispause` int(11) DEFAULT NULL,
`mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
`webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
`create_date` datetime DEFAULT NULL,
`owner_id` int(11) DEFAULT NULL,
`team_id` int(11) DEFAULT NULL,
`disable_alerta` tinyint(1) DEFAULT 0,
`enable` tinyint(1) NOT NULL DEFAULT 1,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.cluster_info
CREATE TABLE IF NOT EXISTS `cluster_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) COLLATE utf8_bin NOT NULL,
`zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`create_time` datetime DEFAULT NULL,
`comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`enable` int(11) DEFAULT NULL,
`broker_size` int(4) DEFAULT 0,
`kafka_version` varchar(10) COLLATE utf8_bin DEFAULT '',
`location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`graf_addr` varchar(255) COLLATE utf8_bin DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.ksql_info
CREATE TABLE IF NOT EXISTS `ksql_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`cluster_id` int(11) DEFAULT NULL,
`cluster_name` varchar(255) DEFAULT NULL,
`ksql_url` varchar(255) DEFAULT NULL,
`ksql_serverId` varchar(255) DEFAULT NULL,
`version` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.task_info
CREATE TABLE IF NOT EXISTS `task_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT '',
`topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`partition` int(11) DEFAULT NULL,
`replication` int(11) DEFAULT NULL,
`message_rate` int(50) DEFAULT NULL,
`ttl` int(11) DEFAULT NULL,
`owner_id` int(11) DEFAULT NULL,
`team_id` int(11) DEFAULT NULL,
`comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
`create_time` datetime DEFAULT NULL,
`approved` int(11) DEFAULT NULL,
`approved_id` int(11) DEFAULT NULL,
`approved_time` datetime DEFAULT NULL,
`approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.team_info
CREATE TABLE IF NOT EXISTS `team_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.topic_collection
CREATE TABLE IF NOT EXISTS `topic_collection` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`cluster_id` int(11) NOT NULL,
`user_id` int(11) NOT NULL,
`name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.topic_info
CREATE TABLE IF NOT EXISTS `topic_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`cluster_id` int(11) NOT NULL,
`topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`partition` int(11) DEFAULT NULL,
`replication` int(11) DEFAULT NULL,
`ttl` bigint(11) DEFAULT NULL,
`config` varchar(512) COLLATE utf8_bin DEFAULT NULL,
`owner_id` int(11) DEFAULT NULL,
`team_id` int(11) DEFAULT NULL,
`comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '',
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.user_info
CREATE TABLE IF NOT EXISTS `user_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`real_name` varchar(255) COLLATE utf8_bin DEFAULT '',
`email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '',
`role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '100',
`create_time` datetime DEFAULT NULL,
`password` varchar(255) COLLATE utf8_bin DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Data exporting was unselected.
-- Dumping structure for table kafka_center.user_team
CREATE TABLE IF NOT EXISTS `user_team` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) DEFAULT NULL,
`team_id` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
```
#### 2、配置
相關配置位於application.properties
可對埠 日誌等資訊做一些修改
```
server.port=8080
debug=false
# 設定session timeout為6小時
server.servlet.session.timeout=21600
spring.security.user.name=admin
spring.security.user.password=admin
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/kafka_center?useUnicode=true&characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.maximum-pool-size=15
spring.datasource.hikari.pool-name=KafkaCenterHikariCP
spring.datasource.hikari.max-lifetime =30000
spring.datasource.hikari.connection-test-query=SELECT 1
management.health.defaults.enabled=false
public.url=http://localhost:8080
connect.url=http://localhost:8000/#/
system.topic.ttl.h=16
monitor.enable=true
monitor.collect.period.minutes=5
monitor.elasticsearch.hosts=localhost:9200
monitor.elasticsearch.index=kafka_center_monitor
#是否啟用收集執行緒指定叢集收集
monitor.collector.include.enable=false
#收集執行緒指定location,必須屬於remote.locations之中
monitor.collector.include.location=dev
collect.topic.enable=true
collect.topic.period.minutes=10
# remote的功能是為了提高lag查詢和收集,解決跨location網路延遲問題
remote.query.enable=false
remote.hosts=gqc@localhost2:8080
remote.locations=dev,gqc
#傳送consumer group的lag傳送給alert service
alert.enable=false
alert.dispause=2
alert.service=
alert.threshold=1000
alter.env=other
#是否開啟郵件功能,true:啟用,false:禁用
mail.enable=false
spring.mail.host=
[email protected]
# oauth2
generic.enabled=false
generic.name=oauth2 Login
generic.auth_url=
generic.token_url=
generic.redirect_utl=
generic.api_url=
generic.client_id=
generic.client_secret=
generic.scopes=
```
#### 3、執行
推薦使用docker
```
docker run -d -p 8080:8080 --name KafkaCenter -v ${PWD}/application.properties:/opt/app/kafka-center/config/application.properties xaecbd/kafka-center:2.1.0
```
不用docker
```
$ git clone https://github.com/xaecbd/KafkaCenter.git
$ cd KafkaCenter
$ mvn clean package -Dmaven.test.skip=true
$ cd KafkaCenter\KafkaCenter-Core\target
$ java -jar KafkaCenter-Core-2.1.0-SNAPSHOT.jar
```
#### 4、檢視
訪問http://localhost:8080 管理員使用者與密碼預設:**admin / admin**
## 功能介紹
**Topics**
使用者可以在此模組完成Topic檢視,已經申請新建Topic,同時可以對Topic進行生產消費測試。
**Monitor**
使用者可以在此模組中可以檢視Topic的生成以及消費情況,同時可以針對消費延遲情況設定預警資訊。
**Alerts**
此模組用於維護預警資訊。使用者可以看到自己所有預警資訊,管理員可以看到所有人的預警資訊。
Kafka Connect
實現使用者快速建立自己的Connect Job,並對自己的Connect進行維護。
**KSQL**
實現使用者快速建立自己的KSQL Job,並對自己的Job進行維護。
**Approve**
此模組主要用於當普通使用者申請建立Topic 或者Job時,管理員進行審批操作。
**Setting**
此模組主要功能為管理員維護User、Team以及kafka cluster資訊
**Cluster Manager**
此模組用於管理員對叢集的正常維護操作。
##### Home
這裡是一些基本的統計資訊
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090114677-13444865.png)
##### My Favorite
叢集與topic列表
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090121142-546415751.png)
##### Topic
這裡是一些topic的管理功能
###### Topic List
**操作範圍:**
使用者所屬Team的所有Topic
- Topic -> Topic List -> Detail 檢視Topic的詳細資訊
- Topic -> Topic List -> Mock 對Topic進行**生產**測試
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090130993-438219902.png)
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090137967-1507290944.png)
###### 申請建立topic
**Important**: admin不能申請task,普通使用者必須先讓管理員新建team後,將使用者加入指定team後,才可以申請task。
**操作範圍:**
使用者所屬Team的所有Task
- Topic -> My Task -> Detail 檢視申請的Task資訊
- Topic -> My Task -> Delete 刪除被拒絕或待審批的Task
- Topic -> My Task -> Edit 修改被拒絕的Task
- Topic -> My Task -> Create Topic Task 建立Task
- 按照表單各欄位要求填寫資訊
- 點選確認,提交申請
**審批結果:**
- 審批通過:Topic將會被建立在管理員指定的叢集
- 審批拒絕:使用者收到郵件,返回到My Task,點選對應Task後面的Edit,針對審批意見進行修改
**Topic命名規則:**
**只能**包含:數字、大小寫字母、下劃線、中劃線、點;長度大於等於3小於等於100。
**不推薦**:下劃線開頭;
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090145896-602760825.png)
可對所有Topic進行**消費**測試
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090152134-1388734633.png)
##### Monitor
監控模組
生產者監控
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090201079-1632539030.png)
消費者監控
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090206981-454763569.png)
訊息積壓
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090212860-1952482215.png)
報警功能
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090218854-1746820144.png)
##### Connect
這裡是一些Connect的操作
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090228023-1028610164.png)
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090230821-1567267216.png)
##### KSQL
可以進行KQL的查詢操作
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090246344-227530514.png)
##### Approve
這裡主要是管理員做一些稽核操作
- Approve->check 審批使用者的Task
- 根據使用者選擇的location指定cluster
- 檢查使用者設定的partition和replication大小是否合理,如不合理做出調整
- 檢查其他欄位是否合理,如需要拒絕該申請,點選Reject並填寫意見。
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090252767-397993504.png)
##### Kafka Manager
###### Topic管理
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090301708-1623616418.png)
###### Cluster管理
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090311204-146396598.png)
###### broker管理
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090317758-834270680.png)
###### group管理
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090326657-1335005118.png)
##### Setting
這些主要是使用者的一些設定
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090343130-1124180689.png)
KafkaCenter還是一個非常不錯的kafka管理工具,可以滿足大部分需求。
更多實時資料分析相關博文與科技資訊,歡迎關注 “實時流式計算”
![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200511083216576-14373893