1. 程式人生 > >Kafka消息重新發送

Kafka消息重新發送

local pic 重置 默認 ets 分區 str 需要 sum

Kafka消息重新發送

1、 使用kafka消息隊列做消息的發布、訂閱,如果consumer端消費出問題,導致數據並沒有消費,此時不需要擔心,數據並不會立刻丟失,kafka會把數據在服務器的磁盤上默認存儲7天,或者自己指定有兩種方式:1)指定時間,log.retention.hours=1682)指定大小,log.segment.bytes=1073741824。此時就可以通過重置某個topicoffset來是消息重新發送,進行消費

2、 查看topic的offset的範圍

1)使用下面的命令可以查看topicuserlogbrokerspark:9092offset

的最小值:

#./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list spark:9092 -topic userlog --time -2

2)offset的最大值:

#./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list spark:9092 -topic userlog --time -1

3、 設置consumer group的offset

1)啟動zookeeper,如果使用的是kafka內置的zookeeper,直接啟動bin目錄下的zookeeper-shell.sh,進行登錄:

#./zookeeper-shell.sh localhost:2181

通過如下命令,來重置offset值,比如topic:userlog,group:userlogs,partition:0 ,offset:2181

set /consumers/userlogs/offsets/userlog/0 1288

如果有多個partition都執行上輸的命令,並將0換為相對應的分區號就可以了。

###如果創建分區的時候設置了zk的根目錄,如localhost:2181/kafka

則重置的命令為:

set /kafka/consumers/userlogs/offsets/userlog/0 1288

2)如果使用單獨安裝的zookeeper,直接使用bin目錄下的ZKCli.sh登錄後,執行上述命令即可。

4、 設置完成後需要重啟相關的服務,就可以從設置offset的地方開始消費。

重啟服務:consumer服務。

Kafka消息重新發送