kafka 常用運維命令介紹(二)
文章目錄
- 一、producer 相關命令
- 1. kafka-console-producer 生產訊息
- 2. 使用 kafka-producer-perf-test 進行producer的基準測試
- 3. 使用 kafka-verifiable-producer 批量推送訊息
- 4. 使用kafka-replay-log-producer進行topic之間的訊息複製
- 二、consumer相關命令
- 1. kafka-console-consumer 消費訊息
- 2. 使用 kafka-consumer-perf-test 進行consumer的基準測試
- 3. 使用kafka-verifiable-consumer批量拉取訊息
- 4. 使用kafka-consumer-groups命令管理ConsumerGroup
- 三、replica資料一致性校驗
一、producer 相關命令
1. kafka-console-producer 生產訊息
使用kafka-console-producer我們可以快速往某個topic推送訊息。kafka-console-producer使用的也是KafkaProducer類進行訊息的推送,因此KafkaProducer支援的引數kafka-console-producer都可以配置。
有關KafkaProducer的相關原理可以看我的這篇部落格:
https://blog.csdn.net/u013332124/article/details/81321942
# 執行下面這條命令後會進入producer的互動介面,輸入字串就會將訊息推送到kafka叢集
kafka-console-producer --broker-list 127.0.0.1:9092 --topic test
# 推送10條訊息 分別是1、2、3、...、10
seq 10 | kafka-console-producer --broker-list 127.0.0.1:9092 --topic yangjb
# 推送hello world 到kafka叢集
echo "nihao world" | kafka-console-producer --broker-list 127.0.0.1:9092 --topic yangjb
2. 使用 kafka-producer-perf-test 進行producer的基準測試
我們要修改某個配置時,經常想知道這個配置的修改對kafka的效能會有哪些影響,這時候就可以來個基準測試來衡量配置修改對producer效能的影響。kafka官方就提供了這樣一個工具,讓我們很方面的對produer的效能進行測試。
下面是kafka-producer-perf-test支援的一些引數
--topic TOPIC 指定topic
--num-records NUM-RECORDS 要推送多少條資料
--payload-delimiter PAYLOAD-DELIMITER 當使用payload檔案生成資料時,指定每條訊息的之間的分割符,預設是換行符
--throughput THROUGHPUT 推送訊息時的吞吐量,單位是 messages/sec。必須指定
--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...] 指定producer的一些配置
--producer.config CONFIG-FILE 直接指定配置檔案
--print-metrics 是否要在最後輸出度量指標,預設是false
# 生成資料的方式有兩種,一種是我們指定一個訊息大小,該工具會隨機生成一個指定大小的字串推送,一個是我們指定一個檔案,工具會從該檔案中隨機選取一條訊息推送
# 下面兩種方式只能選擇一種
--record-size RECORD-SIZE 指定每條訊息的大小,大小是bytes
--payload-file PAYLOAD-FILE 指定檔案存放目錄
下面我們測試使用producer一次推送100條資料
# 通過 --producer-props指定要連線的broker地址
# --num-records 指定一共要推送100條
# --throughput 表示吞吐量,限制每秒20
# --record-size 表示每條訊息的大小是20B
kafka-producer-perf-test --producer-props bootstrap.servers=127.0.0.1:9092 client.id=perftest --num-records 100 --throughput 10 --topic test --record-size 20
最後輸出報告:
52 records sent, 10.4 records/sec (0.00 MB/sec), 5.2 ms avg latency, 137.0 max latency.
100 records sent, 9.993005 records/sec (0.00 MB/sec), 3.78 ms avg latency, 137.00 ms max latency, 2 ms 50th, 4 ms 95th, 137 ms 99th, 137 ms 99.9th.
我們可以編輯一個payload.txt,輸入
hello
world
producer
perf
test
接著使用該payload.txt進行測試
# --payload-file 指定檔案地址
kafka-producer-perf-test --producer-props bootstrap.servers=127.0.0.1:9092 client.id=perftest --payload-file payload.txt --num-records 100 --throughput 100 --topic test
該工具在執行時,會讀取payload.txt的內容,然後根據--payload-delimiter
將文字分成一條條訊息,接著測試的時候會隨機發送這些訊息。
3. 使用 kafka-verifiable-producer 批量推送訊息
kafka提供了kafka-verifiable-producer工具用於快速的推送一批訊息到producer,並且可以打印出各條推送訊息的元資訊。推送的訊息是從0開始不斷往上遞增。
支援引數
--topic TOPIC 指定topic
--broker-list HOST1:PORT1[,HOST2:PORT2[...]] 指定kafka broker地址
--max-messages MAX-MESSAGES 一共要推送多少條,預設為-1,-1表示一直推送到程序關閉位置
--throughput THROUGHPUT 推送訊息時的吞吐量,單位messages/sec。預設為-1,表示沒有限制
--acks ACKS 每次推送訊息的ack值,預設是-1
--producer.config CONFIG_FILE 指定producer的配置檔案
--value-prefix VALUE-PREFIX 推送的訊息預設是遞增的數字,我們可以在這些訊息前面加上指定的字首。這個字首好像也必須是數字
demo:
# --max-messages 10 總共推送10條
# 每秒推送2條
kafka-verifiable-producer --broker-list 127.0.0.1:9092 --topic test --max-messages 10 --throughput 2
輸出:
{"timestamp":1544327879247,"name":"startup_complete"}
{"timestamp":1544327879413,"name":"producer_send_success","key":null,"value":"0","offset":91029,"partition":0,"topic":"test"}
{"timestamp":1544327879415,"name":"producer_send_success","key":null,"value":"1","offset":91030,"partition":0,"topic":"test"}
{"timestamp":1544327879904,"name":"producer_send_success","key":null,"value":"2","offset":91031,"partition":0,"topic":"test"}
{"timestamp":1544327880406,"name":"producer_send_success","key":null,"value":"3","offset":91032,"partition":0,"topic":"test"}
{"timestamp":1544327880913,"name":"producer_send_success","key":null,"value":"4","offset":91033,"partition":0,"topic":"test"}
{"timestamp":1544327881414,"name":"producer_send_success","key":null,"value":"5","offset":91034,"partition":0,"topic":"test"}
{"timestamp":1544327881918,"name":"producer_send_success","key":null,"value":"6","offset":91035,"partition":0,"topic":"test"}
{"timestamp":1544327882422,"name":"producer_send_success","key":null,"value":"7","offset":91036,"partition":0,"topic":"test"}
{"timestamp":1544327882924,"name":"producer_send_success","key":null,"value":"8","offset":91037,"partition":0,"topic":"test"}
{"timestamp":1544327883430,"name":"producer_send_success","key":null,"value":"9","offset":91038,"partition":0,"topic":"test"}
{"timestamp":1544327883942,"name":"shutdown_complete"}
{"timestamp":1544327883943,"name":"tool_data","sent":10,"acked":10,"target_throughput":2,"avg_throughput":2.1294718909710393}
4. 使用kafka-replay-log-producer進行topic之間的訊息複製
使用kafka-replay-log-producer可以將一個topic的訊息複製到另外一個topic上。它的流程是先從topic拉取訊息,然後推送到另一個topic。
支援的引數:
--broker-list <String: hostname:port> 指定broker的地址
--inputtopic <String: input-topic> 要讀取的topic名稱
--messages <Integer: count> 要複製的訊息數量,預設是-1,也就是全部
--outputtopic <String: output-topic> 要複製到哪個topic
--property <String: producer properties> 可以指定producer的一些引數
--reporting-interval <Integer: size> 彙報進度的頻率,預設是5000彙報一次
--sync 是否開啟同步模式
--threads <Integer: threads> 複製訊息的執行緒數
--zookeeper <String: zookeeper url> zk地址
demo:
# 把topic-test的訊息複製給topic-aaaa
# --messages 表示只複製前50條
kafka-replay-log-producer --broker-list 127.0.0.1:9092 --zookeeper 127.0.0.1:2181/kafka --inputtopic test --outputtopic aaaa --messages 50
二、consumer相關命令
1. kafka-console-consumer 消費訊息
使用kafka-console-consumer可以消費指定topic的訊息。底層也是使用KafkaConsumer進行消費的。相關消費原理可以看我的這兩篇部落格:
# 指定消費topic-test的訊息
# --from-beginning 表示如果之前沒有過消費記錄,就從第一條開始消費
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --group hahaeh --topic test --from-beginning
kafka-console-consumer 還支援配置一些其他的引數,使用者可以自行通過 —help 引數檢視。
2. 使用 kafka-consumer-perf-test 進行consumer的基準測試
和producer一樣,kafka也會consumer提供了一個命令來進行基準測試。
# --fetch-size 表示一次請求拉取多少條資料
# --messages 表示總共要拉取多少條資料
kafka-consumer-perf-test --broker-list 127.0.0.1:9092 --fetch-size 200 --group oka --topic test --messages 200
輸出:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2018-12-09 13:37:19:118, 2018-12-09 13:37:20:393, 0.0003, 0.0002, 162, 127.0588, 21, 1254, 0.0002, 129.1866
kafka-consumer-perf-test還支援其他引數:
--consumer.config <String: config file> 指定Consumer使用的配置檔案
--date-format 定義輸出的日誌格式
--from-latest 如果之前沒有消費記錄,是否從之前消費過的地方開始消費
--num-fetch-threads 拉取訊息的執行緒數
--threads 處理訊息的執行緒數
--reporting-interval 多久輸出一次執行過程資訊
3. 使用kafka-verifiable-consumer批量拉取訊息
kafka-verifiable-consumer可以批量的拉取訊息,其實和kafka-console-consumer命令差不多。不過使用kafka-verifiable-consumer消費訊息輸出的內容更豐富,還包括offset等資訊,並且可以設定只讀取幾條訊息等。kafka-console-consumer是有多少讀多少。
# --max-messages 5 表示只拉取5條
# --verbose 表示輸出每一條訊息的內容
kafka-verifiable-consumer --broker-list 127.0.0.1:9092 --max-messages 5 --group-id hello --topic test --verbose
輸出:
{"timestamp":1544335112709,"name":"startup_complete"}
{"timestamp":1544335112862,"name":"partitions_revoked","partitions":[]}
{"timestamp":1544335112883,"name":"partitions_assigned","partitions":[{"topic":"test","partition":0}]}
{"timestamp":1544335112919,"name":"record_data","key":null,"value":"90218","topic":"test","partition":0,"offset":90877}
{"timestamp":1544335112920,"name":"record_data","key":null,"value":"90219","topic":"test","partition":0,"offset":90878}
{"timestamp":1544335112920,"name":"record_data","key":null,"value":"0","topic":"test","partition":0,"offset":90879}
{"timestamp":1544335112921,"name":"record_data","key":null,"value":"1","topic":"test","partition":0,"offset":90880}
{"timestamp":1544335112921,"name":"record_data","key":null,"value":"2","topic":"test","partition":0,"offset":90881}
{"timestamp":1544335112921,"name":"records_consumed","count":162,"partitions":[{"topic":"test","partition":0,"count":5,"minOffset":90877,"maxOffset":90881}]}
{"timestamp":1544335112928,"name":"offsets_committed","offsets":[{"topic":"test","partition":0,"offset":90882}],"success":true}
{"timestamp":1544335112943,"name":"shutdown_complete"}
kafka-verifiable-consumer命令還支援以下引數:
--session-timeout consumer的超時時間
--enable-autocommit 是否開啟自動offset提交,預設是false
--reset-policy 當以前沒有消費記錄時,選擇要拉取offset的策略,可以是'earliest', 'latest','none'。預設是earliest
--assignment-strategy consumer分配分割槽策略,預設是RoundRobinAssignor
--consumer.config 指定consumer的配置
4. 使用kafka-consumer-groups命令管理ConsumerGroup
➀、列出所有的ConsumerGroup(新舊版本api區別)
由於kafka consumer api有新版本和舊版本的區別,因此使用kafka-consumer-groups進行group的管理時,內部使用的機制也不一樣。如果我們使用—zookeeper
來連線叢集,則使用的是舊版本的consumer group管理規則,也就是ConsumerGroup的一些元資料是儲存在zk上的。如果使用--bootstrap-server
來連線,則是面向新版本的consumer group規則。
列出使用舊版本的所有consumer group
kafka-consumer-groups --zookeeper 127.0.0.1:2181/kafka --list
列出新版本的所有consumer group
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list
➁、刪除ConsumerGroup
刪除指定的group
# 刪除 helo和hahah 這兩個group
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --delete --group helo --group hahah
刪除指定group的指定topic的消費記錄(topic級別的刪除僅在舊版本api中支援)
# 舊版本api 必須指定zk地址
kafka-consumer-groups --zookeeper 127.0.0.1:2181/kafka --delete --group helo --topic test
刪除指定topic在所有group中的消費記錄(topic級別的刪除僅在舊版本api中支援)
# 舊版本api 必須指定zk地址
kafka-consumer-groups --zookeeper 127.0.0.1:2181/kafka --delete --topic test
➂、列出ConsumerGroup詳情
通過—describe可以從不同維度觀察group的資訊。
檢視group的offset消費記錄
# --offset是--describe的預設選項,可以不傳
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mytest --offset
輸出
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 796670 796676 6 consumer-1-00ab2315-e3f3-4261-8392-f9fae4668f87 /172.20.16.13 consumer-1
檢視group的member資訊
# --members 表示輸出member資訊
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mytest --members
輸出
CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer-1-00ab2315-e3f3-4261-8392-f9fae4668f87 /172.20.16.13 consumer-1 1
檢視group的狀態
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mytest --state
輸出:
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
172.20.16.13:9092 (0) range Stable 1
➃、重置group的消費記錄
當選擇重置消費記錄操作時,目標Group的狀態一定不能是活躍的。也就是該group中不能有consumer在消費。
通過 --reset-offsets
可以重置指定group的消費記錄。和--reset-offsets
搭配的有兩個選項,--dry-run
和--execute
,預設是--dry-run
。
dry-run 模式
當執行在--dry-run
模式下,重置操作不會真正的執行,只會預演重置offset的結果。該模式也是為了讓使用者謹慎的操作,否則直接重置消費記錄會造成各個consumer訊息讀取的異常。
# --shift-by -1 表示將消費的offset重置成當前消費的offset-1
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --shift-by -1 --topic test --group mytest --dry-run
輸出
TOPIC PARTITION NEW-OFFSET
test 0 797054
此時如果去查詢該group的消費offset,會發現該group的消費offset其實還是797055,並沒有發生改變。
—execute 模式
通過--execute
引數可以直接執行重置操作。
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --shift-by -1 --topic test --group mytest --execute
重置offset的幾種策略
該命令提供了多種offset重置策略給我們選擇如何重置offset
--to-current 直接重置offset到當前的offset,也就是LOE
--to-datetime <String: datetime> 重置offset到指定時間的offset處
--to-earliest 重置offset到最開始的那條offset
--to-offset <Long: offset> 重置offset到目標的offset
--shift-by <Long:n> 根據當前的offset進行重置,n可以是正負數
--from-file <String: path to CSV file> 通過外部的csv檔案描述來進行重置
Demo:
# 將group mytest的test 這個topic的消費offset重置到666
# 注意如果topic分割槽中的最小offset比666還大的話,就會直接使用該最小offset作為消費offset
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --topic test --group mytest --execute --to-offset 666
# 重置到最早的那條offset
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --topic test --group mytest --execute --to-earliest
我們再看下如何使用--from-file
來重置offset。首先先編輯一個檔案 reset.csv
test,0,796000
3列分別是topicName,partition,offset。最後輸入重置命令
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --group mytest --execute --from-file reset.csv
三、replica資料一致性校驗
通過kafka-replica-verification命令可以檢查指定topic的各個partition的replic的資料是否一致。
kafka-replica-verification --broker-list 127.0.0.1:9092
預設是檢查全部topic,可以通過指定topic-white-list
來指定只檢查一些topic。
replica一致性檢查主要是根據partition的HW來檢查的,大概原理是在所有的broker都開啟一個fetcher,然後拉取資料做檢查各個replica的資料是否一致。因此,進行該檢查時要保證所有的broker都線上,否則該工具會一直阻塞直到broker全部啟動。