kafka.storage為kafka時如修改設定使用者組的offset
阿新 • • 發佈:2019-02-14
因為offset資訊儲存在kafka的一個名為__consumer_offsets的topic中,沒辦法像zookeeper那樣直接修改。
經過嘗試用kafka-python,失敗,正常執行完offset沒變,也不是zookeeper儲存模式。
下面是最後用的方法:
import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import cn.xxx.kafka.Message; public class KafkaTool { private static String seek(String []args){ String topic = args[2]; int partition = Integer.parseInt(args[3]); int offset = Integer.parseInt(args[4]); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "group1"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "com.___.kafka.message.MessageValueDeserializer"); KafkaConsumer<String, Message> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props); //consumer.subscribe(Arrays.asList(topic)); //"deviceInfoTopic" TopicPartition topicPartition = new TopicPartition(topic, partition); consumer.assign(Arrays.asList(topicPartition)); consumer.seek(new TopicPartition(topic, partition), offset); consumer.close(); return "SUCC"; } public static void main(String[] args) { System.out.println(args[1]); if("seek".equals(args[1])){ System.out.println(seek(args)); } } }
如:將group1組topicname topic的0分割槽offset設定為100001,執行:
java -cp xxx.jar:yyy.jar KafkaTool seek topicname 0 100001
列印SUCC表示執行成功。
執行驗證結果:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost --group group1 --topic topicname
如果是kafka.storage是zookeeper,見 http://www.cnblogs.com/hd-zg/p/5831219.html