1. 程式人生 > >kafka producer java API的實現

kafka producer java API的實現

1.開發環境的構建

使用IDEA+MAVEN構建開發環境,可以採用Maven中scala-archetype-simple模板。

2.pom.xml的配置

這裡scala版本為2.11.8,kafka版本為0.9.0.0

<properties>
    <scala.version>2.11.8</scala.version>
    <kafka.version>0.9.0.0</kafka.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>

  </dependencies>
3.Kafka配置類KafkaProperties
public static final String ZK = "192.168.254.128:2181";

public static final String TOPIC = "hello_topic";

public static final String BROKER_LIST = "192.168.254.128:9092";
4.生產者類
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Kafka生產者
 */
public class KafkaProducer extends Thread{
    private String topic;

    private Producer<Integer,String> producer;

    public KafkaProducer(String topic){
        this.topic = topic;

        Properties properties = new Properties();

        properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);
        properties.put("serializer.class","kafka.serializer.StringEncoder");
        properties.put("request.required.acks","1");

        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true){
            String message = "message_"+ messageNo;
            producer.send(new KeyedMessage<Integer, String>(topic,message));
            System.out.println("Send:" + message);

            messageNo++;

            try {
                Thread.sleep(2000);
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}
5.測試類
public class KafkaClientsApp {
    public static void main(String[] args) {
        new KafkaProducer(KafkaProperties.TOPIC).start();
    }
}
6.測試結果

測試之前要開啟zookeeper,kafka,和kafka consumer

本地生產的資料被伺服器上的消費者消費