Kafka: Consuming from a Specific Partition
阿新 • Published: 2018-12-17
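import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "node:9092");
        prop.put("group.id", "test8");
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Case 1: the group.id already existed before
        Consumer<String, String> consumer = new KafkaConsumer<>(prop);
        TopicPartition p = new TopicPartition("test2", 2);
        // Choose which partition of the topic to consume
        consumer.assign(Arrays.asList(p));
        // Choose the offset within that partition to start consuming from
        // consumer.seekToBeginning(Arrays.asList(p));
        consumer.seek(p, 5);
        // consumer.subscribe(Arrays.asList("test2"));

        // Case 2: the group.id did not exist before
        // (also needs imports for Map, HashMap and OffsetAndMetadata)
        // Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
        // hashMaps.put(new TopicPartition("test2", 0), new OffsetAndMetadata(0));
        // consumer.commitSync(hashMaps);
        // consumer.subscribe(Arrays.asList("test2"));

        while (true) {
            // poll(Duration) needs kafka-clients 2.0+; older clients use poll(100)
            ConsumerRecords<String, String> c = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> c1 : c) {
                System.out.println("Key: " + c1.key() + " Value: " + c1.value()
                        + " Offset: " + c1.offset() + " Partitions: " + c1.partition());
            }
        }
    }
}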
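Splitting a topic into partitions is what makes concurrent consumption possible: different consumers can read different partitions at the same time. The subscription and consumption relationship is shown in the figure: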
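As a rough illustration of that relationship, the sketch below models how the partitions of one topic might be divided among the consumers of a single group. It is a simplified, standalone model in the spirit of Kafka's range assignment strategy, not Kafka's own code. The class name `PartitionAssignmentSketch`, the helper `assignRange`, the consumer names, and the partition count of 3 are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignmentSketch {

    // Hypothetical helper (not a Kafka API): splits partitions 0..numPartitions-1
    // into contiguous ranges, one range per consumer. Earlier consumers take one
    // extra partition each when the count does not divide evenly.
    static Map<String, List<Integer>> assignRange(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (consumers.isEmpty()) {
            return result;
        }
        int base = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed: a topic with 3 partitions, consumed by groups of different sizes.
        // prints {consumer-0=[0, 1], consumer-1=[2]}
        System.out.println(assignRange(Arrays.asList("consumer-0", "consumer-1"), 3));
        // prints {consumer-0=[0], consumer-1=[1], consumer-2=[2], consumer-3=[]}
        System.out.println(assignRange(
                Arrays.asList("consumer-0", "consumer-1", "consumer-2", "consumer-3"), 3));
    }
}
```

Within one consumer group each partition is read by at most one consumer, so the second call leaves `consumer-3` with nothing to do: adding consumers beyond the number of partitions does not add parallelism. Calling `assign()` as in `DConsumer` above bypasses this group-managed distribution and pins the consumer to the partition you name.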