1. 程式人生 > >kafka.storage為kafka時如修改設定使用者組的offset

kafka.storage為kafka時如修改設定使用者組的offset

因為offset資訊儲存在kafka的一個名為__consumer_offsets的topic中,沒辦法像zookeeper那樣直接修改。

經過嘗試用kafka-python,失敗,正常執行完offset沒變,也不是zookeeper儲存模式。

下面是最後用的方法:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import cn.xxx.kafka.Message;

public class KafkaTool {

	private static String seek(String []args){
		String topic = args[2];
		int partition = Integer.parseInt(args[3]);
		int offset = Integer.parseInt(args[4]);
		
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");
		props.put("group.id", "group1");
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "com.___.kafka.message.MessageValueDeserializer");
		KafkaConsumer<String, Message> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
		//consumer.subscribe(Arrays.asList(topic));	//"deviceInfoTopic"
		TopicPartition topicPartition = new TopicPartition(topic, partition);
        consumer.assign(Arrays.asList(topicPartition));
        
		consumer.seek(new TopicPartition(topic, partition), offset);
		consumer.close();
		return "SUCC";
	}
	
	public static void main(String[] args) {
		System.out.println(args[1]);
		if("seek".equals(args[1])){
			System.out.println(seek(args));
		}

	}
}


如:將group1組topicname topic的0分割槽offset設定為100001,執行:

java -cp xxx.jar:yyy.jar KafkaTool seek topicname 0 100001 

列印SUCC表示執行成功。

執行驗證結果:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost --group group1 --topic topicname

如果是kafka.storage是zookeeper,見 http://www.cnblogs.com/hd-zg/p/5831219.html