1. 程式人生 > 其它 >初識kafka 之 消費者API

初識kafka 之 消費者API

需求

建立一個消費者,消費Mytopic主題指定分割槽3中的資料。

實現程式碼

package com.lzh.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; // kafka消費者 API public class CustomConsumer { public static void main(String[] args) { // 0 配置 Properties properties = new Properties();
// 連線到伺服器 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092,bigdata02:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.
class.getName()); // 新增groupid,必須 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); // 1 建立一個消費者物件 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); // 2 訂閱主題 // ArrayList<String> topics = new ArrayList<String>(); // topics.add("Mytopic"); // kafkaConsumer.subscribe(topics); // 註冊要消費的主題(可以消費多個主題) // 訂閱指定分割槽 3 ArrayList<TopicPartition> topicPartitions = new ArrayList<>(); topicPartitions.add(new TopicPartition("Mytopic",3)); kafkaConsumer.assign(topicPartitions); // 消費某個主題的某個分割槽資料 // 3 消費資料 // 一直獲取消費資料 while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }

結果