1. 程式人生 > >Kafka的生產者消費者Java操作示例

Kafka的生產者消費者Java操作示例

本文提供Java對Kafka生產者、消費者操作的簡單示例:

1.首先看下pom依賴:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.0.0</version>
</dependency>
<dependency>
   <groupId>log4j</groupId>
   <artifactId>log4j</artifactId>
   <version>1.2.17</version>
</dependency>
<dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-nop</artifactId>
   <version>1.7.22</version>
</dependency>

2.新建Producer類,其程式碼如下:

public class Producer {

    public static void main(String[] args){

        int events = 100;
        Properties props = new Properties();
        //叢集地址,多個伺服器用","分隔
        props.put("bootstrap.servers", "127.0.0.1:9092");
        //key、value的序列化,此處以字串為例,使用kafka已有的序列化類
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //props.put("partitioner.class", "com.kafka.demo.Partitioner");//分割槽操作,此處未寫
        props.put("request.required.acks", "1");
        //建立生產者
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < events; i++){
            long runtime = new Date().getTime();
            String ip = "192.168.1." + i;
            String msg = runtime + "時間的模擬ip:" + ip;
            //寫入名為"test-partition-1"的topic
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test-partition-1", "key-"+i, msg);
            producer.send(producerRecord);
            System.out.println("寫入test-partition-1:" + msg);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }
}

3.新建Consumer類,其程式碼如下:

public class Consumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        //叢集地址,多個地址用","分隔
        props.put("bootstrap.servers","127.0.0.1:9092");
        //設定消費者的group id
        props.put("group.id", "group1");
        //如果為真,consumer所消費訊息的offset將會自動的同步到zookeeper。如果消費者死掉時,由新的consumer使用繼續接替
        props.put("enable.auto.commit", "true");
        //consumer向zookeeper提交offset的頻率
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        //反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //建立消費者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 訂閱topic,可以為多個用,隔開,此處訂閱了"test-partition-1", "test"這兩個主題
        consumer.subscribe(Arrays.asList("test-partition-1", "test"));
        //持續監聽
        while(true){
            //poll頻率
            ConsumerRecords<String,String> consumerRecords = consumer.poll(100);
                 for(ConsumerRecord<String,String> consumerRecord : consumerRecords){
                     System.out.println("在test-partition-1中讀到:" + consumerRecord.value());
            }
        }
    }
}

4.測試,不要忘記啟動kafka後再進行測試,最好先啟動消費者再啟動生產者,這樣效果比較好,分別看下生產者和消費者測試結果:

            

測試成功