【kafka運維】Kafka全網最全最詳細運維命令合集(精品強烈建議收藏!!!)
本文所有命令,博主均全部操作驗證過,保證準確性; 非複製貼上拼湊文章; 如果想了解更多工具命令,可在評論區留下評論,博主會擇期加上;
博主正在連載 Kafka原始碼、Kafka運維、Kafka實踐系列文章 並且相關文章會配套錄製視訊
本文為專欄第一篇歡迎關注<石臻臻的雜貨鋪>不迷路!!!
以下大部分運維操作,都可以使用 LogI-Kafka-Manager 在平臺上視覺化操作;
@
目錄- 1.TopicCommand
- 2.ConfigCommand
- 3.副本擴縮、分割槽遷移、跨路徑遷移 kafka-reassign-partitions
- 4.Topic的傳送kafka-console-producer.sh
- 5. Topic的消費kafka-console-consumer.sh
- 6.kafka-leader-election Leader重新選舉
- 7. 持續批量推送訊息kafka-verifiable-producer.sh
- 8. 持續批量拉取訊息kafka-verifiable-consumer
- 9.生產者壓力測試kafka-producer-perf-test.sh
- 10.消費者壓力測試kafka-consumer-perf-test.sh
- 11.刪除指定分割槽的訊息kafka-delete-records.sh
- 12. 檢視Broker磁碟資訊
- 12. 消費者組管理 kafka-consumer-groups.sh
- 附件
1.TopicCommand
1.1.Topic建立
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
相關可選引數
引數 | 描述 | 例子 |
---|---|---|
--bootstrap-server 指定kafka服務 |
指定連線到的kafka服務; 如果有這個引數,則 --zookeeper 可以不需要 |
--bootstrap-server localhost:9092 |
--zookeeper |
棄用, 通過zk的連線方式連線到kafka叢集; | --zookeeper localhost:2181 或者localhost:2181/kafka |
--replication-factor |
副本數量,注意不能大於broker數量;如果不提供,則會用叢集中預設配置 | --replication-factor 3 |
--partitions |
分割槽數量,當建立或者修改topic的時候,用這個來指定分割槽數;如果建立的時候沒有提供引數,則用叢集中預設值; 注意如果是修改的時候,分割槽比之前小會有問題 | --partitions 3 |
--replica-assignment |
副本分割槽分配方式;建立topic的時候可以自己指定副本分配情況; | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 這個意思是有三個分割槽和三個副本,對應分配的Broker; 逗號隔開標識分割槽;冒號隔開表示副本 |
--config <String: name=value> |
用來設定topic級別的配置以覆蓋預設配置;只在--create 和--bootstrap-server 同時使用時候生效; 可以配置的引數列表請看文末附件 | 例如覆蓋兩個配置 --config retention.bytes=123455 --config retention.ms=600001 |
--command-config <String: command 檔案路徑> |
用來配置客戶端Admin Client啟動配置,只在--bootstrap-server 同時使用時候生效; | 例如:設定請求的超時時間 --command-config config/producer.proterties ; 然後在檔案中配置 request.timeout.ms=300000 |
1.2.刪除Topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
支援正則表示式匹配Topic來進行刪除,只需要將topic 用雙引號包裹起來
例如: 刪除以create_topic_byhand_zk
為開頭的topic;
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "create_topic_byhand_zk.*"
.
表示任意匹配除換行符 \n 之外的任何單字元。要匹配 . ,請使用 . 。
·*·
:匹配前面的子表示式零次或多次。要匹配 * 字元,請使用 *。
.*
: 任意字元
刪除任意Topic (慎用)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ".*?"
更多的用法請參考正則表示式
1.3.Topic分割槽擴容
zk方式(不推薦)
>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
kafka版本 >= 2.2 支援下面方式(推薦)
單個Topic擴容
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4
批量擴容 (將所有正則表示式匹配到的Topic分割槽擴容到4個)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4
".*?"
正則表示式的意思是匹配所有; 您可按需匹配
PS: 當某個Topic的分割槽少於指定的分割槽數時候,他會丟擲異常;但是不會影響其他Topic正常進行;
相關可選引數
引數 | 描述 | 例子 |
---|---|---|
--replica-assignment |
副本分割槽分配方式;建立topic的時候可以自己指定副本分配情況; | --replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 這個意思是有三個分割槽和三個副本,對應分配的Broker; 逗號隔開標識分割槽;冒號隔開表示副本 |
PS: 雖然這裡配置的是全部的分割槽副本分配配置,但是正在生效的是新增的分割槽;
比如: 以前3分割槽1副本是這樣的
Broker-1 | Broker-2 | Broker-3 | Broker-4 |
---|---|---|---|
0 | 1 | 2 |
現在新增一個分割槽,--replica-assignment
2,1,3,4 ; 看這個意思好像是把0,1號分割槽互相換個Broker
Broker-1 | Broker-2 | Broker-3 | Broker-4 |
---|---|---|---|
1 | 0 | 2 | 3 |
但是實際上不會這樣做,Controller在處理的時候會把前面3個截掉; 只取新增的分割槽分配方式,原來的還是不會變
Broker-1 | Broker-2 | Broker-3 | Broker-4 |
---|---|---|---|
0 | 1 | 2 | 3 |
1.4.查詢Topic描述
1.查詢單個Topic
sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal
2.批量查詢Topic(正則表示式匹配,下面是查詢所有Topic)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal
支援正則表示式匹配Topic,只需要將topic 用雙引號包裹起來
相關可選引數
引數 | 描述 | 例子 |
---|---|---|
--bootstrap-server 指定kafka服務 |
指定連線到的kafka服務; 如果有這個引數,則 --zookeeper 可以不需要 |
--bootstrap-server localhost:9092 |
--at-min-isr-partitions |
查詢的時候省略一些計數和配置資訊 | --at-min-isr-partitions |
--exclude-internal |
排除kafka內部topic,比如__consumer_offsets-* |
--exclude-internal |
--topics-with-overrides |
僅顯示已覆蓋配置的主題,也就是單獨針對Topic設定的配置覆蓋預設配置;不展示分割槽資訊 | --topics-with-overrides |
5.查詢Topic列表
1.查詢所有Topic列表
sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal
2.查詢匹配Topic列表(正則表示式)
查詢
test_create_
開頭的所有Topic列表
sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"
相關可選引數
引數 | 描述 | 例子 |
---|---|---|
--exclude-internal |
排除kafka內部topic,比如__consumer_offsets-* |
--exclude-internal |
--topic |
可以正則表示式進行匹配,展示topic名稱 | --topic |
2.ConfigCommand
Config相關操作; 動態配置可以覆蓋預設的靜態配置;
2.1 查詢配置
Topic配置查詢
展示關於Topic的動靜態配置
1.查詢單個Topic配置(只列舉動態配置)
sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --topic test_create_topic
或者
sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic
2.查詢所有Topic配置(包括內部Topic)(只列舉動態配置)
sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics
3.查詢Topic的詳細配置(動態+靜態)
只需要加上一個引數
--all
其他配置/clients/users/brokers/broker-loggers 的查詢
同理 ;只需要將
--entity-type
改成對應的型別就行了 (topics/clients/users/brokers/broker-loggers)
查詢kafka版本資訊
sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version
所有可配置的動態配置 請看最後面的 附件 部分
2.2 增刪改 配置 --alter
--alter
刪除配置: --delete-config
k1=v1,k2=v2
新增/修改配置: --add-config
k1,k2
選擇型別: --entity-type
(topics/clients/users/brokers/broker-
loggers)
型別名稱: --entity-name
Topic新增/修改動態配置
--add-config
sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999
Topic刪除動態配置
--delete-config
sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms
其他配置同理,只需要型別改下--entity-type
型別有: (topics/clients/users/brokers/broker- loggers)
哪些配置可以修改 請看最後面的附件:ConfigCommand 的一些可選配置
3.副本擴縮、分割槽遷移、跨路徑遷移 kafka-reassign-partitions
請戳 【kafka運維】副本擴縮容、資料遷移、副本重分配、副本跨路徑遷移 (如果點不出來,表示文章暫未發表,請耐心等待)
4.Topic的傳送kafka-console-producer.sh
4.1 生產無key訊息
## 生產者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties
4.2 生產有key訊息
加上屬性--property parse.key=true
## 生產者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.key=true
預設訊息key與訊息value間使用“Tab鍵”進行分隔,所以訊息key以及value中切勿使用轉義字元(\t)
可選引數
引數 | 值型別 | 說明 | 有效值 |
---|---|---|---|
--bootstrap-server | String | 要連線的伺服器必需(除非指定--broker-list) | 如:host1:prot1,host2:prot2 |
--topic | String | (必需)接收訊息的主題名稱 | |
--batch-size | Integer | 單個批處理中傳送的訊息數 | 200(預設值) |
--compression-codec | String | 壓縮編解碼器 | none、gzip(預設值)snappy、lz4、zstd |
--max-block-ms | Long | 在傳送請求期間,生產者將阻止的最長時間 | 60000(預設值) |
--max-memory-bytes | Long | 生產者用來緩衝等待發送到伺服器的總記憶體 | 33554432(預設值) |
--max-partition-memory-bytes | Long | 為分割槽分配的緩衝區大小 | 16384 |
--message-send-max-retries | Integer | 最大的重試傳送次數 | 3 |
--metadata-expiry-ms | Long | 強制更新元資料的時間閾值(ms) | 300000 |
--producer-property | String | 將自定義屬性傳遞給生成器的機制 | 如:key=value |
--producer.config | String | 生產者配置屬性檔案[--producer-property]優先於此配置 配置檔案完整路徑 | |
--property | String | 自定義訊息讀取器 | parse.key=true/false key.separator=<key.separator>ignore.error=true/false |
--request-required-acks | String | 生產者請求的確認方式 | 0、1(預設值)、all |
--request-timeout-ms | Integer | 生產者請求的確認超時時間 | 1500(預設值) |
--retry-backoff-ms | Integer | 生產者重試前,重新整理元資料的等待時間閾值 | 100(預設值) |
--socket-buffer-size | Integer | TCP接收緩衝大小 | 102400(預設值) |
--timeout | Integer | 訊息排隊非同步等待處理的時間閾值 | 1000(預設值) |
--sync | 同步傳送訊息 | ||
--version | 顯示 Kafka 版本 | 不配合其他引數時,顯示為本地Kafka版本 | |
--help | 列印幫助資訊 |
5. Topic的消費kafka-console-consumer.sh
1. 新客戶端從頭消費--from-beginning
(注意這裡是新客戶端,如果之前已經消費過了是不會從頭消費的)
下面沒有指定客戶端名稱,所以每次執行都是新客戶端都會從頭消費
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
2. 正則表示式匹配topic進行消費--whitelist
消費所有的topic
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*'
消費所有的topic,並且還從頭消費
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*' --from-beginning
3.顯示key進行消費--property print.key=true
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true
4. 指定分割槽消費--partition
指定起始偏移量消費--offset
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100
5. 給客戶端命名--group
注意給客戶端命名之後,如果之前有過消費,那麼--from-beginning
就不會再從頭消費了
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group
6. 新增客戶端屬性--consumer-property
這個引數也可以給客戶端新增屬性,但是注意 不能多個地方配置同一個屬性,他們是互斥的;比如在下面的基礎上還加上屬性--group test-group
那肯定不行
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
--consumer-property group.id=test-consumer-group
7. 新增客戶端屬性--consumer.config
跟--consumer-property
一樣的性質,都是新增客戶端的屬性,不過這裡是指定一個檔案,把屬性寫在檔案裡面, --consumer-property
的優先順序大於 --consumer.config
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties
引數 | 描述 | 例子 |
---|---|---|
--group |
指定消費者所屬組的ID | |
--topic |
被消費的topic | |
--partition |
指定分割槽 ;除非指定–offset ,否則從分割槽結束(latest)開始消費 |
--partition 0 |
--offset |
執行消費的起始offset位置 ;預設值: latest; /latest /earliest /偏移量 | --offset 10 |
--whitelist |
正則表示式匹配topic;--topic 就不用指定了; 匹配到的所有topic都會消費; 當然用了這個引數,--partition --offset 等就不能使用了 |
|
--consumer-property |
將使用者定義的屬性以key=value的形式傳遞給使用者 | --consumer-property group.id=test-consumer-group |
--consumer.config |
消費者配置屬性檔案請注意,[consumer-property ]優先於此配置 |
--consumer.config config/consumer.properties |
--property |
初始化訊息格式化程式的屬性 | print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator=<key.separator> 、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer> |
--from-beginning |
從存在的最早訊息開始,而不是從最新訊息開始,注意如果配置了客戶端名稱並且之前消費過,那就不會從頭消費了 | |
--max-messages |
消費的最大資料量,若不指定,則持續消費下去 | --max-messages 100 |
--skip-message-on-error |
如果處理訊息時出錯,請跳過它而不是暫停 | |
--isolation-level |
設定為read_committed以過濾掉未提交的事務性訊息,設定為read_uncommitted以讀取所有訊息,預設值:read_uncommitted | |
--formatter |
kafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter |
6.kafka-leader-election Leader重新選舉
6.1 指定Topic指定分割槽用重新PREFERRED:優先副本策略
進行Leader重選舉
> sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0
6.2 所有Topic所有分割槽用重新PREFERRED:優先副本策略
進行Leader重選舉
sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred --all-topic-partitions
6.3 設定配置檔案批量指定topic和分割槽進行Leader重選舉
先配置leader-election.json檔案
{
"partitions": [
{
"topic": "test_create_topic4",
"partition": 1
},
{
"topic": "test_create_topic4",
"partition": 2
}
]
}
sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred --path-to-json-file config/leader-election.json
相關可選引數
引數 | 描述 | 例子 |
---|---|---|
--bootstrap-server 指定kafka服務 |
指定連線到的kafka服務 | --bootstrap-server localhost:9092 |
--topic |
指定Topic,此引數跟--all-topic-partitions 和path-to-json-file 三者互斥 |
|
--partition |
指定分割槽,跟--topic 搭配使用 |
|
--election-type |
兩個選舉策略(PREFERRED: 優先副本選舉,如果第一個副本不線上的話會失敗;UNCLEAN : 策略) |
|
--all-topic-partitions |
所有topic所有分割槽執行Leader重選舉; 此引數跟--topic 和path-to-json-file 三者互斥 |
|
--path-to-json-file |
配置檔案批量選舉,此引數跟--topic 和all-topic-partitions 三者互斥 |
7. 持續批量推送訊息kafka-verifiable-producer.sh
單次傳送100條訊息--max-messages 100
一共要推送多少條,預設為-1,-1表示一直推送到程序關閉位置
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092
--max-messages 100
每秒傳送最大吞吐量不超過訊息 --throughput 100
推送訊息時的吞吐量,單位messages/sec。預設為-1,表示沒有限制
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092
--throughput 100
傳送的訊息體帶字首--value-prefix
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092
--value-prefix 666
注意 --value-prefix 666
必須是整數,傳送的訊息體的格式是加上一個 點號.
例如: 666.
其他引數:
--producer.config CONFIG_FILE
指定producer的配置檔案
--acks ACKS
每次推送訊息的ack值,預設是-1
8. 持續批量拉取訊息kafka-verifiable-consumer
持續消費
sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4
單次最大消費10條訊息--max-messages 10
sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4
--max-messages 10
相關可選引數
引數 | 描述 | 例子 |
---|---|---|
--bootstrap-server 指定kafka服務 |
指定連線到的kafka服務; | --bootstrap-server localhost:9092 |
--topic |
指定消費的topic | |
--group-id |
消費者id;不指定的話每次都是新的組id | |
group-instance-id |
消費組例項ID,唯一值 | |
--max-messages |
單次最大消費的訊息數量 | |
--enable-autocommit |
是否開啟offset自動提交;預設為false | |
--reset-policy |
當以前沒有消費記錄時,選擇要拉取offset的策略,可以是earliest , latest ,none 。預設是earliest |
|
--assignment-strategy |
consumer分配分割槽策略,預設是org.apache.kafka.clients.consumer.RangeAssignor |
|
--consumer.config |
指定consumer的配置檔案 |
9.生產者壓力測試kafka-producer-perf-test.sh
1. 傳送1024條訊息--num-records 100
並且每條訊息大小為1KB--record-size 1024
最大吞吐量每秒10000條--throughput 100
sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024
你可以通過LogIKM檢視分割槽是否增加了對應的資料大小
從LogIKM 可以看到傳送了1024條訊息; 並且總資料量=1M; 1024條*1024byte = 1M;
2. 用指定訊息檔案--payload-file
傳送100條訊息最大吞吐量每秒100條--throughput 100
-
先配置好訊息檔案
batchmessage.txt
-
然後執行命令
傳送的訊息會從batchmessage.txt
裡面隨機選擇; 注意這裡我們沒有用引數--payload-delimeter
指定分隔符,預設分隔符是\n換行;bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100 --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt
-
驗證訊息,可以通過 LogIKM 檢視傳送的訊息
相關可選引數
引數 | 描述 | 例子 |
---|---|---|
--topic |
指定消費的topic | |
--num-records |
傳送多少條訊息 | |
--throughput |
每秒訊息最大吞吐量 | |
--producer-props |
生產者配置, k1=v1,k2=v2 | --producer-props bootstrap.servers= localhost:9092,client.id=test_client |
--producer.config |
生產者配置檔案 | --producer.config config/producer.propeties |
--print-metrics |
在test結束的時候列印監控資訊,預設false | --print-metrics true |
--transactional-id |
指定事務 ID,測試併發事務的效能時需要,只有在 --transaction-duration-ms > 0 時生效,預設值為 performance-producer-default-transactional-id | |
--transaction-duration-ms |
指定事務持續的最長時間,超過這段時間後就會呼叫 commitTransaction 來提交事務,只有指定了 > 0 的值才會開啟事務,預設值為 0 | |
--record-size |
一條訊息的大小byte; 和 --payload-file 兩個中必須指定一個,但不能同時指定 | |
--payload-file |
指定訊息的來原始檔,只支援 UTF-8 編碼的文字檔案,檔案的訊息分隔符通過 --payload-delimeter 指定,預設是用換行\nl來分割的,和 --record-size 兩個中必須指定一個,但不能同時指定 ; 如果提供的訊息 |
|
--payload-delimeter |
如果通過 --payload-file 指定了從檔案中獲取訊息內容,那麼這個引數的意義是指定檔案的訊息分隔符,預設值為 \n,即檔案的每一行視為一條訊息;如果未指定--payload-file 則此引數不生效;傳送訊息的時候是隨機送檔案裡面選擇訊息傳送的; |
10.消費者壓力測試kafka-consumer-perf-test.sh
消費100條訊息 --messages 100
sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100
相關可選引數
引數 | 描述 | 例子 |
---|---|---|
--bootstrap-server |
||
--consumer.config |
消費者配置檔案 | |
--date-format |
結果打印出來的時間格式化 | 預設:yyyy-MM-dd HH:mm:ss:SSS |
--fetch-size |
單次請求獲取資料的大小 | 預設1048576 |
--topic |
指定消費的topic | |
--from-latest |
||
--group |
消費組ID | |
--hide-header |
如果設定了,則不列印header資訊 | |
--messages |
需要消費的數量 | |
--num-fetch-threads |
feth 資料的執行緒數 | 預設:1 |
--print-metrics |
結束的時候列印監控資料 | |
--show-detailed-stats |
||
--threads |
消費執行緒數; | 預設 10 |
11.刪除指定分割槽的訊息kafka-delete-records.sh
刪除指定topic的某個分割槽的訊息刪除至offset為1024
先配置json檔案offset-json-file.json
{"partitions":
[{"topic": "test1", "partition": 0,
"offset": 1024}],
"version":1
}
在執行命令
sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json
驗證 通過 LogIKM 檢視傳送的訊息
從這裡可以看出來,配置"offset": 1024
的意思是從最開始的地方刪除訊息到 1024的offset; 是從最前面開始刪除的
12. 檢視Broker磁碟資訊
查詢指定topic磁碟資訊 --topic-list topic1,topic2
sh bin/kafka-log-dirs.sh --bootstrap-server xxxx:9090 --describe --topic-list test2
查詢指定Broker磁碟資訊--broker-list 0 broker1,broker2
sh bin/kafka-log-dirs.sh --bootstrap-server xxxxx:9090 --describe --topic-list test2 --broker-list 0
例如我一個3分割槽3副本的Topic的查出來的資訊
logDir
Broker中配置的log.dir
{
"version": 1,
"brokers": [{
"broker": 0,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 1,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 2,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
"error": null,
"partitions": [{
"partition": "test2-1",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "test2-2",
"size": 0,
"offsetLag": 0,
"isFuture": false
}]
}]
}, {
"broker": 3,
"logDirs": [{
"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
"error": null,
"partitions": []
}]
}]
}
如果你覺得通過命令查詢磁碟資訊比較麻煩,你也可以通過 LogIKM 檢視
12. 消費者組管理 kafka-consumer-groups.sh
1. 檢視消費者列表--list
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list
先呼叫MetadataRequest
拿到所有線上Broker列表
再給每個Broker傳送ListGroupsRequest
請求獲取 消費者組資料
2. 檢視消費者組詳情--describe
DescribeGroupsRequest
檢視消費組詳情--group
或 --all-groups
檢視指定消費組詳情
--group
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group
檢視所有消費組詳情
--all-groups
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups
檢視該消費組 消費的所有Topic、及所在分割槽、最新消費offset、Log最新資料offset、Lag還未消費數量、消費者ID等等資訊
查詢消費者成員資訊--members
所有消費組成員資訊
sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090
指定消費組成員資訊
sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090
查詢消費者狀態資訊--state
所有消費組狀態資訊
sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090
指定消費組狀態資訊
sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090
3. 刪除消費者組--delete
DeleteGroupsRequest
刪除消費組--delete
刪除指定消費組
--group
sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090
刪除所有消費組--all-groups
sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090
PS: 想要刪除消費組前提是這個消費組的所有客戶端都停止消費/不線上才能夠成功刪除;否則會報下面異常
Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
4. 重置消費組的偏移量 --reset-offsets
能夠執行成功的一個前提是 消費組這會是不可用狀態;
下面的示例使用的引數是: --dry-run
;這個引數表示預執行,會打印出來將要處理的結果;
等你想真正執行的時候請換成引數--excute
;
下面示例 重置模式都是 --to-earliest
重置到最早的;
請根據需要參考下面 相關重置Offset的模式 換成其他模式;
重置指定消費組的偏移量 --group
重置指定消費組的所有Topic的偏移量
--all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置指定消費組的指定Topic的偏移量--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2
重置所有消費組的偏移量 --all-group
重置所有消費組的所有Topic的偏移量
--all-topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置所有消費組中指定Topic的偏移量--topic
sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2
--reset-offsets
後面需要接重置的模式
相關重置Offset的模式
引數 | 描述 | 例子 |
---|---|---|
--to-earliest : |
重置offset到最開始的那條offset(找到還未被刪除最早的那個offset) | |
--to-current : |
直接重置offset到當前的offset,也就是LOE | |
--to-latest : |
重置到最後一個offset | |
--to-datetime : |
重置到指定時間的offset;格式為:YYYY-MM-DDTHH:mm:SS.sss ; |
--to-datetime "2021-6-26T00:00:00.000" |
--to-offset |
重置到指定的offset,但是通常情況下,匹配到多個分割槽,這裡是將匹配到的所有分割槽都重置到這一個值; 如果 1.目標最大offset<--to-offset , 這個時候重置為目標最大offset;2.目標最小offset>--to-offset ,則重置為最小; 3.否則的話才會重置為--to-offset 的目標值; 一般不用這個 |
--to-offset 3465 |
--shift-by |
按照偏移量增加或者減少多少個offset;正的為往前增加;負的往後退;當然這裡也是匹配所有的; | --shift-by 100 、--shift-by -100 |
--from-file |
根據CVS文件來重置; 這裡下面單獨講解 |
--from-file
著重講解一下
上面其他的一些模式重置的都是匹配到的所有分割槽; 不能夠每個分割槽重置到不同的offset;不過
--from-file
可以讓我們更靈活一點;
- 先配置cvs文件
格式為: Topic:分割槽號: 重置目標偏移量test2,0,100 test2,1,200 test2,2,300
- 執行命令
sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv
5. 刪除偏移量delete-offsets
能夠執行成功的一個前提是 消費組這會是不可用狀態;
偏移量被刪除了之後,Consumer Group下次啟動的時候,會從頭消費;
sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2
相關可選引數
引數 | 描述 | 例子 |
---|---|---|
--bootstrap-server |
指定連線到的kafka服務; | --bootstrap-server localhost:9092 |
--list |
列出所有消費組名稱 | --list |
--describe |
查詢消費者描述資訊 | --describe |
--group |
指定消費組 | |
--all-groups |
指定所有消費組 | |
--members |
查詢消費組的成員資訊 | |
--state |
查詢消費者的狀態資訊 | |
--offsets |
在查詢消費組描述資訊的時候,這個引數會列出訊息的偏移量資訊; 預設就會有這個引數的; | |
dry-run |
重置偏移量的時候,使用這個引數可以讓你預先看到重置情況,這個時候還沒有真正的執行,真正執行換成--excute ;預設為dry-run |
|
--excute |
真正的執行重置偏移量的操作; | |
--to-earliest |
將offset重置到最早 | |
to-latest |
將offset重置到最近 |
附件
ConfigCommand 的一些可選配置
Topic相關可選配置
key | value | 示例 |
---|---|---|
cleanup.policy | 清理策略 | |
compression.type | 壓縮型別(通常建議在produce端控制) | |
delete.retention.ms | 壓縮日誌的保留時間 | |
file.delete.delay.ms | ||
flush.messages | 持久化message限制 | |
flush.ms | 持久化頻率 | |
follower.replication.throttled.replicas | flowwer副本限流 格式:分割槽號:副本follower號,分割槽號:副本follower號 | 0:1,1:1 |
index.interval.bytes | ||
leader.replication.throttled.replicas | leader副本限流 格式:分割槽號:副本Leader號 | 0:0 |
max.compaction.lag.ms | ||
max.message.bytes | 最大的batch的message大小 | |
message.downconversion.enable | message是否向下相容 | |
message.format.version | message格式版本 | |
message.timestamp.difference.max.ms | ||
message.timestamp.type | ||
min.cleanable.dirty.ratio | ||
min.compaction.lag.ms | ||
min.insync.replicas | 最小的ISR | |
preallocate | ||
retention.bytes | 日誌保留大小(通常按照時間限制) | |
retention.ms | 日誌保留時間 | |
segment.bytes | segment的大小限制 | |
segment.index.bytes | ||
segment.jitter.ms | ||
segment.ms | segment的切割時間 | |
unclean.leader.election.enable | 是否允許非同步副本選主 |
Broker相關可選配置
key | value | 示例 |
---|---|---|
advertised.listeners | ||
background.threads | ||
compression.type | ||
follower.replication.throttled.rate | ||
leader.replication.throttled.rate | ||
listener.security.protocol.map | ||
listeners | ||
log.cleaner.backoff.ms | ||
log.cleaner.dedupe.buffer.size | ||
log.cleaner.delete.retention.ms | ||
log.cleaner.io.buffer.load.factor | ||
log.cleaner.io.buffer.size | ||
log.cleaner.io.max.bytes.per.second | ||
log.cleaner.max.compaction.lag.ms | ||
log.cleaner.min.cleanable.ratio | ||
log.cleaner.min.compaction.lag.ms | ||
log.cleaner.threads | ||
log.cleanup.policy | ||
log.flush.interval.messages | ||
log.flush.interval.ms | ||
log.index.interval.bytes | ||
log.index.size.max.bytes | ||
log.message.downconversion.enable | ||
log.message.timestamp.difference.max.ms | ||
log.message.timestamp.type | ||
log.preallocate | ||
log.retention.bytes | ||
log.retention.ms | ||
log.roll.jitter.ms | ||
log.roll.ms | ||
log.segment.bytes | ||
log.segment.delete.delay.ms | ||
max.connections | ||
max.connections.per.ip | ||
max.connections.per.ip.overrides | ||
message.max.bytes | ||
metric.reporters | ||
min.insync.replicas | ||
num.io.threads | ||
num.network.threads | ||
num.recovery.threads.per.data.dir | ||
num.replica.fetchers | ||
principal.builder.class | ||
replica.alter.log.dirs.io.max.bytes.per.second | ||
sasl.enabled.mechanisms | ||
sasl.jaas.config | ||
sasl.kerberos.kinit.cmd | ||
sasl.kerberos.min.time.before.relogin | ||
sasl.kerberos.principal.to.local.rules | ||
sasl.kerberos.service.name | ||
sasl.kerberos.ticket.renew.jitter | ||
sasl.kerberos.ticket.renew.window.factor | ||
sasl.login.refresh.buffer.seconds | ||
sasl.login.refresh.min.period.seconds | ||
sasl.login.refresh.window.factor | ||
sasl.login.refresh.window.jitter | ||
sasl.mechanism.inter.broker.protocol | ||
ssl.cipher.suites | ||
ssl.client.auth | ||
ssl.enabled.protocols | ||
ssl.endpoint.identification.algorithm | ||
ssl.key.password | ||
ssl.keymanager.algorithm | ||
ssl.keystore.location | ||
ssl.keystore.password | ||
ssl.keystore.type | ||
ssl.protocol | ||
ssl.provider | ||
ssl.secure.random.implementation | ||
ssl.trustmanager.algorithm | ||
ssl.truststore.location | ||
ssl.truststore.password | ||
ssl.truststore.type | ||
unclean.leader.election.enable |
Users相關可選配置
key | value | 示例 |
---|---|---|
SCRAM-SHA-256 | ||
SCRAM-SHA-512 | ||
consumer_byte_rate | 針對消費者user進行限流 | |
producer_byte_rate | 針對生產者進行限流 | |
request_percentage | 請求百分比 |
clients相關可選配置
key | value | 示例 |
---|---|---|
consumer_byte_rate | ||
producer_byte_rate | ||
request_percentage |
以上大部分運維操作,都可以使用 LogI-Kafka-Manager 在平臺上視覺化操作;
歡迎Star和共建由滴滴開源的kafka的管理平臺,非常優秀非常好用的一款kafka管理平臺
滿足所有開發運維日常需求
滴滴開源Logi-KafkaManager 一站式Kafka監控與管控平臺
歡迎加個人微信拉你進開發技術交流群,群內專人解答技術疑問
(請備註:技術) wx: jjdlmn_ 或 wx: mike_zhangliang