Testing Kafka Producers and Consumers in Java
阿新 · Published 2019-02-06
1. Environment Preparation
See the previous article for how to deploy and install Kafka. Once the installation succeeds, start Kafka.
Add the kafka-clients dependency in Gradle:
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.1.0'
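If the build uses Maven instead of Gradle (not covered in this article), the same client library can be pulled in with the equivalent coordinates:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>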
2. Creating the Producer
The topic used is test.
import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {

    public static void main(String[] args) throws Exception {
        String topicName = "test";

        // Producer configuration
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "192.168.3.45:9092");
        // Wait for acknowledgement from all in-sync replicas
        props.put("acks", "all");
        // Number of automatic retries when a request fails (0 disables retries)
        props.put("retries", 0);
        // Batch size in bytes
        props.put("batch.size", 16384);
        // Wait up to 1 ms so records can be batched, reducing the number of requests
        props.put("linger.ms", 1);
        // Total memory (in bytes) available to the producer for buffering
        props.put("buffer.memory", 33554432);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("Messages sent successfully");
        producer.close();
    }
}
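send() is asynchronous, and the loop above never checks whether the broker actually acknowledged the records. As a minimal sketch (my own variant, not part of the original example), the same loop can pass a callback to send() so that failures and the assigned partition/offset are reported per record:

// Variant of the send loop above with a per-record callback.
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>(topicName, Integer.toString(i), Integer.toString(i));
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // The send failed after the configured retries were exhausted
            exception.printStackTrace();
        } else {
            System.out.printf("sent to partition %d at offset %d%n",
                    metadata.partition(), metadata.offset());
        }
    });
}

The callback runs on the producer's I/O thread, so it should stay lightweight; producer.close() still blocks until any in-flight records have been sent.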
3. Creating the Consumer
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {

    public static void main(String[] args) throws Exception {
        // Kafka consumer configuration settings
        String topicName = "test";
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.3.45:9092");
        props.put("group.id", "test");
        // Commit offsets automatically every second
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the list of topics
        consumer.subscribe(Arrays.asList(topicName));
        System.out.println("Subscribed to topic " + topicName);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Print the offset, key and value of each consumer record
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
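With enable.auto.commit set to true, offsets are committed every second whether or not the records were actually processed. A minimal sketch (my own variant, using the same configuration as above) of the poll loop with auto-commit disabled and an explicit commitSync() after each processed batch:

// Variant of the consumer above with manual offset commits.
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s\n",
                record.offset(), record.key(), record.value());
    }
    // Commit the offsets returned by the last poll() only after processing
    consumer.commitSync();
}

commitSync() blocks until the commit succeeds or throws, so a batch is only marked as consumed once its records have been processed.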
4. Testing
First start the consumer program SimpleConsumer; it will block waiting for messages. Then start the producer program SimpleProducer, and the consumer window will receive the messages:
Subscribed to topic test
offset = 17, key = 0, value = 0
offset = 18, key = 1, value = 1
offset = 19, key = 2, value = 2
offset = 20, key = 3, value = 3
offset = 21, key = 4, value = 4
offset = 22, key = 5, value = 5
offset = 23, key = 6, value = 6
offset = 24, key = 7, value = 7
offset = 25, key = 8, value = 8
offset = 26, key = 9, value = 9