Kafka程式碼實現--from-beginning,讀取歷史未消費的資料
阿新 • • 發佈:2019-01-08
Kafka實際環境有可能會出現Consumer全部宕機,雖然基於Kafka的高可用特性,消費者群組中的消費者可以實現再均衡,所有Consumer不處理資料的情況很少,但是還是有可能會出現,此時就要求Consumer重啟的時候能夠讀取在宕機期間Producer傳送的資料。基於消費者訂閱模式預設是無法實現的,因為只能訂閱最新發送的資料。
通過消費者命令列可以實現,只要在命令列中加上--from-beginning
即可(具體可見文章 Kafka安裝與配置 ),但是通過Java客戶端程式碼如何實現呢?這就要用到訊息偏移量的重定位方法 seek()
或者直接使用 seekToBeginning()
程式碼示例如下:
public class Consumer {
private static final String server = "192.168.3.22:9092";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", server);
properties.put("group.id" , "kafka.group.user");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties, new StringDeserializer(), new StringDeserializer());
consumer.subscribe(Collections.singletonList("kafka.topic.user"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
Map<TopicPartition,Long> beginningOffset = consumer.beginningOffsets(collection);
//讀取歷史資料 --from-beginning
for(Map.Entry<TopicPartition,Long> entry : beginningOffset.entrySet()){
// 基於seek方法
//TopicPartition tp = entry.getKey();
//long offset = entry.getValue();
//consumer.seek(tp,offset);
// 基於seekToBeginning方法
consumer.seekToBeginning(collection);
}
}
});
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("partition:" + record.partition() + ",key:" + record.key() + ",value:" + record.value());
consumer.commitAsync();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
}
}