Kafka消息重新發送
Kafka消息重新發送
1、 使用kafka消息隊列做消息的發布、訂閱,如果consumer端消費出問題,導致數據並沒有消費,此時不需要擔心,數據並不會立刻丟失,kafka會把數據在服務器的磁盤上默認存儲7天,或者自己指定有兩種方式:1)指定時間,log.retention.hours=168;2)指定大小,log.segment.bytes=1073741824。此時就可以通過重置某個topic的offset來是消息重新發送,進行消費
2、 查看topic的offset的範圍
1)使用下面的命令可以查看topic為userlog,broker為spark:9092的offset
#./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list spark:9092 -topic userlog --time -2
2)offset的最大值:
#./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list spark:9092 -topic userlog --time -1
3、 設置consumer group的offset
1)啟動zookeeper,如果使用的是kafka內置的zookeeper,直接啟動bin目錄下的zookeeper-shell.sh,進行登錄:
#./zookeeper-shell.sh localhost:2181
通過如下命令,來重置offset值,比如topic:userlog,group:userlogs,partition:0 ,offset:2181
set /consumers/userlogs/offsets/userlog/0 1288
如果有多個partition都執行上輸的命令,並將0換為相對應的分區號就可以了。
###如果創建分區的時候設置了zk的根目錄,如localhost:2181/kafka
則重置的命令為:
set /kafka/consumers/userlogs/offsets/userlog/0 1288
2)如果使用單獨安裝的zookeeper,直接使用bin目錄下的ZKCli.sh登錄後,執行上述命令即可。
4、 設置完成後需要重啟相關的服務,就可以從設置offset的地方開始消費。
重啟服務:consumer服務。
Kafka消息重新發送