Kafka消費者Java API
阿新 • • 發佈:2018-12-11
廢話不多說,直接上程式碼
consumer.java
package cn.ysjh; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class Consumer { public static void main(String[] args) { Properties props = new Properties(); // 定義kakfa 服務的地址,不需要將所有broker指定上 props.put("bootstrap.servers", "172.17.0.3:9092"); // 制定consumer group props.put("group.id", "test"); // 是否自動確認offset props.put("enable.auto.commit", "true"); // 自動確認offset的時間間隔 props.put("auto.commit.interval.ms", "1000"); // key的序列化類 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的序列化類 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 定義consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 消費者訂閱的topic, 可同時訂閱多個 consumer.subscribe(Arrays.asList("test")); while (true) { // 讀取資料,讀取超時時間為100ms ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
然後在叢集開啟一個生產者,進行訊息的生產,然後會在eclipse的控制檯看到消費資料的情況