Kafka新版消費者API示例(一)
阿新 • • 發佈:2018-12-14
Kafka的高階消費者(high-level consumer)和低階消費者(low-level consumer,底層用SimpleConsumer實現)是舊版本的consumer中的。
新版本的consumer中沒有這兩個概念。新版本把高階消費者和低階消費者整合到一起了,對應KafkaConsumer類的subscribe和assign方法。
建立消費者:
String kafkas = "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094"; Properties props = new Properties(); //kafka連線資訊 props.put("bootstrap.servers",kafkas); //消費者組id props.put("group.id", "test_group"); //是否自動提交offset props.put("enable.auto.commit", "true"); //在沒有offset的情況下采取的拉取策略 props.put("auto.offset.reset", "none"); //自動提交時間間隔 props.put("auto.commit.interval.ms", "1000"); //key反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //value反序列化 props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
訂閱主題:
一個消費者可以同時訂閱多個主題,可以以集合的形式指定多個主題,也可以以正則表示式形式訂閱特定模式主題。
三種訂閱方式:
public void subscribe(Collection<String> topics) { this.subscribe((Collection)topics, new NoOpConsumerRebalanceListener()); } public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { this.acquire(); try { if(topics.isEmpty()) { this.unsubscribe(); } else { log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); this.subscriptions.subscribe(topics, listener); this.metadata.setTopics(this.subscriptions.groupSubscription()); } } finally { this.release(); } } public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { this.acquire(); try { log.debug("Subscribed to pattern: {}", pattern); this.subscriptions.subscribe(pattern, listener); this.metadata.needMetadataForAllTopics(true); this.metadata.requestUpdate(); } finally { this.release(); } }
同時訂閱主題時還可以註冊一個回撥監聽器(ConsumerRebalanceListener介面),用於當消費者發生平衡時回撥處理。
// 2、建立KafkaConsumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 3、訂閱資料,給定監聽器 consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() { //消費者平衡操作開始之前、消費者停止拉取訊息之後被呼叫(可以提交偏移量以避免資料重複消費) @Override public void onPartitionsRevoked(Collection<TopicPartition> collection) { //提交偏移量 consumer.commitSync(); } //消費者平衡操作之後、消費者開始拉取訊息之前被呼叫(可以在該方法中保證各消費者回滾到正確的偏移量,重置各消費者偏移量) @Override public void onPartitionsAssigned(Collection<TopicPartition> collection) { long committedOffset = -1; for(TopicPartition topicPartition : collection){ //獲取該分割槽已消費的偏移量 committedOffset = consumer.committed(topicPartition).offset(); //重置偏移量到上一次提交的偏移量下一個位置開始消費 consumer.seek(topicPartition,committedOffset + 1); } } });
訂閱指定分割槽:
通過subscribe()方法訂閱主題具有消費者自動均衡的功能。在多執行緒情況下,多個消費者程序根據分割槽分配策略自動分配消費者執行緒與分割槽的關係,當一個消費者組的消費者發生增減變化,分割槽分配會自動調整,以實現消費負載均衡及故障自動轉移。指定分割槽assign()方法訂閱主題不具有自動均衡的功能。
consumer.assign(Arrays.asList(new TopicPartition(topic, 0),new TopicPartition(topic, 1)));
偏移量管理:
Kafka提供兩種查詢偏移量的方法:
1.committed():返回OffsetAndMetadata物件,通過該物件獲取指定分割槽已提交的偏移量。
TopicPartition topicPartition = new TopicPartition(topic, 0);
OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
long offset = offsetAndMetadata.offset();
2.position():返回下一次拉取的位置
long position = consumer.position(topicPartition);
Kafka提供重置消費者偏移量的方法:
1.seek():將消費者起始位置重置到指定偏移量位置
//指定offset消費.
consumer.seek(topicPartition,103);
2.seekToBeginning():指定從訊息起始位置開始消費,對應(auto.offset.reset=earlist)
//指定從最先消費
consumer.seekToBeginning(Collections.singleton(topicPartition));
3.seekToEnd():指定從最新訊息對應的位置開始消費,有新訊息才消費。對應(auto.offset.reset=latest)
//指定從最新消費(捨棄)
consumer.seekToEnd(Collections.singleton(topicPartition));
Kafka消費者消費位移兩種策略:
1.自動提交。
try{
// 4、獲取資料
while (true) {
//長輪詢拉取訊息
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %d,offset = %d, key = %s, value = %s%n",
record.topic(), record.partition(),record.offset(), record.key(), record.value());
}
Thread.sleep(5000);
}
}catch (Exception e){
e.printStackTrace();
}
2.手動提交。
接下篇Kafka新版消費者API示例(二) https://blog.csdn.net/Simon_09010817/article/details/83750115