程式人生 > kafka_2.11-1.1.0 生產者與消費者

kafka_2.11-1.1.0生產者消費者

1. 使用 IntelliJ IDEA 新建 Maven 專案,並在 pom.xml 中加入以下配置
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the Kafka producer/consumer example project.
  Declares the compiler level, the Kafka 1.1.0 dependencies and a logging
  backend. The project coordinates are declared at the bottom of the file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <build>
        <plugins>
            <!-- Compile sources at Java 7 language level and emit Java 7 bytecode. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>7</source>
                    <target>7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>

    <!-- Kafka core artifact (kafka_2.11, version 1.1.0). -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>1.1.0</version>

        <!--
          Transitive dependencies that are kept off the classpath.
          Note that slf4j-api and slf4j-log4j12 are excluded here; the
          logging binding is supplied by the slf4j-simple dependency below.
        -->
        <exclusions>
            <exclusion>
                <artifactId>jmxtools</artifactId>
                <groupId>com.sun.jdmk</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jmxri</artifactId>
                <groupId>com.sun.jmx</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jms</artifactId>
                <groupId>javax.jms</groupId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Kafka Java client library: provides the KafkaProducer / KafkaConsumer classes used by the examples. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
    </dependency>

    <!-- SLF4J "simple" binding (version 1.7.25). -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
    <!-- log4j 1.2.17 is also declared as a direct dependency. -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>


    <!-- Project coordinates (groupId / artifactId / version) of this example. -->
    <groupId>cn.itcast</groupId>
    <artifactId>KafakaApi</artifactId>
    <version>1.0-SNAPSHOT</version>


</project>

2.生產者:

package kafaka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Minimal Kafka producer example.
 *
 * <p>Sends 100 string records to the topic {@code "test"}; record {@code i}
 * uses the decimal string of {@code i} as both key and value.
 */
public class SimpleKafkaProducer {

    /**
     * Builds the producer configuration, publishes the records and closes
     * the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker addresses used for the initial connection to the cluster.
        props.put("bootstrap.servers", "mini1:9092,mini2:9092,mini3:9092");
        // Require acknowledgement from all in-sync replicas before a send
        // is considered successful.
        props.put("acks", "all");
        // Number of automatic resend attempts after a failed send;
        // "0" means a failed record is NOT retried.
        props.put("retries", "0");
        // Total bytes of memory the producer may use to buffer records
        // waiting to be sent (33554432 bytes = 32 MiB).
        props.put("buffer.memory", 33554432);
        // Serializer class for record keys.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer class for record values.
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources guarantees close() runs even if send() throws,
        // so buffered records are flushed and network resources released.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                String payload = Integer.toString(i);
                producer.send(new ProducerRecord<String, String>("test", payload, payload));
            }
        }
    }
}

3.消費者

package kafaka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * Minimal Kafka consumer example.
 *
 * <p>Subscribes to the topic {@code "test"} as a member of consumer group
 * {@code "test"} and prints the offset, key and value of every record it
 * receives. The poll loop runs until the process is terminated or an
 * exception is thrown.
 */
public class SimpleKafkaConsumer {

    /**
     * Builds the consumer configuration, subscribes to the topic and polls
     * for records forever.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        Properties props = new Properties();

        // Broker addresses used for the initial connection to the cluster.
        props.put("bootstrap.servers", "mini1:9092,mini2:9092,mini3:9092");

        // Consumer group this consumer belongs to.
        props.put("group.id", "test");

        // Commit consumed offsets automatically in the background.
        props.put("enable.auto.commit", "true");

        // Interval, in milliseconds, between automatic offset commits.
        props.put("auto.commit.interval.ms", "1000");

        // If the group coordinator receives no heartbeat from this consumer
        // within this many milliseconds, the consumer is considered dead and
        // its partitions are reassigned.
        props.put("session.timeout.ms", "30000");

        // Deserializer classes for record keys and values.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer if poll() or printing
        // throws, so sockets are released and the group can rebalance promptly.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {

            consumer.subscribe(Collections.singletonList("test"));

            System.out.println("Subscribed to topic " + "test");

            while (true) {
                // Wait up to 100 ms for new records before returning.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // Print the offset, key and value of each consumed record.
                    System.out.printf("offset = %d, key = %s, value = %s\n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}