
Kafka Consumer API (1)

1. Requirement: create a standalone consumer that consumes data from the first topic.

2. In IDEA, create the package com.kafka.consumer.

3. Create a new class named CustomConsumer.

package com.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        // Configuration
        Properties properties = new Properties();

        // Connect to the Kafka cluster
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        // Deserializers for record keys and values
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Consumer group ID
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // Create the consumer object
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // List of topics to consume
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");

        // Subscribe to the topics
        kafkaConsumer.subscribe(topics);

        // Poll for records and print each one
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
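
The ConsumerConfig constants used in the class above are ordinary string keys. As a rough illustration, the sketch below builds the same four settings with the literal key names, using only java.util.Properties so it runs without the Kafka client on the classpath. The class name ConsumerProps and the method build are hypothetical names introduced here, not part of the original program.

```java
import java.util.Properties;

// Hypothetical helper (not part of the original post): builds the same
// consumer settings as CustomConsumer, but with literal config keys.
class ConsumerProps {

    static Properties build(String bootstrapServers, String groupId) {
        // Fully qualified name that StringDeserializer.class.getName() returns
        String stringDeserializer =
                "org.apache.kafka.common.serialization.StringDeserializer";

        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);    // BOOTSTRAP_SERVERS_CONFIG
        properties.put("key.deserializer", stringDeserializer);   // KEY_DESERIALIZER_CLASS_CONFIG
        properties.put("value.deserializer", stringDeserializer); // VALUE_DESERIALIZER_CLASS_CONFIG
        properties.put("group.id", groupId);                      // GROUP_ID_CONFIG
        return properties;
    }

    public static void main(String[] args) {
        // Same host and group ID as in the example above
        Properties properties = build("hadoop102:9092", "test");
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Using the constants, as the original class does, is usually the safer choice because a misspelled key then fails at compile time; the literal keys are mainly useful when the same settings live in a .properties file.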

4. Run the consumer program in IDEA.

5. On the Kafka cluster console, start a Kafka producer and type in some data.

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

IDEA receives the message at the same time:

ConsumerRecord(topic = first, partition = 1, leaderEpoch = 9, offset = 23, CreateTime = 1653112344357, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)
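
The CreateTime field in the printed record is a timestamp in milliseconds since the Unix epoch. A minimal sketch of turning it into a readable UTC time with java.time.Instant follows; the class name CreateTimeDemo and the method toInstant are hypothetical names used only for this illustration.

```java
import java.time.Instant;

// Hypothetical demo class (not part of the original post).
class CreateTimeDemo {

    // Converts a record timestamp in epoch milliseconds to an Instant (UTC).
    static Instant toInstant(long createTimeMillis) {
        return Instant.ofEpochMilli(createTimeMillis);
    }

    public static void main(String[] args) {
        // CreateTime value copied from the record printed above
        System.out.println(toInstant(1653112344357L)); // prints 2022-05-21T05:52:24.357Z
    }
}
```

Inside the poll loop, the same value is available from consumerRecord.timestamp(), so the conversion can be applied per record instead of reading it off the printed string.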