1. 程式人生 > 其它 >在Idea中讀取Kafka消費者資料——指定Topic消費

在Idea中讀取Kafka消費者資料——指定Topic消費

技術標籤:kafka、大資料、java、intellij idea

在Idea中讀取Kafka消費者資料

一、依賴

    <!-- Cloudera repository: needed to resolve the CDH-flavoured Kafka artifacts below. -->
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <properties>
        <!-- Single place to pin the version shared by all Kafka artifacts. -->
        <kafka-version>2.1.0-cdh6.2.0</kafka-version>
    </properties>

    <dependencies>
        <!-- Kafka dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>${kafka-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>${kafka-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka-version}</version>
        </dependency>
        <!-- NOTE(review): RELEASE is a floating meta-version; consider pinning an explicit
             version (and test scope, if JUnit is only used by tests) for reproducible builds. -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

二、程式碼如下

package Demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Minimal Kafka consumer demo: manually assigns one partition of a topic,
 * rewinds it to the beginning, and prints every record it reads.
 *
 * <p>The loop runs until the process is stopped or the consumer throws; in the
 * latter case try-with-resources closes the consumer before the exception
 * propagates.
 */
public class kafkaDemo1 {
    /** Topic to read from. */
    private static final String TOPIC_NAME = "datapartition";

    /** The single partition of {@link #TOPIC_NAME} that is assigned to this consumer. */
    private static final int PARTITION = 0;

    /** Maximum time a single {@code poll} call blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);

    /**
     * Reads partition {@link #PARTITION} of {@link #TOPIC_NAME} from its first
     * offset and prints offset, key and value of each record to standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Build the partition list once; it is needed for both assign() and seekToBeginning().
        List<TopicPartition> partitions = Arrays.asList(new TopicPartition(TOPIC_NAME, PARTITION));

        // try-with-resources guarantees the consumer is closed if the loop exits via an exception.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(buildConsumerProperties())) {
            // Manual assignment: this pins one specific partition rather than subscribing to topics.
            kafkaConsumer.assign(partitions);
            // Always start from the earliest available offset, regardless of committed offsets.
            kafkaConsumer.seekToBeginning(partitions);

            while (true) {
                // poll(Duration) replaces the deprecated poll(long); it blocks for at most POLL_TIMEOUT.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(POLL_TIMEOUT);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("-----------------");
                    // Print offset, key and value of the record.
                    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                    System.out.println();
                }
            }
        }
    }

    /** Builds the consumer configuration used by {@link #main(String[])}. */
    private static Properties buildConsumerProperties() {
        Properties properties = new Properties();
        // Address of the Kafka broker used for the initial connection.
        properties.put("bootstrap.servers", "hdp1:9092");
        // Consumer group id.
        properties.put("group.id", "gmall");
        // Commit offsets automatically ...
        properties.put("enable.auto.commit", "true");
        // ... once per second.
        properties.put("auto.commit.interval.ms", "1000");
        // Keys and values are deserialized as strings.
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }
}

三、部分列印結果

在這裡插入圖片描述