1. 程式人生 > 其它 >kafka 分組消費topic_Kafka實戰—常見運維操作

kafka 分組消費topic_Kafka實戰—常見運維操作

技術標籤:kafka 分組消費topic

bcf2090ec93d05fe4318ecbabc26e736.png

十萬個為什麼

  • 叢集操作

  • Topic操作

  • 生產者操作

  • 消費者操作

e97f46144364c83ca3d46dc50b7a1119.png

叢集操作

951ad5f011ca502e3608103015a53ad0.png

1.啟動叢集

kafka-server-start.sh-daemon/usr/local/kafka/config/server.properties

2.停止叢集

kafka-server-stop.sh/usr/local/kafka/config/server.properties
e97f46144364c83ca3d46dc50b7a1119.png

Topic操作

951ad5f011ca502e3608103015a53ad0.png

1.建立Topic

kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 3 --topic wolf

注意:副本數不可大於broker的數量。

Replication factor: 6 larger than available brokers: 3.

2.刪除Topic

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic wolf

注意:此處的刪除操作只是將該topic標記為刪除,並未真正的刪除,且要建立一個同名的topic也不會成功。

Error while executing topic command : Topic 'wolf' already exists

如果打算刪除重新建立,可以先修改 kafka/config/server.properties ,

在檔案的最後加入配置 delete.topic.enable=true

則此時執行刪除命令將直接刪除

3.修改Topic

(1)增加分割槽數

kafka-topics.sh --bootstrap-server 127.0.0.1:2181 --alter --topic my_topic_name --partitions 40

(2)修改配置

kafka-configs.sh --bootstrap-server 127.0.0.1:9092--entity-type topics --entity-name my_topic_name --alter --add-config x=y

(3)修改過期時間

全域性配置【server.properties 】

log.retention.hours=72log.cleanup.policy=delete

單獨配置

kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --alter --entity-name wolf --entity-type topics --add-config retention.ms=86400000

檢視設定:

kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --describe --entity-name wolf --entity-type topics

(4)修改副本因子

首先建立一個json檔案,指定相關的配置

vim custom_replication.json{  "partitions": [{      "topic": "wolf",      "partition": 0,      "replicas": [1]    },    {      "topic": "wolf",      "partition": 1,      "replicas": [4]    },    {      "topic": "wolf",      "partition": 2,      "replicas":[2]    }  ],  "version": 1}

注意:

partition:分片的編號

replicas:指定分片所分佈broker的,broker id列表

執行修改副本

kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file /Users/zhaoqiang/Downloads/aa.json --execute //以不超多500M/s的速度進行資料遷移【此處的單位是B/s】kafka-reassign-partitions.sh --bootstrap-server 127.0.0.1:9092 --reassignment-json-file /Users/zhaoqiang/Downloads/aa.json --execute --throttle 50000000//--verify 用於分割槽分配的狀態

4.查詢Topic

(0)羅列所有Topic

kafka-topics.sh --list --zookeeper 127.0.0.1:2181

(1)檢視具體topic詳情【其中的數字是brokerId】

kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test

(2)列出與叢集的預設配置不同的topic

kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topics-with-overrides

(3)列出包含不同步副本的topic

kafka-topics.sh --zookeeper 127.0.0.1:2181  --describe --under-replicated-partitions

(4)列出leader不可用的副本

kafka-topics.sh --zookeeper 127.0.0.1:2181  --describe --unavailable-partitions

(5)修改topic分割槽數

kafka-topics.sh --alter --zookeeper 127.0.0.1:2181  --partitions 4 --topic wolf

(6)檢視選舉失敗的Topic 分割槽

kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe|grep "Leader: -1"
e97f46144364c83ca3d46dc50b7a1119.png

生產者操作

951ad5f011ca502e3608103015a53ad0.png

1、生產訊息

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic fusion_center_monitor_metric
e97f46144364c83ca3d46dc50b7a1119.png

消費者操作

951ad5f011ca502e3608103015a53ad0.png
注意:舊版本的消費者組資訊儲存在zookeeper上【--zookeeper】     新版本的消費者組資訊儲存在broker上【--bootstrap-server】

1、檢視訊息

(1)檢視指定消費者分組消費過指定topic的訊息【實時資料】

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic fox

(2)檢視指定消費者分組消費過指定topic的訊息【從第一條資料開始】

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic log_mryx_intelligent_promotion --from-beginning --group wolf

2、檢視訊息進度

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092  --describe --all-groups

(3)將消費者組的偏移量匯出到 offsets.txt

kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect 127.0.0.1:2181--group gid --output-file offsets.txt

3、消費組操作

(1)檢視所有的消費者組

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

(2)消費者組描述

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group gid

(3)消費者組中所有活躍成員的列表

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group gid --memberskafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group gid --members --all-groups

(4)消費者組中所有活躍成員及成員所對應的分割槽列表

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups --members --verbose

(5)消費者組狀態

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups --state

(6)刪除消費者組

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group gid

(7)重置消費者組偏移量

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
--reset-offsets    需指定Topic --all-topics或--topic    執行選項  --(預設)以顯示要重置的偏移量。  --execute:執行--reset-offsets過程。  --export:將結果匯出為CSV格式。  執行方案--to-datetime :將偏移量重置為與datetime的偏移量。格式:“ YYYY-MM-DDTHH:mm:SS.sss”--to-earliest:將偏移量重置為最早的偏移量。--to-latest:將偏移量重置為最新偏移量。--shift-by :重置偏移,將當前偏移偏移“ n”,其中“ n”可以為正或負。--from-file:將偏移量重置為CSV檔案中定義的值。--to-current:將偏移量重置為當前偏移量。--by-duration :將偏移量重置為從當前時間戳記的持續時間偏移量。格式:“ PnDTnHnMnS”--to-offset:將偏移量重置為特定偏移量。請注意,超出範圍的偏移量將調整為可用的偏移量結束。例如,如果偏移量結束為10,偏移量請求為15,則實際上將選擇偏移量為10。