Spark Streaming 通過 ZooKeeper 讀取 Kafka 上的資料
阿新 • • 發佈:2019-02-13
Maven 依賴如下:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.0.2</version>
</dependency>
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache .spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
/**
 * Spark Streaming demo that consumes Kafka topics through the receiver-based
 * {@code KafkaUtils.createStream} API (coordinated via ZooKeeper) and prints the
 * value of every received message to standard output.
 *
 * <p>Usage: {@code JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>},
 * where {@code <topics>} is a comma-separated list. When the program is started
 * without any arguments, the built-in demo settings in {@link #DEFAULT_ARGS}
 * are used instead.
 */
public final class JavaKafkaWordCount {

    /**
     * Demo settings used when no command-line arguments are supplied, in the
     * same order as the command line: zkQuorum, group, topics, numThreads.
     */
    private static final String[] DEFAULT_ARGS = {
        "192.168.80.4:2181,192.168.80.5:2181,192.168.80.8:2181",
        "group1",
        "testTopic",
        "3"
    };

    private JavaKafkaWordCount() {
    }

    /**
     * Starts the streaming job and blocks until it terminates.
     *
     * @param args either empty (use {@link #DEFAULT_ARGS}) or
     *             {@code <zkQuorum> <group> <topics> <numThreads>}
     * @throws Exception if the streaming context fails while running, or if
     *                   {@code <numThreads>} is not a valid integer
     */
    public static void main(String[] args) throws Exception {
        // Fall back to the demo settings only when nothing was passed, so that
        // real command-line arguments are honoured and the usage check below
        // can actually reject an incomplete argument list.
        if (args.length == 0) {
            args = DEFAULT_ARGS.clone();
        }
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        String zkQuorum = args[0];
        String group = args[1];
        String[] topics = args[2].split(",");
        int numThreads = Integer.parseInt(args[3]);

        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[2]");
        // Create the context with 2 seconds batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        // Every topic is registered with the same thread count.
        Map<String, Integer> topicMap = new HashMap<>();
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);

        // Keep only the second element of each (key, value) pair.
        JavaDStream<String> lines = messages.map(Tuple2::_2);

        // Collect each batch and print it line by line.
        lines.foreachRDD(rdd -> rdd.collect().forEach(System.out::println));

        jssc.start();
        jssc.awaitTermination();
    }
}