1. 程式人生 > 其它 >Kafka使用詳解-Consumer API(自動提交offset)

Kafka使用詳解-Consumer API(自動提交offset)

技術標籤:kafka消費者kafkajava

自動提交offset

每次按照offset設定的時間提交offset

自動提交offset的相關引數:

kafka自動提交是為了方便我們更加關注業務。

​ **enable.auto.commit:**是否開啟自動提交offset功能。

auto.commit.interval.ms:自動提交offset的時間間

隔。

以下為自動提交的offset程式碼:

offset設定成latest,offset由非同步的方式提交offset,每次從最新的方式offset消費

package com.ln.kafka.custom;

import
org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Arrays; import java.util.Collection; import java.util.Properties; /** * @ProjectName: kafka * @Package: com.ln.kafka.custom * @Name:AutoSubmitOffset * @Author:linianest * @CreateTime:2021/1/8 9:31 * @version:1.0 * @Description TODO: 消費者自動提交offset,預設是從最新的offset消費 */
public class AutoSubmitOffset { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName
()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, "1234"); // todo 自動提交offset,預設值為true,設定成false,就得手動提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // todo 預設自動提交offset的時間是5秒,如果想加快,需要修改引數 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); // 建立物件 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 訂閱的消費者主題 consumer.subscribe(Arrays.asList("first")); while (true) { // 拉取資料 ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.printf("topic=%d,key=%s,value=%s%n", record.topic(), record.offset(), record.value()); } } } }