Kafka Java API使用Demo
阿新 • • 發佈:2019-02-01
首先匯入相關pom檔案依賴,這裡使用的kafak0.8.1,scala是2.10.4版本,注意匯入正確的版本,與你的kafka叢集版本相匹配。
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion >
<groupId>cn.just.shinelon</groupId>
<artifactId>SparkSql_Proj</artifactId>
<version>1.0-SNAPSHOT</version>
<inceptionYear>2008</inceptionYear>
<repositories>
<repository>
<id>scala-tools.org</id>
<name >Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name >
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
<!-- kafak依賴-->
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>2.10.4</scalaVersion>
<args>
<arg>-target:jvm-1.5</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>2.10.4</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>
將需要使用的配置屬性定一個properties檔案。
KafkaProperties.java:
package cn.just.spark.kafka.producer;
/**
* 配置屬性常量
*/
public class KafkaProperties {
public static final String ZK="hadoop-senior.shinelon.com:2181"; //Zookeeper地址
public static final String TOPIC="topic01"; //topic名稱
public static final String BROKER_LIST="hadoop-senior.shinelon.com:9092"; //Broker列表
public static final String GROUP_ID="test_group01"; //消費者使用
}
KafkaProducer.java:
package cn.just.spark.kafka.producer;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import scala.collection.Seq;
import java.util.Properties;
/**
* Kafka java API:Producer
*/
public class KafkaProducer extends Thread{
public String topic;
public Producer<Integer,String> producer;
public KafkaProducer(String topic){
this.topic=topic;
Properties properties=new Properties();
properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);
properties.put("serializer.class","kafka.serializer.StringEncoder");
//設定生產者與消費者的生產握手機制:0代表不需要Broker回覆訊息
//1表示等到Broker回覆訊息之後繼續生產
//-1表示需要所有的Broker都回復訊息之後才繼續,這種更嚴格,資料不會丟失,永續性更好
properties.put("request.required.acks","1");
ProducerConfig config=new ProducerConfig(properties);
producer=new Producer<Integer, String>(config);
}
@Override
public void run() {
int messageId=1;
while(true){
String message="kafkaProducer"+messageId;
producer.send(new KeyedMessage<Integer, String>(topic,message));
System.out.println(message);
messageId++;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
KafkaConsumer.java:
package cn.just.spark.kafka.consumer;
import cn.just.spark.kafka.producer.KafkaProperties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumer extends Thread{
public String topic;
public KafkaConsumer(String topic){
this.topic=topic;
}
public ConsumerConnector getConnection(){
Properties properties=new Properties();
properties.put("group.id", KafkaProperties.GROUP_ID);
properties.put("zookeeper.connect",KafkaProperties.ZK);
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
@Override
public void run() {
ConsumerConnector consumer=getConnection();
Map<String, Integer> topicMap=new HashMap<String, Integer>();
topicMap.put(topic,1); //從一個KafkaStream消費資料
//String:topic
//List<KafkaStream<byte[], byte[]>>對應的資料流
Map<String, List<KafkaStream<byte[], byte[]>>> messageStream= consumer.createMessageStreams(topicMap);
//獲取我們每次接收到的資料
KafkaStream<byte[], byte[]> stream=messageStream.get(topic).get(0); //get(0)對應於上面的一個KafkaStream
ConsumerIterator<byte[], byte[]> it=stream.iterator();
while(it.hasNext()){
String message=new String(it.next().message());
System.out.println("resever message: "+message);
}
}
}
生產者和消費者編寫完成後編寫主類進行測試:
KafkaProducerApp.java:
package cn.just.spark.kafka.producer;
import cn.just.spark.kafka.consumer.KafkaConsumer;
public class KafkaProducerApp {
//快捷鍵psvm
public static void main(String[] args) {
new KafkaProducer(KafkaProperties.TOPIC).start();
new KafkaConsumer(KafkaProperties.TOPIC).start();
}
}
測試結果如下,可以看見無論是客戶端消費者還是我們的程式碼編寫程式消費者都能接收到生產者產生的資料。