
Spark Streaming: reading data from Kafka through ZooKeeper

The required Maven dependencies are as follows:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
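The spark-streaming-kafka-0-8 artifact provides the receiver-based KafkaUtils.createStream API, which wraps Kafka's 0.8 high-level consumer: the application connects to the ZooKeeper quorum rather than to the brokers directly, and the consumed offsets are tracked in ZooKeeper under the given consumer group. That is why the program below takes ZooKeeper addresses instead of a broker list. The complete consumer: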

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    import scala.Tuple2;

    @SuppressWarnings("serial")
    public class JavaKafkaWordCount {

        private JavaKafkaWordCount() {
        }

        public static void main(String[] args) throws Exception {
            // Hard-coded arguments for local testing:
            // ZooKeeper quorum, consumer group, topic list, threads per topic
            args = new String[4];
            args[0] = "192.168.80.4:2181,192.168.80.5:2181,192.168.80.8:2181";
            args[1] = "group1";
            args[2] = "testTopic";
            args[3] = "3";

            if (args.length < 4) {
                System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
                System.exit(1);
            }

            SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[2]");
            // Create the context with a 2-second batch interval
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

            // Map each topic to the number of receiver threads that consume it
            int numThreads = Integer.parseInt(args[3]);
            Map<String, Integer> topicMap = new HashMap<>();
            String[] topics = args[2].split(",");
            for (String topic : topics) {
                topicMap.put(topic, numThreads);
            }

            // Receiver-based stream: connects to ZooKeeper, not to the brokers directly
            JavaPairReceiverInputDStream<String, String> messages =
                    KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

            // Each record is a (key, value) pair; keep only the message value
            JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
                @Override
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });

            // Print every received line on the driver
            lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> t) throws Exception {
                    t.collect().forEach(new Consumer<String>() {
                        @Override
                        public void accept(String t) {
                            System.out.println(t);
                        }
                    });
                }
            });

            jssc.start();
            jssc.awaitTermination();
        }
    }
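Despite its name, JavaKafkaWordCount above only prints the raw messages; collect() also pulls every batch back to the driver, which is fine for a smoke test but not for real workloads. To turn it into an actual word count, the standard DStream operations can be chained onto lines before jssc.start() is called. A minimal sketch in the same anonymous-class style (extra imports needed: java.util.Arrays, java.util.Iterator, org.apache.spark.api.java.function.FlatMapFunction, PairFunction, Function2 and org.apache.spark.streaming.api.java.JavaPairDStream):

    // Split each line into words
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String line) {
            // Spark 2.0 changed FlatMapFunction to return an Iterator
            return Arrays.asList(line.split(" ")).iterator();
        }
    });

    // Count each word within the current 2-second batch
    JavaPairDStream<String, Integer> wordCounts = words
            .mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String word) {
                    return new Tuple2<>(word, 1);
                }
            })
            .reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer a, Integer b) {
                    return a + b;
                }
            });

    // Print the first few counts of each batch on the driver
    wordCounts.print();

The three-argument createStream used above hides the Kafka consumer configuration. If you need to pass high-level consumer properties, for example auto.offset.reset, the 0-8 connector also offers an overload that takes an explicit kafkaParams map. A sketch replacing the createStream call in the listing, assuming the same ZooKeeper quorum, group and topicMap (extra imports: kafka.serializer.StringDecoder and org.apache.spark.storage.StorageLevel):

    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", "192.168.80.4:2181,192.168.80.5:2181,192.168.80.8:2181");
    kafkaParams.put("group.id", "group1");
    // Start from the earliest offset still held by Kafka (0.8 uses "smallest", not "earliest")
    kafkaParams.put("auto.offset.reset", "smallest");

    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
            jssc,
            String.class, String.class,               // key and value types
            StringDecoder.class, StringDecoder.class, // key and value decoders
            kafkaParams,
            topicMap,
            StorageLevel.MEMORY_AND_DISK_SER_2());    // storage level for received blocks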