1. 程式人生 > >一站式Kafka平臺解決方案——KafkaCenter

一站式Kafka平臺解決方案——KafkaCenter

![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090013783-104857051.png) ## KafkaCenter是什麼 KafkaCenter是一個針對Kafka的一站式,解決方案。用於Kafka叢集的維護與管理,生產者和消費者的監控,以及Kafka部分生態元件的使用。 對於Kafka的平臺化,一直缺少一個成熟的解決方案,之前比較流行的kafka監控方案,如kafka-manager提供了叢集管理與topic管理等等功能。但是對於生產者、消費者的監控,以及Kafka的新生態,如Connect,KSQL還缺少響應的支援。Confluent Control Center功能要完整一些,但卻是非開源收費的。 對於Kafka的使用,一直都是一個讓人頭疼的問題,由於實時系統的強運維特性,我們不得不投入大量的時間用於叢集的維護,kafka的運維,比如: - 人工建立topic,特別費力 - 相關kafka運維,監控孤島化 - 現有消費監控工具監控不準確 - 無法拿到Kafka 叢集的summay資訊 - 無法快速知曉叢集健康狀態 - 無法知曉業務對team kafka使用情況 - kafka管理,監控工具稀少,沒有一個好的工具我們直接可以使用 - 無法快速查詢topic訊息 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090032946-1806846818.png) 功能模組介紹 - Home-> 檢視平臺管理的Kafka Cluster叢集資訊及監控資訊 - Topic-> 使用者可以在此模組檢視自己的Topic,發起申請新建Topic,同時可以對Topic進行生產消費測試。 - Monitor-> 使用者可以在此模組中可以檢視Topic的生產以及消費情況,同時可以針對消費延遲情況設定預警資訊。 - Connect-> 實現使用者快速建立自己的Connect Job,並對自己的Connect進行維護。 - KSQL-> 實現使用者快速建立自己的KSQL Job,並對自己的Job進行維護。 - Approve-> 此模組主要用於當普通使用者申請建立Topic,管理員進行審批操作。 - Setting-> 此模組主要功能為管理員維護User、Team以及kafka cluster資訊 - Kafka Manager-> 此模組用於管理員對叢集的正常維護操作。 系統截圖: ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090042645-1315686035.png) ## 安裝與入門 安裝需要依賴 mysql es email server | 元件 | 是否必須 | 功能 | | ------------------- | -------- | ------------------------------------- | | mysql | 必須 | 配置資訊存在mysql | | elasticsearch(7.0+) | 可選 | 各種監控資訊的儲存 | | email server | 可選 | Apply, approval, warning e-mail alert | #### 1、初始化 在MySQL中執行sql建表 ``` -- Dumping database structure for kafka_center CREATE DATABASE IF NOT EXISTS `kafka_center` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */; USE `kafka_center`; -- Dumping structure for table kafka_center.alert_group CREATE TABLE IF NOT EXISTS `alert_group` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cluster_id` int(11) NOT NULL, `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `threshold` int(11) DEFAULT NULL, `dispause` int(11) DEFAULT NULL, `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '', `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '', `create_date` datetime DEFAULT NULL, `owner_id` int(11) DEFAULT NULL, `team_id` int(11) DEFAULT NULL, `disable_alerta` tinyint(1) DEFAULT 0, `enable` tinyint(1) NOT NULL DEFAULT 1, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.cluster_info CREATE TABLE IF NOT EXISTS `cluster_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8_bin NOT NULL, `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `create_time` datetime DEFAULT NULL, `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `enable` int(11) DEFAULT NULL, `broker_size` int(4) DEFAULT 0, `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT '', `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.ksql_info CREATE TABLE IF NOT EXISTS `ksql_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cluster_id` int(11) DEFAULT NULL, `cluster_name` varchar(255) DEFAULT NULL, `ksql_url` varchar(255) DEFAULT NULL, `ksql_serverId` varchar(255) DEFAULT NULL, `version` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- Data exporting was unselected. -- Dumping structure for table kafka_center.task_info CREATE TABLE IF NOT EXISTS `task_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT '', `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `partition` int(11) DEFAULT NULL, `replication` int(11) DEFAULT NULL, `message_rate` int(50) DEFAULT NULL, `ttl` int(11) DEFAULT NULL, `owner_id` int(11) DEFAULT NULL, `team_id` int(11) DEFAULT NULL, `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '', `create_time` datetime DEFAULT NULL, `approved` int(11) DEFAULT NULL, `approved_id` int(11) DEFAULT NULL, `approved_time` datetime DEFAULT NULL, `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.team_info CREATE TABLE IF NOT EXISTS `team_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.topic_collection CREATE TABLE IF NOT EXISTS `topic_collection` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `cluster_id` int(11) NOT NULL, `user_id` int(11) NOT NULL, `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.topic_info CREATE TABLE IF NOT EXISTS `topic_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cluster_id` int(11) NOT NULL, `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `partition` int(11) DEFAULT NULL, `replication` int(11) DEFAULT NULL, `ttl` bigint(11) DEFAULT NULL, `config` varchar(512) COLLATE utf8_bin DEFAULT NULL, `owner_id` int(11) DEFAULT NULL, `team_id` int(11) DEFAULT NULL, `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.user_info CREATE TABLE IF NOT EXISTS `user_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `real_name` varchar(255) COLLATE utf8_bin DEFAULT '', `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '100', `create_time` datetime DEFAULT NULL, `password` varchar(255) COLLATE utf8_bin DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.user_team CREATE TABLE IF NOT EXISTS `user_team` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id` int(11) DEFAULT NULL, `team_id` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; ``` #### 2、配置 相關配置位於application.properties 可對埠 日誌等資訊做一些修改 ``` server.port=8080 debug=false # 設定session timeout為6小時 server.servlet.session.timeout=21600 spring.security.user.name=admin spring.security.user.password=admin spring.datasource.url=jdbc:mysql://127.0.0.1:3306/kafka_center?useUnicode=true&characterEncoding=utf-8 spring.datasource.username=root spring.datasource.password=123456 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.type=com.zaxxer.hikari.HikariDataSource spring.datasource.hikari.minimum-idle=5 spring.datasource.hikari.maximum-pool-size=15 spring.datasource.hikari.pool-name=KafkaCenterHikariCP spring.datasource.hikari.max-lifetime =30000 spring.datasource.hikari.connection-test-query=SELECT 1 management.health.defaults.enabled=false public.url=http://localhost:8080 connect.url=http://localhost:8000/#/ system.topic.ttl.h=16 monitor.enable=true monitor.collect.period.minutes=5 monitor.elasticsearch.hosts=localhost:9200 monitor.elasticsearch.index=kafka_center_monitor #是否啟用收集執行緒指定叢集收集 monitor.collector.include.enable=false #收集執行緒指定location,必須屬於remote.locations之中 monitor.collector.include.location=dev collect.topic.enable=true collect.topic.period.minutes=10 # remote的功能是為了提高lag查詢和收集,解決跨location網路延遲問題 remote.query.enable=false remote.hosts=gqc@localhost2:8080 remote.locations=dev,gqc #傳送consumer group的lag傳送給alert service alert.enable=false alert.dispause=2 alert.service= alert.threshold=1000 alter.env=other #是否開啟郵件功能,true:啟用,false:禁用 mail.enable=false spring.mail.host= [email protected] # oauth2 generic.enabled=false generic.name=oauth2 Login generic.auth_url= generic.token_url= generic.redirect_utl= generic.api_url= generic.client_id= generic.client_secret= generic.scopes= ``` #### 3、執行 推薦使用docker ``` docker run -d -p 8080:8080 --name KafkaCenter -v ${PWD}/application.properties:/opt/app/kafka-center/config/application.properties xaecbd/kafka-center:2.1.0 ``` 不用docker ``` $ git clone https://github.com/xaecbd/KafkaCenter.git $ cd KafkaCenter $ mvn clean package -Dmaven.test.skip=true $ cd KafkaCenter\KafkaCenter-Core\target $ java -jar KafkaCenter-Core-2.1.0-SNAPSHOT.jar ``` #### 4、檢視 訪問http://localhost:8080 管理員使用者與密碼預設:**admin / admin** ## 功能介紹 **Topics** 使用者可以在此模組完成Topic檢視,已經申請新建Topic,同時可以對Topic進行生產消費測試。 **Monitor** 使用者可以在此模組中可以檢視Topic的生成以及消費情況,同時可以針對消費延遲情況設定預警資訊。 **Alerts** 此模組用於維護預警資訊。使用者可以看到自己所有預警資訊,管理員可以看到所有人的預警資訊。 Kafka Connect 實現使用者快速建立自己的Connect Job,並對自己的Connect進行維護。 **KSQL** 實現使用者快速建立自己的KSQL Job,並對自己的Job進行維護。 **Approve** 此模組主要用於當普通使用者申請建立Topic 或者Job時,管理員進行審批操作。 **Setting** 此模組主要功能為管理員維護User、Team以及kafka cluster資訊 **Cluster Manager** 此模組用於管理員對叢集的正常維護操作。 ##### Home 這裡是一些基本的統計資訊 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090114677-13444865.png) ##### My Favorite 叢集與topic列表 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090121142-546415751.png) ##### Topic 這裡是一些topic的管理功能 ###### Topic List **操作範圍:** 使用者所屬Team的所有Topic - Topic -> Topic List -> Detail 檢視Topic的詳細資訊 - Topic -> Topic List -> Mock 對Topic進行**生產**測試 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090130993-438219902.png) ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090137967-1507290944.png) ###### 申請建立topic **Important**: admin不能申請task,普通使用者必須先讓管理員新建team後,將使用者加入指定team後,才可以申請task。 **操作範圍:** 使用者所屬Team的所有Task - Topic -> My Task -> Detail 檢視申請的Task資訊 - Topic -> My Task -> Delete 刪除被拒絕或待審批的Task - Topic -> My Task -> Edit 修改被拒絕的Task - Topic -> My Task -> Create Topic Task 建立Task - 按照表單各欄位要求填寫資訊 - 點選確認,提交申請 **審批結果:** - 審批通過:Topic將會被建立在管理員指定的叢集 - 審批拒絕:使用者收到郵件,返回到My Task,點選對應Task後面的Edit,針對審批意見進行修改 **Topic命名規則:** **只能**包含:數字、大小寫字母、下劃線、中劃線、點;長度大於等於3小於等於100。 **不推薦**:下劃線開頭; ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090145896-602760825.png) 可對所有Topic進行**消費**測試 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090152134-1388734633.png) ##### Monitor 監控模組 生產者監控 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090201079-1632539030.png) 消費者監控 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090206981-454763569.png) 訊息積壓 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090212860-1952482215.png) 報警功能 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090218854-1746820144.png) ##### Connect 這裡是一些Connect的操作 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090228023-1028610164.png) ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090230821-1567267216.png) ##### KSQL 可以進行KQL的查詢操作 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090246344-227530514.png) ##### Approve 這裡主要是管理員做一些稽核操作 - Approve->check 審批使用者的Task - 根據使用者選擇的location指定cluster - 檢查使用者設定的partition和replication大小是否合理,如不合理做出調整 - 檢查其他欄位是否合理,如需要拒絕該申請,點選Reject並填寫意見。 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090252767-397993504.png) ##### Kafka Manager ###### Topic管理 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090301708-1623616418.png) ###### Cluster管理 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090311204-146396598.png) ###### broker管理 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090317758-834270680.png) ###### group管理 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090326657-1335005118.png) ##### Setting 這些主要是使用者的一些設定 ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200518090343130-1124180689.png) KafkaCenter還是一個非常不錯的kafka管理工具,可以滿足大部分需求。 更多實時資料分析相關博文與科技資訊,歡迎關注 “實時流式計算” ![](https://img2020.cnblogs.com/blog/1089984/202005/1089984-20200511083216576-14373893