kafka手動提交,丟失資料
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// props.put("enable.auto.commit", "true");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("max.poll.records", "10");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
logger.info("topic訂閱成功"); while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); logger.info("拉取條數:"+ records.count()); try { for (ConsumerRecord<String, String> record : records){ logger.info("offset = "+record.offset()+", key = "+record.key()+", value ="+record.value());
// consumer.commitSync();在這提交的化會有丟失資料的風險,例如只消費了一條,然後就掛機了,機器在上線的話後面的也消費不到
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//正確提交,為拉取的一批資料處理完後,一次性提交
consumer.commitSync();
}
}