1. 程式人生 > 其它 >Kafka常用運維操作命令

Kafka常用運維操作命令

1. 叢集管理

(1)啟動 broker
一般需要在後臺啟動,因此加上"-daemon"引數。

bin/kafka-server-start.sh -daemon config/server.properties

(2)關閉 broker

bin/kafka-server-stop.sh

2. topic管理

kafka-topic.sh指令碼

對kafka-topic.sh相關的操作通常是指定 --zookeeper 引數。然而從 Kafka 2.2 版本開始,社群推薦用 --bootstrap-server 引數替換 --zookeeper 引數,並且顯式地將後者標記為“已過期”。(2.2以上也相容 --zookeeper ,但如果是2.2以前的版本,就只能使用 --zookeeper 引數。)
社群推薦使用 --bootstrap-server 而非 --zookeeper 的原因主要有兩個。

  1. 使用 --zookeeper 會繞過 Kafka 的安全體系。這就是說,即使你為 Kafka 叢集設定了安全認證,限制了主題的建立,如果你使用 --zookeeper 的命令,依然能成功建立任意主題,不受認證體系的約束。這顯然是 Kafka 叢集的運維人員不希望看到的。
  2. 使用 --bootstrap-server 與叢集進行互動,越來越成為使用 Kafka 的標準姿勢。換句話說,以後會有越來越少的命令和 API 需要與 ZooKeeper 進行連線。這樣,我們只需要一套連線資訊,就能與 Kafka 進行全方位的互動,不用像以前一樣,必須同時維護 ZooKeeper 和 Broker 的連線資訊。

2.1 列出叢集上所有topic

  • kafka2.2以前的版本

bin/kafka-topics.sh --zookeeper {zk_host}:{zk_port}[/{path}] --list

--zookeeper引數後接zk的地址和埠,如果在部署kafka叢集時指定了zk的路徑,還需要加上特定的路徑。例如,kafka叢集配置的zk地址為:zookeeper.connect=broker-test1:2181,broker-test2:2181,broker-test3:2181/kafka 那麼列出叢集topic的命令如下:

./kafka-topics.sh --zookeeper broker-test1:2181/kafka --list
  • kafka2.2開始的版本

bin/kafka-topics.sh --bootstrap-server {broker_host}:{kafka_port} --list

2.2 建立topic

  • kafka2.2以前的版本

bin/kafka-topics.sh --zookeeper {zk_host}:{zk_port}[/{path}] --create --topic <topic_name> --partitions <partition_num> --replication-factor <replication_factor_num>

例如,建立一個名為test的topic,指定分割槽數為3,副本數為2:

bin/kafka-topics.sh --zookeeper broker-test1:2181/kafka --create --topic test --partitions 3 --replication-factor 2 
  • kafka2.2開始的版本

bin/kafka-topics.sh --bootstrap-server {broker_host}:{kafka_port} --create --topic <topic_name> --partitions <partition_num> --replication-factor <replication_factor_num>

2.3 檢視topic詳細資訊

  • kafka2.2以前的版本

bin/kafka-topics.sh --zookeeper {zk_host}:{zk_port}[/{path}] --describe --topic <topic_name>

例如,檢視名為xhs的topic資訊:

bin/kafka-topics.sh --zookeeper broker-test1:2181/kafka --describe --topic xhs
  • kafka2.2開始的版本

bin/kafka-topics.sh --bootstrap-server {broker_host}:{kafka_port} --describe --topic <topic_name>

如果 describe 命令不指定具體的主題名稱,那麼 Kafka 預設會返回所有“可見”主題的詳細資料給你。

這裡的“可見”,是指發起這個命令的使用者能夠看到的 Kafka 主題。2.2版本開始建立主題時,使用 --zookeeper 和 --bootstrap-server 的區別是一樣的。如果指定了 --bootstrap-server,那麼這條命令就會受到安全認證體系的約束,即對命令發起者進行許可權驗證,然後返回它能看到的主題。否則,如果指定 --zookeeper 引數,那麼預設會返回叢集中所有的主題詳細資料。基於這些原因,建議在2.2版本開始最好統一使用 --bootstrap-server 連線引數。

2.4 修改(增加)topic分割槽數量

其實就是增加分割槽,目前 Kafka 不允許減少某個主題的分割槽數。可以使用 kafka-topics 指令碼,結合 --alter 引數來增加某個主題的分割槽數。

  • kafka2.2以前的版本

bin/kafka-topics.sh --zookeeper {zk_host}:{zk_port}[/{path}] --alter --topic <topic_name> --partitions <新分割槽數>

例如,修改名為test-zl的topic分割槽數為3:

bin/kafka-topics.sh --zookeeper broker-test1:2181/kafka --alter --topic test-zl --partitions 3
  • kafka2.2開始的版本

bin/kafka-topics.sh --bootstrap-server {broker_host}:{kafka_port} --alter --topic <topic_name> --partitions <新分割槽數>

⚠️這裡要注意的是,你指定的分割槽數一定要比原有分割槽數大,否則 Kafka 會丟擲 InvalidPartitionsException 異常。

2.5 刪除topic

  • kafka2.2以前的版本

bin/kafka-topics.sh --zookeeper {zk_host}:{zk_port}[/{path}] --delete --topic <topic_name>

例如,刪除名為test-zl的topic:

bin/kafka-topics.sh --zookeeper broker-test1:2181/kafka --delete --topic test-zl
  • kafka2.2開始的版本

bin/kafka-topics.sh --bootstrap-server {broker_host}:{kafka_port} --delete --topic <topic_name>

Kafka刪除操作是非同步的,執行完這條命令不代表主題立即就被刪除了。它僅僅是被標記成“已刪除”狀態而已。Kafka 會在後臺默默地開啟主題刪除操作。因此,通常都需要等待一段時間才會看到效果。

kafka-configs.sh指令碼

在主題建立之後,我們可以使用 kafka-configs 指令碼修改對應的引數。(修改topic級別引數

設定常規的主題級別引數,還是使用 --zookeeper。

2.6 修改主題級別引數

假設我們要設定主題級別引數 max.message.bytes,那麼命令如下:

bin/kafka-configs.sh --zookeeper {zk_host}:{zk_port}[/{path}] --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

3. 訊息管理

kafka-console-producer.sh指令碼

3.1 生產訊息

bin/kafka-console-producer.sh --broker-list {broker_host}:{kafka_port} --topic <topic_name>

例如,在下面這段命令中,我們指定生產者引數 acks 為 -1,同時啟用 LZ4 的壓縮演算法:

bin/kafka-console-producer.sh --broker-list broker-test1:9092 --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4

kafka-console-consumer.sh指令碼

3.2 從topic消費資料

  • 從頭開始消費

bin/kafka-console-consumer.sh --bootstrap-server {broker_host}:{kafka_port} --topic <topic_name> --from-beginning

--from-beginning引數表示從該主題最早的位移開始消費,例如消費名為testzl的topic資料:

bin/kafka-console-consumer.sh --bootstrap-server broker-test1:9092 --topic testzl --from-beginning
  • 指定消費組

bin/kafka-console-consumer.sh --bootstrap-server {broker_host}:{kafka_port} --topic <topic_name> --group <group_name>

--group引數可以指定消費者組,例如消費組group1從名為test-topic的topic消費資料:

bin/kafka-console-consumer.sh --bootstrap-server broker-test1:9092 --topic test-topic --group group1

⚠️注意:在這段命令中,我們指定了 group 資訊。如果沒有指定的話,每次執行 Console Consumer,它都會自動生成一個新的消費者組來消費。久而久之,你會發現你的叢集中有大量的以 console-consumer 開頭的消費者組。通常情況下,最好還是加上 group。

3.3 檢視指定消費者組的消費情況

bin/kafka-consumer-groups.sh --bootstrap-server broker-test1:9092 --describe --group group1

3.4 檢視所有消費者組提交的位移資料

對於 __consumer_offsets 而言,由於它儲存了消費者組的位移資料,有時候直接檢視該主題訊息是很方便的事情。下面的命令可以幫助我們直接檢視消費者組提交的位移資料。

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

除了檢視位移提交資料,我們還可以直接讀取該主題訊息,檢視消費者組的狀態資訊。

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

對於內部主題 __transaction_state 而言,方法是相同的。你只需要指定 kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter 即可。

kafka-consumer-groups.sh指令碼

3.5 查詢消費組位移

bin/kafka-consumer-groups.sh --bootstrap-server {broker_host}:{kafka_port} --describe --group <group_name>

3.6 重置消費組位移

重置位移可以大致從兩個維度來進行。

1.位移維度。
2.時間維度。

DateTime 策略

重點記錄一下比較常用的DateTime策略。DateTime 策略直接指定 --to-datetime

例如,把kafka topic TestRawLog 的 group.id: "group1" offset 重置到20210619 23:50

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group1 --topic TestRawLog --reset-offsets --to-datetime 2021-06-19T23:50:00.000+0800 --execute

最後一個引數--excute如果不加,只是列印位移調整方案,不實際執行;加上引數--excute執行真正的位移調整。

⚠️注意:需要考慮到時差問題。預設設定時間是UTC時間,並非北京時間,北京時區是東八區,領先UTC八個小時。因此需要加上8小時時差。即:+0800

4. topic分割槽管理

4.1 preferred leader 選舉

$ bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 --path-to-json-file <path>/preferred-leader-plan.json

# {"partitions":[{"topic":"test-topic","partition":0}]}

4.2 分割槽重分配

# 生成分配策略
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
# 執行分配策略
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file cluster-reassignment.json --execute
# 驗證分配
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file cluster-reassignment.json --verify
# 可通過編寫分配策略,增加副本因子 略

5. 其他指令碼工具

5.1 生產者效能測試

$ bin/kafka-producer-perf-test.sh --topic test_topic --num-records 50000000 --throughput -1 --record-size 200 --producer-props bootstrap.servers=localhost:9092 acks=1 linger.ms=50

5.2 消費者效能測試

$ bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --messages 500000000 --topic test_topic

5.3 檢視topic消費進度

必須引數為--group, 不指定--topic,預設為所有topic。

$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1

5.4 檢視訊息元資料

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /dfs5/kafka/data/secLog-2/00000000000110325000.log --print-data-log --deep-iteration > secLog.log

5.5 獲取 topic 當前訊息數

# 獲取當前最大位移
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9200 --topic test --time -1
# 當前獲取最早位移
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9200 --topic test --time -2
# 以上兩個數相減,即可得出 topic 當前在叢集的訊息總數