kafka producer java API的實現
阿新 • • 發佈:2019-01-23
1.開發環境的構建
使用IDEA+MAVEN構建開發環境,可以採用Maven中scala-archetype-simple模板。
2.pom.xml的配置
這裡scala版本為2.11.8,kafka版本為0.9.0.0
3.Kafka配置類KafkaProperties<properties> <scala.version>2.11.8</scala.version> <kafka.version>0.9.0.0</kafka.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> </dependency> </dependencies>
public static final String ZK = "192.168.254.128:2181";
public static final String TOPIC = "hello_topic";
public static final String BROKER_LIST = "192.168.254.128:9092";
4.生產者類
5.測試類import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; /** * Kafka生產者 */ public class KafkaProducer extends Thread{ private String topic; private Producer<Integer,String> producer; public KafkaProducer(String topic){ this.topic = topic; Properties properties = new Properties(); properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST); properties.put("serializer.class","kafka.serializer.StringEncoder"); properties.put("request.required.acks","1"); producer = new Producer<Integer, String>(new ProducerConfig(properties)); } @Override public void run() { int messageNo = 1; while (true){ String message = "message_"+ messageNo; producer.send(new KeyedMessage<Integer, String>(topic,message)); System.out.println("Send:" + message); messageNo++; try { Thread.sleep(2000); }catch (Exception e){ e.printStackTrace(); } } } }
public class KafkaClientsApp {
public static void main(String[] args) {
new KafkaProducer(KafkaProperties.TOPIC).start();
}
}
6.測試結果
測試之前要開啟zookeeper,kafka,和kafka consumer
本地生產的資料被伺服器上的消費者消費