Kafka Producer and Consumer Examples in Java
By 阿新 • Published: 2019-02-08
This article gives simple examples of working with Kafka producers and consumers from Java:
1. First, the pom dependencies:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.22</version>
</dependency>
2. Create a Producer class:
import java.util.Date;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void main(String[] args) {
        int events = 100;
        Properties props = new Properties();
        // Cluster address; separate multiple servers with ","
        props.put("bootstrap.servers", "127.0.0.1:9092");
        // Key/value serializers; strings here, using Kafka's built-in serializer classes
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //props.put("partitioner.class", "com.kafka.demo.Partitioner"); // custom partitioner, not used here
        // Wait for the leader's acknowledgement; the new client calls this setting
        // "acks" ("request.required.acks" was the old producer's name and is ignored)
        props.put("acks", "1");
        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < events; i++) {
            long runtime = new Date().getTime();
            String ip = "192.168.1." + i;
            String msg = runtime + " simulated ip: " + ip;
            // Write to the topic named "test-partition-1"
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<>("test-partition-1", "key-" + i, msg);
            producer.send(producerRecord);
            System.out.println("Wrote to test-partition-1: " + msg);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }
}
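The StringSerializer configured above is, by default, little more than UTF-8 encoding of the string. As a rough stdlib-only sketch of what the serializer/deserializer pair does on the wire (this is an illustration, not Kafka's actual class):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StringSerdeSketch {
    // Roughly what Kafka's StringSerializer does with its default encoding (UTF-8)
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // Roughly what StringDeserializer does on the consumer side
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String msg = "1549600000000 simulated ip: 192.168.1.0";
        byte[] wire = serialize(msg);          // the bytes the broker stores
        System.out.println(deserialize(wire)); // round-trips unchanged
    }
}
```

Because both sides only see bytes, the producer's serializer and the consumer's deserializer must agree; mismatching them is a common source of garbled messages.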
3. Create a Consumer class:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Cluster address; separate multiple addresses with ","
        props.put("bootstrap.servers", "127.0.0.1:9092");
        // Set the consumer's group id
        props.put("group.id", "group1");
        // If true, the offsets of consumed messages are automatically committed
        // back to Kafka; if this consumer dies, its replacement resumes from there
        props.put("enable.auto.commit", "true");
        // How often the consumer commits offsets
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to one or more topics; here the two topics "test-partition-1" and "test"
        consumer.subscribe(Arrays.asList("test-partition-1", "test"));
        // Keep listening
        while (true) {
            // Wait up to 100 ms for records; poll(long) is deprecated as of 2.0
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("Read from " + consumerRecord.topic() + ": " + consumerRecord.value());
            }
        }
    }
}
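With enable.auto.commit=true, the client periodically records, per partition, the offset of the next record it expects to read. A stdlib-only sketch of that bookkeeping (class and method names are hypothetical, not Kafka's implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTrackerSketch {
    // Hypothetical helper: build a map of partition -> next offset to fetch,
    // mimicking what the consumer commits back to Kafka as it processes records
    static Map<Integer, Long> track(long[][] consumed) {
        Map<Integer, Long> nextOffsets = new HashMap<>();
        for (long[] rec : consumed) {
            int partition = (int) rec[0];
            long offset = rec[1];
            // The committed offset is "last processed + 1": where to resume after a restart
            nextOffsets.put(partition, offset + 1);
        }
        return nextOffsets;
    }

    public static void main(String[] args) {
        // (partition, offset) pairs in the order poll() returned them
        long[][] consumed = {{0, 0}, {0, 1}, {1, 0}, {0, 2}};
        System.out.println(track(consumed)); // partition 0 resumes at 3, partition 1 at 1
    }
}
```

Auto-commit keeps the example short, but note it commits on a timer: records polled since the last commit may be re-delivered after a crash, which is why manual commitSync() is often preferred in production.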
4. Test. Remember to start Kafka before testing, and preferably start the consumer before the producer so no messages are missed. The producer and consumer outputs are shown below:
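If Kafka is not already running, starting it and creating the topic looks roughly like this, assuming a stock Kafka 2.0.x distribution with default config files and ports (adjust paths to your installation):

```shell
# Start ZooKeeper and the Kafka broker with the bundled default configs
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

# Create the topic used by the examples (kafka-topics.sh still addresses
# ZooKeeper in the 2.0 tooling)
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 \
    --replication-factor 1 --partitions 1 --topic test-partition-1
```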
Test successful.