1. 程式人生 > 實用技巧 >消費訊息+不自動提交

消費訊息+不自動提交

依賴

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

程式碼

package com.perfect.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; import java.util.Properties; public class KafkaComsumerTest { @Test public void cunsumertest(){ Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092"); //關閉自動提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); //props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); //latest,earliest props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.GROUP_ID_CONFIG,"bigdata1"); KafkaConsumer<String,String> c = new KafkaConsumer<String, String>(props); c.subscribe(Collections.singletonList("test2")); while(true){ ConsumerRecords records = c.poll(100); records.forEach(System.out::println); } // c.close(); } }