Kafka消費資料的幾種方式
阿新 • • 發佈:2018-11-01
Consumer手動指定偏移量消費:
=================================================
1、指定多主題消費
consumer.subscribe(Arrays.asList("t4","t5"));
2、指定分割槽消費
consumer.assign(list);
3、手動修改偏移量
consumer.commitSync(); //提交當前消費偏移量
consumer.commitSync(offsets); //提交指定偏移量,offsets 的型別為 Map<TopicPartition, OffsetAndMetadata>
consumer.assign(Arrays.asList(tp));
4、seek:修改偏移量搜尋指標,從指定的偏移量開始順序讀取資料
consumer.assign(Arrays.asList(tp));
consumer.seek(tp,0);
程式碼如下:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import java.util.*; public class NewConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers","s102:9092,s103:9092,s104:9092"); props.put("group.id", "g3"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "100"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //通過subscribe方法,指定多主題消費 //consumer.subscribe(Arrays.asList("t4","t5")); //指定分割槽消費 // ArrayList<TopicPartition> list = new ArrayList<TopicPartition>(); // TopicPartition tp = new TopicPartition("t1", 0); // TopicPartition tp2 = new TopicPartition("t4", 0); // TopicPartition tp3 = new TopicPartition("t4", 1); // TopicPartition tp4 = new TopicPartition("t4", 2); // list.add(tp); // list.add(tp2); // list.add(tp3); // list.add(tp4); // consumer.assign(list); Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<TopicPartition, OffsetAndMetadata>(); //指定分割槽 TopicPartition tp = new TopicPartition("t4", 0); //指定偏移量 OffsetAndMetadata metadata = new OffsetAndMetadata(3); offset.put(tp,metadata); //修改偏移量 consumer.commitSync(offset); //訂閱主題 //consumer.subscribe(Arrays.asList("t4")); consumer.assign(Arrays.asList(tp)); //指定分割槽 // TopicPartition tp = new TopicPartition("t4", 0); // consumer.assign(Arrays.asList(tp)); // //修改搜尋指標 // consumer.seek(tp,10); while (true) { ConsumerRecords<String, String> records = consumer.poll(5000); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", 
record.offset(), record.key(), record.value()); consumer.commitSync(); } } }