Integrating Spark with Kafka for Real-Time Stream Computing: A Java Example
package com.test;

import java.util.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

public class Test5 {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("streamingTest");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("ERROR");
        // A checkpoint directory is required because updateStateByKey is used below
        sc.setCheckpointDir("./checkpoint");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(10));

        // Kafka consumer parameters -- all of these are required; omitting them causes errors
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "192.168.174.200:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "newgroup2");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("test");

        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        // Note: each element of the stream is a ConsumerRecord object; the message text is in value()
        JavaPairDStream<String, Integer> counts = stream
                .flatMap(x -> Arrays.asList(x.value().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);
        // counts.print();

        JavaPairDStream<String, Integer> result = counts
                .updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                        /**
                         * values: the counts for this key in the current batch, e.g. [1, 1, 1, 1, 1]
                         * state:  the accumulated state of this key from all previous batches
                         */
                        Integer updateValue = 0;
                        if (state.isPresent()) {
                            updateValue = state.get();
                        }
                        for (Integer value : values) {
                            updateValue += value;
                        }
                        return Optional.of(updateValue);
                    }
                });

        result.print();

        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}
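To drive the word count, lines of space-separated words need to arrive on the "test" topic. Below is a minimal sketch of a standalone Kafka producer for that purpose; it assumes the same broker address (192.168.174.200:9092) and topic name as the streaming job above, and the class name TestProducer and the sample messages are only illustrative. It also assumes the kafka-clients dependency (already required by spark-streaming-kafka-0-10) is on the classpath.

package com.test;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {

    public static void main(String[] args) {
        // Producer configuration; the broker address matches the streaming job above (assumption)
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.174.200:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each message is one line of space-separated words for the word count
            producer.send(new ProducerRecord<>("test", "hello spark hello kafka"));
            producer.send(new ProducerRecord<>("test", "hello streaming"));
            producer.flush();
        }
    }
}

With the streaming job running, every 10-second batch reads the new messages, counts the words, and result.print() shows the cumulative per-word totals maintained by updateStateByKey.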