1. 程式人生 > >kafka 常用運維命令介紹(二)

kafka 常用運維命令介紹(二)

文章目錄

一、producer 相關命令

1. kafka-console-producer 生產訊息

使用kafka-console-producer我們可以快速往某個topic推送訊息。kafka-console-producer使用的也是KafkaProducer類進行訊息的推送,因此KafkaProducer支援的引數kafka-console-producer都可以配置。

有關KafkaProducer的相關原理可以看我的這篇部落格:

https://blog.csdn.net/u013332124/article/details/81321942

# 執行下面這條命令後會進入producer的互動介面,輸入字串就會將訊息推送到kafka叢集
kafka-console-producer --broker-list 127.0.0.1:9092 --topic test

# 推送10條訊息 分別是1、2、3、...、10
seq 10 | kafka-console-producer --broker-list 127.0.0.1:9092 --topic yangjb

# 推送hello world 到kafka叢集
echo "nihao world" | kafka-console-producer --broker-list 127.0.0.1:9092 --topic yangjb

2. 使用 kafka-producer-perf-test 進行producer的基準測試

我們要修改某個配置時,經常想知道這個配置的修改對kafka的效能會有哪些影響,這時候就可以來個基準測試來衡量配置修改對producer效能的影響。kafka官方就提供了這樣一個工具,讓我們很方面的對produer的效能進行測試。

下面是kafka-producer-perf-test支援的一些引數

  --topic TOPIC          指定topic
  --num-records NUM-RECORDS 要推送多少條資料
  --payload-delimiter PAYLOAD-DELIMITER 當使用payload檔案生成資料時,指定每條訊息的之間的分割符,預設是換行符
  --throughput THROUGHPUT 推送訊息時的吞吐量,單位是 messages/sec。必須指定
  --producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...] 指定producer的一些配置
  --producer.config CONFIG-FILE 直接指定配置檔案
  --print-metrics       是否要在最後輸出度量指標,預設是false
# 生成資料的方式有兩種,一種是我們指定一個訊息大小,該工具會隨機生成一個指定大小的字串推送,一個是我們指定一個檔案,工具會從該檔案中隨機選取一條訊息推送
# 下面兩種方式只能選擇一種
  --record-size RECORD-SIZE 指定每條訊息的大小,大小是bytes
  --payload-file PAYLOAD-FILE 指定檔案存放目錄

下面我們測試使用producer一次推送100條資料

# 通過 --producer-props指定要連線的broker地址
# --num-records  指定一共要推送100條
#  --throughput 表示吞吐量,限制每秒20
# --record-size 表示每條訊息的大小是20B
kafka-producer-perf-test --producer-props bootstrap.servers=127.0.0.1:9092 client.id=perftest --num-records 100 --throughput 10 --topic test --record-size 20

最後輸出報告:

52 records sent, 10.4 records/sec (0.00 MB/sec), 5.2 ms avg latency, 137.0 max latency.
100 records sent, 9.993005 records/sec (0.00 MB/sec), 3.78 ms avg latency, 137.00 ms max latency, 2 ms 50th, 4 ms 95th, 137 ms 99th, 137 ms 99.9th.

我們可以編輯一個payload.txt,輸入

hello
world
producer
perf
test

接著使用該payload.txt進行測試

# --payload-file 指定檔案地址
kafka-producer-perf-test --producer-props bootstrap.servers=127.0.0.1:9092 client.id=perftest --payload-file payload.txt --num-records 100 --throughput 100 --topic test

該工具在執行時,會讀取payload.txt的內容,然後根據--payload-delimiter將文字分成一條條訊息,接著測試的時候會隨機發送這些訊息。

3. 使用 kafka-verifiable-producer 批量推送訊息

kafka提供了kafka-verifiable-producer工具用於快速的推送一批訊息到producer,並且可以打印出各條推送訊息的元資訊。推送的訊息是從0開始不斷往上遞增。

支援引數

  --topic TOPIC          指定topic
  --broker-list HOST1:PORT1[,HOST2:PORT2[...]] 指定kafka broker地址
  --max-messages MAX-MESSAGES 一共要推送多少條,預設為-1,-1表示一直推送到程序關閉位置
  --throughput THROUGHPUT 推送訊息時的吞吐量,單位messages/sec。預設為-1,表示沒有限制
  --acks ACKS            每次推送訊息的ack值,預設是-1
  --producer.config CONFIG_FILE 指定producer的配置檔案
  --value-prefix VALUE-PREFIX 推送的訊息預設是遞增的數字,我們可以在這些訊息前面加上指定的字首。這個字首好像也必須是數字

demo:

# --max-messages 10 總共推送10條
# 每秒推送2條
kafka-verifiable-producer --broker-list 127.0.0.1:9092 --topic test --max-messages 10 --throughput 2

輸出:

{"timestamp":1544327879247,"name":"startup_complete"}
{"timestamp":1544327879413,"name":"producer_send_success","key":null,"value":"0","offset":91029,"partition":0,"topic":"test"}
{"timestamp":1544327879415,"name":"producer_send_success","key":null,"value":"1","offset":91030,"partition":0,"topic":"test"}
{"timestamp":1544327879904,"name":"producer_send_success","key":null,"value":"2","offset":91031,"partition":0,"topic":"test"}
{"timestamp":1544327880406,"name":"producer_send_success","key":null,"value":"3","offset":91032,"partition":0,"topic":"test"}
{"timestamp":1544327880913,"name":"producer_send_success","key":null,"value":"4","offset":91033,"partition":0,"topic":"test"}
{"timestamp":1544327881414,"name":"producer_send_success","key":null,"value":"5","offset":91034,"partition":0,"topic":"test"}
{"timestamp":1544327881918,"name":"producer_send_success","key":null,"value":"6","offset":91035,"partition":0,"topic":"test"}
{"timestamp":1544327882422,"name":"producer_send_success","key":null,"value":"7","offset":91036,"partition":0,"topic":"test"}
{"timestamp":1544327882924,"name":"producer_send_success","key":null,"value":"8","offset":91037,"partition":0,"topic":"test"}
{"timestamp":1544327883430,"name":"producer_send_success","key":null,"value":"9","offset":91038,"partition":0,"topic":"test"}
{"timestamp":1544327883942,"name":"shutdown_complete"}
{"timestamp":1544327883943,"name":"tool_data","sent":10,"acked":10,"target_throughput":2,"avg_throughput":2.1294718909710393}

4. 使用kafka-replay-log-producer進行topic之間的訊息複製

使用kafka-replay-log-producer可以將一個topic的訊息複製到另外一個topic上。它的流程是先從topic拉取訊息,然後推送到另一個topic。

支援的引數:

--broker-list <String: hostname:port>  指定broker的地址
--inputtopic <String: input-topic>     要讀取的topic名稱
--messages <Integer: count>            要複製的訊息數量,預設是-1,也就是全部
--outputtopic <String: output-topic>   要複製到哪個topic
--property <String: producer properties>     可以指定producer的一些引數                       
--reporting-interval <Integer: size>   彙報進度的頻率,預設是5000彙報一次
--sync                                 是否開啟同步模式
--threads <Integer: threads>           複製訊息的執行緒數
--zookeeper <String: zookeeper url>    zk地址

demo:

# 把topic-test的訊息複製給topic-aaaa
# --messages 表示只複製前50條
kafka-replay-log-producer --broker-list 127.0.0.1:9092 --zookeeper 127.0.0.1:2181/kafka --inputtopic test --outputtopic aaaa --messages 50

二、consumer相關命令

1. kafka-console-consumer 消費訊息

使用kafka-console-consumer可以消費指定topic的訊息。底層也是使用KafkaConsumer進行消費的。相關消費原理可以看我的這兩篇部落格:

Consumer 加入&離開 Group詳解(九)

Consumer 拉取日誌流程詳解(十)

# 指定消費topic-test的訊息
# --from-beginning 表示如果之前沒有過消費記錄,就從第一條開始消費
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --group hahaeh --topic test --from-beginning

kafka-console-consumer 還支援配置一些其他的引數,使用者可以自行通過 —help 引數檢視。

2. 使用 kafka-consumer-perf-test 進行consumer的基準測試

和producer一樣,kafka也會consumer提供了一個命令來進行基準測試。

# --fetch-size 表示一次請求拉取多少條資料
# --messages 表示總共要拉取多少條資料
kafka-consumer-perf-test --broker-list 127.0.0.1:9092 --fetch-size 200 --group oka --topic test --messages 200

輸出:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2018-12-09 13:37:19:118, 2018-12-09 13:37:20:393, 0.0003, 0.0002, 162, 127.0588, 21, 1254, 0.0002, 129.1866

kafka-consumer-perf-test還支援其他引數:

--consumer.config <String: config file>   指定Consumer使用的配置檔案
--date-format 定義輸出的日誌格式
--from-latest 如果之前沒有消費記錄,是否從之前消費過的地方開始消費
--num-fetch-threads 拉取訊息的執行緒數
--threads 處理訊息的執行緒數
--reporting-interval  多久輸出一次執行過程資訊

3. 使用kafka-verifiable-consumer批量拉取訊息

kafka-verifiable-consumer可以批量的拉取訊息,其實和kafka-console-consumer命令差不多。不過使用kafka-verifiable-consumer消費訊息輸出的內容更豐富,還包括offset等資訊,並且可以設定只讀取幾條訊息等。kafka-console-consumer是有多少讀多少。

# --max-messages 5 表示只拉取5條
# --verbose 表示輸出每一條訊息的內容
kafka-verifiable-consumer --broker-list 127.0.0.1:9092 --max-messages 5 --group-id hello --topic test --verbose

輸出:

{"timestamp":1544335112709,"name":"startup_complete"}
{"timestamp":1544335112862,"name":"partitions_revoked","partitions":[]}
{"timestamp":1544335112883,"name":"partitions_assigned","partitions":[{"topic":"test","partition":0}]}
{"timestamp":1544335112919,"name":"record_data","key":null,"value":"90218","topic":"test","partition":0,"offset":90877}
{"timestamp":1544335112920,"name":"record_data","key":null,"value":"90219","topic":"test","partition":0,"offset":90878}
{"timestamp":1544335112920,"name":"record_data","key":null,"value":"0","topic":"test","partition":0,"offset":90879}
{"timestamp":1544335112921,"name":"record_data","key":null,"value":"1","topic":"test","partition":0,"offset":90880}
{"timestamp":1544335112921,"name":"record_data","key":null,"value":"2","topic":"test","partition":0,"offset":90881}
{"timestamp":1544335112921,"name":"records_consumed","count":162,"partitions":[{"topic":"test","partition":0,"count":5,"minOffset":90877,"maxOffset":90881}]}
{"timestamp":1544335112928,"name":"offsets_committed","offsets":[{"topic":"test","partition":0,"offset":90882}],"success":true}
{"timestamp":1544335112943,"name":"shutdown_complete"}

kafka-verifiable-consumer命令還支援以下引數:

--session-timeout consumer的超時時間
--enable-autocommit 是否開啟自動offset提交,預設是false
--reset-policy  當以前沒有消費記錄時,選擇要拉取offset的策略,可以是'earliest', 'latest','none'。預設是earliest
--assignment-strategy  consumer分配分割槽策略,預設是RoundRobinAssignor
--consumer.config 指定consumer的配置

4. 使用kafka-consumer-groups命令管理ConsumerGroup

➀、列出所有的ConsumerGroup(新舊版本api區別)

由於kafka consumer api有新版本和舊版本的區別,因此使用kafka-consumer-groups進行group的管理時,內部使用的機制也不一樣。如果我們使用—zookeeper來連線叢集,則使用的是舊版本的consumer group管理規則,也就是ConsumerGroup的一些元資料是儲存在zk上的。如果使用--bootstrap-server來連線,則是面向新版本的consumer group規則。

列出使用舊版本的所有consumer group

kafka-consumer-groups --zookeeper 127.0.0.1:2181/kafka --list

列出新版本的所有consumer group

kafka-consumer-groups --bootstrap-server 127.0.0.1:9092  --list

➁、刪除ConsumerGroup

刪除指定的group

# 刪除 helo和hahah 這兩個group
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092  --delete --group helo --group hahah

刪除指定group的指定topic的消費記錄(topic級別的刪除僅在舊版本api中支援)

# 舊版本api 必須指定zk地址
kafka-consumer-groups --zookeeper 127.0.0.1:2181/kafka  --delete --group helo --topic test

刪除指定topic在所有group中的消費記錄(topic級別的刪除僅在舊版本api中支援)

# 舊版本api 必須指定zk地址
kafka-consumer-groups --zookeeper 127.0.0.1:2181/kafka  --delete --topic test

➂、列出ConsumerGroup詳情

通過—describe可以從不同維度觀察group的資訊。

檢視group的offset消費記錄
# --offset是--describe的預設選項,可以不傳
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mytest --offset

輸出

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
test            0          796670          796676          6               consumer-1-00ab2315-e3f3-4261-8392-f9fae4668f87 /172.20.16.13   consumer-1
檢視group的member資訊
# --members 表示輸出member資訊
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mytest --members

輸出

CONSUMER-ID                                     HOST            CLIENT-ID       #PARTITIONS
consumer-1-00ab2315-e3f3-4261-8392-f9fae4668f87 /172.20.16.13   consumer-1      1
檢視group的狀態
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mytest --state

輸出:

COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
172.20.16.13:9092 (0)     range                     Stable               1

➃、重置group的消費記錄

當選擇重置消費記錄操作時,目標Group的狀態一定不能是活躍的。也就是該group中不能有consumer在消費。

通過 --reset-offsets 可以重置指定group的消費記錄。和--reset-offsets搭配的有兩個選項,--dry-run--execute,預設是--dry-run

dry-run 模式

當執行在--dry-run模式下,重置操作不會真正的執行,只會預演重置offset的結果。該模式也是為了讓使用者謹慎的操作,否則直接重置消費記錄會造成各個consumer訊息讀取的異常。

# --shift-by -1 表示將消費的offset重置成當前消費的offset-1
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --shift-by -1 --topic test --group mytest --dry-run

輸出

TOPIC                          PARTITION  NEW-OFFSET
test                           0          797054

此時如果去查詢該group的消費offset,會發現該group的消費offset其實還是797055,並沒有發生改變。

—execute 模式

通過--execute引數可以直接執行重置操作。

kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --shift-by -1 --topic test --group mytest --execute
重置offset的幾種策略

該命令提供了多種offset重置策略給我們選擇如何重置offset

--to-current 直接重置offset到當前的offset,也就是LOE
--to-datetime <String: datetime>  重置offset到指定時間的offset處
--to-earliest  重置offset到最開始的那條offset
--to-offset <Long: offset> 重置offset到目標的offset
--shift-by <Long:n> 根據當前的offset進行重置,n可以是正負數
--from-file <String: path to CSV file> 通過外部的csv檔案描述來進行重置

Demo:

# 將group mytest的test 這個topic的消費offset重置到666
# 注意如果topic分割槽中的最小offset比666還大的話,就會直接使用該最小offset作為消費offset
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --topic test --group mytest --execute --to-offset 666
# 重置到最早的那條offset
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --topic test --group mytest --execute --to-earliest

我們再看下如何使用--from-file來重置offset。首先先編輯一個檔案 reset.csv

test,0,796000

3列分別是topicName,partition,offset。最後輸入重置命令

kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --group mytest --execute --from-file reset.csv

三、replica資料一致性校驗

通過kafka-replica-verification命令可以檢查指定topic的各個partition的replic的資料是否一致。

kafka-replica-verification --broker-list 127.0.0.1:9092

預設是檢查全部topic,可以通過指定topic-white-list來指定只檢查一些topic。

replica一致性檢查主要是根據partition的HW來檢查的,大概原理是在所有的broker都開啟一個fetcher,然後拉取資料做檢查各個replica的資料是否一致。因此,進行該檢查時要保證所有的broker都線上,否則該工具會一直阻塞直到broker全部啟動。