1. 程式人生 > 其它 >【kafka運維】Kafka全網最全最詳細運維命令合集(精品強烈建議收藏!!!)

【kafka運維】Kafka全網最全最詳細運維命令合集(精品強烈建議收藏!!!)

kafka 運維的福音

本文所有命令,博主均全部操作驗證過,保證準確性; 非複製貼上拼湊文章; 如果想了解更多工具命令,可在評論區留下評論,博主會擇期加上;

博主正在連載 Kafka原始碼、Kafka運維、Kafka實踐系列文章 並且相關文章會配套錄製視訊
本文為專欄第一篇歡迎關注<石臻臻的雜貨鋪>不迷路!!!

以下大部分運維操作,都可以使用 LogI-Kafka-Manager 在平臺上視覺化操作;

@

目錄

1.TopicCommand

1.1.Topic建立

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test


相關可選引數

引數 描述 例子
--bootstrap-server 指定kafka服務 指定連線到的kafka服務; 如果有這個引數,則 --zookeeper可以不需要 --bootstrap-server localhost:9092
--zookeeper 棄用, 通過zk的連線方式連線到kafka叢集; --zookeeper localhost:2181 或者localhost:2181/kafka
--replication-factor 副本數量,注意不能大於broker數量;如果不提供,則會用叢集中預設配置 --replication-factor 3
--partitions 分割槽數量,當建立或者修改topic的時候,用這個來指定分割槽數;如果建立的時候沒有提供引數,則用叢集中預設值; 注意如果是修改的時候,分割槽比之前小會有問題 --partitions 3
--replica-assignment 副本分割槽分配方式;建立topic的時候可以自己指定副本分配情況; --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 這個意思是有三個分割槽和三個副本,對應分配的Broker; 逗號隔開標識分割槽;冒號隔開表示副本
--config <String: name=value> 用來設定topic級別的配置以覆蓋預設配置;只在--create 和--bootstrap-server 同時使用時候生效; 可以配置的引數列表請看文末附件 例如覆蓋兩個配置 --config retention.bytes=123455 --config retention.ms=600001
--command-config <String: command 檔案路徑> 用來配置客戶端Admin Client啟動配置,只在--bootstrap-server 同時使用時候生效; 例如:設定請求的超時時間 --command-config config/producer.proterties ; 然後在檔案中配置 request.timeout.ms=300000

1.2.刪除Topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test


支援正則表示式匹配Topic來進行刪除,只需要將topic 用雙引號包裹起來
例如: 刪除以create_topic_byhand_zk為開頭的topic;

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "create_topic_byhand_zk.*"
.表示任意匹配除換行符 \n 之外的任何單字元。要匹配 . ,請使用 . 。
·*·:匹配前面的子表示式零次或多次。要匹配 * 字元,請使用 *。
.* : 任意字元

刪除任意Topic (慎用)

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ".*?"

更多的用法請參考正則表示式

1.3.Topic分割槽擴容

zk方式(不推薦)

>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

kafka版本 >= 2.2 支援下面方式(推薦)

單個Topic擴容

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4

批量擴容 (將所有正則表示式匹配到的Topic分割槽擴容到4個)

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4

".*?" 正則表示式的意思是匹配所有; 您可按需匹配

PS: 當某個Topic的分割槽少於指定的分割槽數時候,他會丟擲異常;但是不會影響其他Topic正常進行;


相關可選引數

引數 描述 例子
--replica-assignment 副本分割槽分配方式;建立topic的時候可以自己指定副本分配情況; --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 這個意思是有三個分割槽和三個副本,對應分配的Broker; 逗號隔開標識分割槽;冒號隔開表示副本

PS: 雖然這裡配置的是全部的分割槽副本分配配置,但是正在生效的是新增的分割槽;
比如: 以前3分割槽1副本是這樣的

Broker-1 Broker-2 Broker-3 Broker-4
0 1 2

現在新增一個分割槽,--replica-assignment 2,1,3,4 ; 看這個意思好像是把0,1號分割槽互相換個Broker

Broker-1 Broker-2 Broker-3 Broker-4
1 0 2 3

但是實際上不會這樣做,Controller在處理的時候會把前面3個截掉; 只取新增的分割槽分配方式,原來的還是不會變

Broker-1 Broker-2 Broker-3 Broker-4
0 1 2 3

1.4.查詢Topic描述

1.查詢單個Topic

sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal

2.批量查詢Topic(正則表示式匹配,下面是查詢所有Topic)

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal

支援正則表示式匹配Topic,只需要將topic 用雙引號包裹起來


相關可選引數

引數 描述 例子
--bootstrap-server 指定kafka服務 指定連線到的kafka服務; 如果有這個引數,則 --zookeeper可以不需要 --bootstrap-server localhost:9092
--at-min-isr-partitions 查詢的時候省略一些計數和配置資訊 --at-min-isr-partitions
--exclude-internal 排除kafka內部topic,比如__consumer_offsets-* --exclude-internal
--topics-with-overrides 僅顯示已覆蓋配置的主題,也就是單獨針對Topic設定的配置覆蓋預設配置;不展示分割槽資訊 --topics-with-overrides

5.查詢Topic列表

1.查詢所有Topic列表

sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal

2.查詢匹配Topic列表(正則表示式)

查詢test_create_開頭的所有Topic列表
sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"


相關可選引數

引數 描述 例子
--exclude-internal 排除kafka內部topic,比如__consumer_offsets-* --exclude-internal
--topic 可以正則表示式進行匹配,展示topic名稱 --topic

2.ConfigCommand

Config相關操作; 動態配置可以覆蓋預設的靜態配置;

2.1 查詢配置

Topic配置查詢

展示關於Topic的動靜態配置

1.查詢單個Topic配置(只列舉動態配置)

sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --topic test_create_topic
或者
sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic

2.查詢所有Topic配置(包括內部Topic)(只列舉動態配置)

sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics

3.查詢Topic的詳細配置(動態+靜態)

只需要加上一個引數--all

其他配置/clients/users/brokers/broker-loggers 的查詢

同理 ;只需要將--entity-type 改成對應的型別就行了 (topics/clients/users/brokers/broker-loggers)

查詢kafka版本資訊

sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version

所有可配置的動態配置 請看最後面的 附件 部分

2.2 增刪改 配置 --alter

--alter

刪除配置: --delete-config k1=v1,k2=v2
新增/修改配置: --add-config k1,k2
選擇型別: --entity-type (topics/clients/users/brokers/broker-
loggers)
型別名稱: --entity-name

Topic新增/修改動態配置

--add-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999

Topic刪除動態配置

--delete-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms

其他配置同理,只需要型別改下--entity-type

型別有: (topics/clients/users/brokers/broker- loggers)

哪些配置可以修改 請看最後面的附件:ConfigCommand 的一些可選配置

3.副本擴縮、分割槽遷移、跨路徑遷移 kafka-reassign-partitions

請戳 【kafka運維】副本擴縮容、資料遷移、副本重分配、副本跨路徑遷移 (如果點不出來,表示文章暫未發表,請耐心等待)

4.Topic的傳送kafka-console-producer.sh

4.1 生產無key訊息

## 生產者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

4.2 生產有key訊息
加上屬性--property parse.key=true

## 生產者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties  --property parse.key=true

預設訊息key與訊息value間使用“Tab鍵”進行分隔,所以訊息key以及value中切勿使用轉義字元(\t)


可選引數

引數 值型別 說明 有效值
--bootstrap-server String 要連線的伺服器必需(除非指定--broker-list) 如:host1:prot1,host2:prot2
--topic String (必需)接收訊息的主題名稱
--batch-size Integer 單個批處理中傳送的訊息數 200(預設值)
--compression-codec String 壓縮編解碼器 none、gzip(預設值)snappy、lz4、zstd
--max-block-ms Long 在傳送請求期間,生產者將阻止的最長時間 60000(預設值)
--max-memory-bytes Long 生產者用來緩衝等待發送到伺服器的總記憶體 33554432(預設值)
--max-partition-memory-bytes Long 為分割槽分配的緩衝區大小 16384
--message-send-max-retries Integer 最大的重試傳送次數 3
--metadata-expiry-ms Long 強制更新元資料的時間閾值(ms) 300000
--producer-property String 將自定義屬性傳遞給生成器的機制 如:key=value
--producer.config String 生產者配置屬性檔案[--producer-property]優先於此配置 配置檔案完整路徑
--property String 自定義訊息讀取器 parse.key=true/false key.separator=<key.separator>ignore.error=true/false
--request-required-acks String 生產者請求的確認方式 0、1(預設值)、all
--request-timeout-ms Integer 生產者請求的確認超時時間 1500(預設值)
--retry-backoff-ms Integer 生產者重試前,重新整理元資料的等待時間閾值 100(預設值)
--socket-buffer-size Integer TCP接收緩衝大小 102400(預設值)
--timeout Integer 訊息排隊非同步等待處理的時間閾值 1000(預設值)
--sync 同步傳送訊息
--version 顯示 Kafka 版本 不配合其他引數時,顯示為本地Kafka版本
--help 列印幫助資訊

5. Topic的消費kafka-console-consumer.sh

1. 新客戶端從頭消費--from-beginning (注意這裡是新客戶端,如果之前已經消費過了是不會從頭消費的)
下面沒有指定客戶端名稱,所以每次執行都是新客戶端都會從頭消費

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2. 正則表示式匹配topic進行消費--whitelist
消費所有的topic

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*'

消費所有的topic,並且還從頭消費

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*' --from-beginning

3.顯示key進行消費--property print.key=true

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true

4. 指定分割槽消費--partition 指定起始偏移量消費--offset

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100

5. 給客戶端命名--group

注意給客戶端命名之後,如果之前有過消費,那麼--from-beginning 就不會再從頭消費了

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group

6. 新增客戶端屬性--consumer-property

這個引數也可以給客戶端新增屬性,但是注意 不能多個地方配置同一個屬性,他們是互斥的;比如在下面的基礎上還加上屬性--group test-group 那肯定不行

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group

7. 新增客戶端屬性--consumer.config

--consumer-property 一樣的性質,都是新增客戶端的屬性,不過這裡是指定一個檔案,把屬性寫在檔案裡面, --consumer-property 的優先順序大於 --consumer.config

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties


引數 描述 例子
--group 指定消費者所屬組的ID
--topic 被消費的topic
--partition 指定分割槽 ;除非指定–offset,否則從分割槽結束(latest)開始消費 --partition 0
--offset 執行消費的起始offset位置 ;預設值: latest; /latest /earliest /偏移量 --offset 10
--whitelist 正則表示式匹配topic;--topic就不用指定了; 匹配到的所有topic都會消費; 當然用了這個引數,--partition --offset等就不能使用了
--consumer-property 將使用者定義的屬性以key=value的形式傳遞給使用者 --consumer-property group.id=test-consumer-group
--consumer.config 消費者配置屬性檔案請注意,[consumer-property]優先於此配置 --consumer.config config/consumer.properties
--property 初始化訊息格式化程式的屬性 print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator=<key.separator> 、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer>
--from-beginning 從存在的最早訊息開始,而不是從最新訊息開始,注意如果配置了客戶端名稱並且之前消費過,那就不會從頭消費了
--max-messages 消費的最大資料量,若不指定,則持續消費下去 --max-messages 100
--skip-message-on-error 如果處理訊息時出錯,請跳過它而不是暫停
--isolation-level 設定為read_committed以過濾掉未提交的事務性訊息,設定為read_uncommitted以讀取所有訊息,預設值:read_uncommitted
--formatter kafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter

6.kafka-leader-election Leader重新選舉

6.1 指定Topic指定分割槽用重新PREFERRED:優先副本策略 進行Leader重選舉


> sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0

6.2 所有Topic所有分割槽用重新PREFERRED:優先副本策略 進行Leader重選舉

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred  --all-topic-partitions

6.3 設定配置檔案批量指定topic和分割槽進行Leader重選舉

先配置leader-election.json檔案


{
  "partitions": [
    {
      "topic": "test_create_topic4",
      "partition": 1
    },
    {
      "topic": "test_create_topic4",
      "partition": 2
    }
  ]
}

 sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred  --path-to-json-file config/leader-election.json
 

相關可選引數

引數 描述 例子
--bootstrap-server 指定kafka服務 指定連線到的kafka服務 --bootstrap-server localhost:9092
--topic 指定Topic,此引數跟--all-topic-partitionspath-to-json-file 三者互斥
--partition 指定分割槽,跟--topic搭配使用
--election-type 兩個選舉策略(PREFERRED: 優先副本選舉,如果第一個副本不線上的話會失敗;UNCLEAN: 策略)
--all-topic-partitions 所有topic所有分割槽執行Leader重選舉; 此引數跟--topicpath-to-json-file 三者互斥
--path-to-json-file 配置檔案批量選舉,此引數跟--topicall-topic-partitions 三者互斥

7. 持續批量推送訊息kafka-verifiable-producer.sh

單次傳送100條訊息--max-messages 100

一共要推送多少條,預設為-1,-1表示一直推送到程序關閉位置

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100

每秒傳送最大吞吐量不超過訊息 --throughput 100

推送訊息時的吞吐量,單位messages/sec。預設為-1,表示沒有限制

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --throughput 100

傳送的訊息體帶字首--value-prefix

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --value-prefix 666

注意 --value-prefix 666必須是整數,傳送的訊息體的格式是加上一個 點號. 例如: 666.

其他引數:
--producer.config CONFIG_FILE 指定producer的配置檔案
--acks ACKS 每次推送訊息的ack值,預設是-1

8. 持續批量拉取訊息kafka-verifiable-consumer

持續消費

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4

單次最大消費10條訊息--max-messages 10

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 --max-messages 10


相關可選引數

引數 描述 例子
--bootstrap-server 指定kafka服務 指定連線到的kafka服務; --bootstrap-server localhost:9092
--topic 指定消費的topic
--group-id 消費者id;不指定的話每次都是新的組id
group-instance-id 消費組例項ID,唯一值
--max-messages 單次最大消費的訊息數量
--enable-autocommit 是否開啟offset自動提交;預設為false
--reset-policy 當以前沒有消費記錄時,選擇要拉取offset的策略,可以是earliest, latest,none。預設是earliest
--assignment-strategy consumer分配分割槽策略,預設是org.apache.kafka.clients.consumer.RangeAssignor
--consumer.config 指定consumer的配置檔案

9.生產者壓力測試kafka-producer-perf-test.sh

1. 傳送1024條訊息--num-records 100並且每條訊息大小為1KB--record-size 1024 最大吞吐量每秒10000條--throughput 100

sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024

你可以通過LogIKM檢視分割槽是否增加了對應的資料大小

LogIKM 可以看到傳送了1024條訊息; 並且總資料量=1M; 1024條*1024byte = 1M;

2. 用指定訊息檔案--payload-file 傳送100條訊息最大吞吐量每秒100條--throughput 100

  1. 先配置好訊息檔案batchmessage.txt

  2. 然後執行命令
    傳送的訊息會從batchmessage.txt裡面隨機選擇; 注意這裡我們沒有用引數--payload-delimeter指定分隔符,預設分隔符是\n換行;

    bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100 --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt

  3. 驗證訊息,可以通過 LogIKM 檢視傳送的訊息


相關可選引數

引數 描述 例子
--topic 指定消費的topic
--num-records 傳送多少條訊息
--throughput 每秒訊息最大吞吐量
--producer-props 生產者配置, k1=v1,k2=v2 --producer-props bootstrap.servers= localhost:9092,client.id=test_client
--producer.config 生產者配置檔案 --producer.config config/producer.propeties
--print-metrics 在test結束的時候列印監控資訊,預設false --print-metrics true
--transactional-id 指定事務 ID,測試併發事務的效能時需要,只有在 --transaction-duration-ms > 0 時生效,預設值為 performance-producer-default-transactional-id
--transaction-duration-ms 指定事務持續的最長時間,超過這段時間後就會呼叫 commitTransaction 來提交事務,只有指定了 > 0 的值才會開啟事務,預設值為 0
--record-size 一條訊息的大小byte; 和 --payload-file 兩個中必須指定一個,但不能同時指定
--payload-file 指定訊息的來原始檔,只支援 UTF-8 編碼的文字檔案,檔案的訊息分隔符通過 --payload-delimeter 指定,預設是用換行\nl來分割的,和 --record-size 兩個中必須指定一個,但不能同時指定 ; 如果提供的訊息
--payload-delimeter 如果通過 --payload-file 指定了從檔案中獲取訊息內容,那麼這個引數的意義是指定檔案的訊息分隔符,預設值為 \n,即檔案的每一行視為一條訊息;如果未指定--payload-file則此引數不生效;傳送訊息的時候是隨機送檔案裡面選擇訊息傳送的;

10.消費者壓力測試kafka-consumer-perf-test.sh

消費100條訊息 --messages 100

sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100


相關可選引數

引數 描述 例子
--bootstrap-server
--consumer.config 消費者配置檔案
--date-format 結果打印出來的時間格式化 預設:yyyy-MM-dd HH:mm:ss:SSS
--fetch-size 單次請求獲取資料的大小 預設1048576
--topic 指定消費的topic
--from-latest
--group 消費組ID
--hide-header 如果設定了,則不列印header資訊
--messages 需要消費的數量
--num-fetch-threads feth 資料的執行緒數 預設:1
--print-metrics 結束的時候列印監控資料
--show-detailed-stats
--threads 消費執行緒數; 預設 10

11.刪除指定分割槽的訊息kafka-delete-records.sh

刪除指定topic的某個分割槽的訊息刪除至offset為1024

先配置json檔案offset-json-file.json

{"partitions":
[{"topic": "test1", "partition": 0,
  "offset": 1024}],
  "version":1
}

在執行命令

sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json

驗證 通過 LogIKM 檢視傳送的訊息


從這裡可以看出來,配置"offset": 1024 的意思是從最開始的地方刪除訊息到 1024的offset; 是從最前面開始刪除的

12. 檢視Broker磁碟資訊

查詢指定topic磁碟資訊 --topic-list topic1,topic2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxx:9090 --describe --topic-list test2

查詢指定Broker磁碟資訊--broker-list 0 broker1,broker2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxxx:9090 --describe --topic-list test2 --broker-list 0

例如我一個3分割槽3副本的Topic的查出來的資訊
logDir Broker中配置的log.dir

{
	"version": 1,
	"brokers": [{
		"broker": 0,
		"logDirs": [{
			"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
			"error": null,
			"partitions": [{
				"partition": "test2-1",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}, {
				"partition": "test2-0",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}, {
				"partition": "test2-2",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}]
		}]
	}, {
		"broker": 1,
		"logDirs": [{
			"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
			"error": null,
			"partitions": [{
				"partition": "test2-1",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}, {
				"partition": "test2-0",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}, {
				"partition": "test2-2",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}]
		}]
	}, {
		"broker": 2,
		"logDirs": [{
			"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
			"error": null,
			"partitions": [{
				"partition": "test2-1",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}, {
				"partition": "test2-0",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}, {
				"partition": "test2-2",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}]
		}]
	}, {
		"broker": 3,
		"logDirs": [{
			"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
			"error": null,
			"partitions": []
		}]
	}]
}

如果你覺得通過命令查詢磁碟資訊比較麻煩,你也可以通過 LogIKM 檢視

12. 消費者組管理 kafka-consumer-groups.sh

1. 檢視消費者列表--list

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list

先呼叫MetadataRequest拿到所有線上Broker列表
再給每個Broker傳送ListGroupsRequest請求獲取 消費者組資料

2. 檢視消費者組詳情--describe

DescribeGroupsRequest

檢視消費組詳情--group--all-groups

檢視指定消費組詳情--group
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group


檢視所有消費組詳情--all-groups
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups
檢視該消費組 消費的所有Topic、及所在分割槽、最新消費offset、Log最新資料offset、Lag還未消費數量、消費者ID等等資訊

查詢消費者成員資訊--members

所有消費組成員資訊
sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090
指定消費組成員資訊
sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090

查詢消費者狀態資訊--state

所有消費組狀態資訊
sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090
指定消費組狀態資訊
sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090

3. 刪除消費者組--delete

DeleteGroupsRequest

刪除消費組--delete

刪除指定消費組--group
sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090
刪除所有消費組--all-groups
sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090

PS: 想要刪除消費組前提是這個消費組的所有客戶端都停止消費/不線上才能夠成功刪除;否則會報下面異常

Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

4. 重置消費組的偏移量 --reset-offsets

能夠執行成功的一個前提是 消費組這會是不可用狀態;

下面的示例使用的引數是: --dry-run ;這個引數表示預執行,會打印出來將要處理的結果;
等你想真正執行的時候請換成引數--excute ;

下面示例 重置模式都是 --to-earliest 重置到最早的;

請根據需要參考下面 相關重置Offset的模式 換成其他模式;

重置指定消費組的偏移量 --group

重置指定消費組的所有Topic的偏移量--all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置指定消費組的指定Topic的偏移量--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2

重置所有消費組的偏移量 --all-group

重置所有消費組的所有Topic的偏移量--all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置所有消費組中指定Topic的偏移量--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2

--reset-offsets 後面需要接重置的模式

相關重置Offset的模式

引數 描述 例子
--to-earliest : 重置offset到最開始的那條offset(找到還未被刪除最早的那個offset)
--to-current: 直接重置offset到當前的offset,也就是LOE
--to-latest 重置到最後一個offset
--to-datetime: 重置到指定時間的offset;格式為:YYYY-MM-DDTHH:mm:SS.sss; --to-datetime "2021-6-26T00:00:00.000"
--to-offset 重置到指定的offset,但是通常情況下,匹配到多個分割槽,這裡是將匹配到的所有分割槽都重置到這一個值; 如果 1.目標最大offset<--to-offset, 這個時候重置為目標最大offset;2.目標最小offset>--to-offset ,則重置為最小; 3.否則的話才會重置為--to-offset的目標值; 一般不用這個 --to-offset 3465
--shift-by 按照偏移量增加或者減少多少個offset;正的為往前增加;負的往後退;當然這裡也是匹配所有的; --shift-by 100--shift-by -100
--from-file 根據CVS文件來重置; 這裡下面單獨講解

--from-file著重講解一下

上面其他的一些模式重置的都是匹配到的所有分割槽; 不能夠每個分割槽重置到不同的offset;不過--from-file可以讓我們更靈活一點;

  1. 先配置cvs文件
    格式為: Topic:分割槽號: 重置目標偏移量
    test2,0,100
    test2,1,200
    test2,2,300
    
  2. 執行命令

    sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv

5. 刪除偏移量delete-offsets

能夠執行成功的一個前提是 消費組這會是不可用狀態;

偏移量被刪除了之後,Consumer Group下次啟動的時候,會從頭消費;

sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2


相關可選引數

引數 描述 例子
--bootstrap-server 指定連線到的kafka服務; --bootstrap-server localhost:9092
--list 列出所有消費組名稱 --list
--describe 查詢消費者描述資訊 --describe
--group 指定消費組
--all-groups 指定所有消費組
--members 查詢消費組的成員資訊
--state 查詢消費者的狀態資訊
--offsets 在查詢消費組描述資訊的時候,這個引數會列出訊息的偏移量資訊; 預設就會有這個引數的;
dry-run 重置偏移量的時候,使用這個引數可以讓你預先看到重置情況,這個時候還沒有真正的執行,真正執行換成--excute;預設為dry-run
--excute 真正的執行重置偏移量的操作;
--to-earliest 將offset重置到最早
to-latest 將offset重置到最近

附件

ConfigCommand 的一些可選配置


Topic相關可選配置

key value 示例
cleanup.policy 清理策略
compression.type 壓縮型別(通常建議在produce端控制)
delete.retention.ms 壓縮日誌的保留時間
file.delete.delay.ms
flush.messages 持久化message限制
flush.ms 持久化頻率
follower.replication.throttled.replicas flowwer副本限流 格式:分割槽號:副本follower號,分割槽號:副本follower號 0:1,1:1
index.interval.bytes
leader.replication.throttled.replicas leader副本限流 格式:分割槽號:副本Leader號 0:0
max.compaction.lag.ms
max.message.bytes 最大的batch的message大小
message.downconversion.enable message是否向下相容
message.format.version message格式版本
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas 最小的ISR
preallocate
retention.bytes 日誌保留大小(通常按照時間限制)
retention.ms 日誌保留時間
segment.bytes segment的大小限制
segment.index.bytes
segment.jitter.ms
segment.ms segment的切割時間
unclean.leader.election.enable 是否允許非同步副本選主

Broker相關可選配置

key value 示例
advertised.listeners
background.threads
compression.type
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.message.downconversion.enable
log.message.timestamp.difference.max.ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
replica.alter.log.dirs.io.max.bytes.per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
unclean.leader.election.enable

Users相關可選配置

key value 示例
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate 針對消費者user進行限流
producer_byte_rate 針對生產者進行限流
request_percentage 請求百分比

clients相關可選配置

key value 示例
consumer_byte_rate
producer_byte_rate
request_percentage

以上大部分運維操作,都可以使用 LogI-Kafka-Manager 在平臺上視覺化操作;


歡迎Star共建滴滴開源的kafka的管理平臺,非常優秀非常好用的一款kafka管理平臺
滿足所有開發運維日常需求

滴滴開源Logi-KafkaManager 一站式Kafka監控與管控平臺

歡迎加個人微信拉你進開發技術交流群,群內專人解答技術疑問
(請備註:技術) wx: jjdlmn_ wx: mike_zhangliang