1. 程式人生 > 其它 >Kafka消費者 API(2)

Kafka消費者 API(2)

1.需求:建立一個獨立消費者,消費 first 主題 0 號分割槽的資料。

2.在上篇隨筆建立的 com.kafka.consumer 包下新建 CustomConsumerPartition 類

package com.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; public class CustomConsumerPartition { public static void main(String[] args) { //配置 Properties properties = new
Properties(); //連線Kafka叢集 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.
class.getName()); //GroupId(必須配置) properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); //建立消費者物件 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); //topic分割槽陣列 ArrayList<TopicPartition> topicPartitions = new ArrayList<>(); topicPartitions.add(new TopicPartition("first",0)); //訂閱topic對應的分割槽 kafkaConsumer.assign(topicPartitions); //處理資料 while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }

3.在IDEA執行程式碼

4.執行非同步傳送隨筆中的 CustomProducerCallback 類(修改部分為 ProducerRecord 的第二、三個引數:指定分割槽為 0,key 為空字串 "")

// Send the data
        for (int i = 0; i < 5; i++) {
            // Target partition 0 of "first" explicitly; the key is the empty string
            kafkaProducer.send(new ProducerRecord<>("first", 0,"","Kafka" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        // No exception: report where the record was written
                        System.out.println("主題:" + metadata.topic() + " 分割槽:" + metadata.partition());
                    } else {
                        // Report the failure instead of dropping it silently
                        System.err.println("傳送失敗:" + exception);
                    }
                }
            });
        }

生產者傳送成功

檢視消費者

5.修改分割槽為1,檢視結果

for (int i = 0; i < 5; i++) {
            // Target partition 1 of "first" explicitly; the key is the empty string
            kafkaProducer.send(new ProducerRecord<>("first", 1,"","Kafka" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        // No exception: report where the record was written
                        System.out.println("主題:" + metadata.topic() + " 分割槽:" + metadata.partition());
                    } else {
                        // Report the failure instead of dropping it silently
                        System.err.println("傳送失敗:" + exception);
                    }
                }
            });
        }

消費者沒有收到任何訊息:它透過 assign() 只消費 first 主題的 0 號分割槽,而這批資料被寫入了 1 號分割槽。