Integrating Kafka with Spark Streaming in Java: word count, plus updateStateByKey for real-time running totals
Add the dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
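The first artifact references a ${spark.version} property. A minimal sketch of that property, assuming the project targets Spark 2.2.0 to match the spark-streaming-kafka-0-8 connector version above:

<properties>
    <!-- assumed value; keep it in line with the Spark version running on the cluster -->
    <spark.version>2.2.0</spark.version>
</properties>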
1. Receiver-based approach (KafkaReceiverSpark): offsets and other metadata are stored in ZooKeeper
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Spark Streaming + Kafka word count in Java (receiver-based API).
 */
public class KafkaReceiverSpark {

    public static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // topic and the number of partitions to consume from it
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("kafka_streaming_topic", 1);
        // ZooKeeper host and port
        String zk = "hadoop000:2181";
        // consumer group id
        String groupId = "test";

        // receiver-based input stream; offsets are tracked in ZooKeeper
        JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(ssc, zk, groupId, map);

        // split each line on spaces
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public Iterator<String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                return Lists.newArrayList(SPACE.split(stringStringTuple2._2)).iterator();
            }
        });

        // word => (word, 1)
        JavaPairDStream<String, Integer> mapToPair = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // count each word within the current batch
        JavaPairDStream<String, Integer> wordCounts = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
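Because the receiver-based consumer commits its offsets to ZooKeeper, they can be inspected there. A sketch, assuming the standard zkCli.sh client and the default /consumers/<group>/offsets/<topic>/<partition> layout used by the old Kafka consumer:

# connect to the ZooKeeper instance used above, then read the offset for
# group "test", topic "kafka_streaming_topic", partition 0
bin/zkCli.sh -server hadoop000:2181
get /consumers/test/offsets/kafka_streaming_topic/0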
2. Direct approach (KafkaDirectSpark): reads from Kafka directly, without going through ZooKeeper
import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.*;
import java.util.regex.Pattern;

/**
 * Spark Streaming + Kafka word count in Java (direct API, no ZooKeeper).
 */
public class KafkaDirectSpark {

    public static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

        // topics to subscribe to
        Set<String> topicSet = new HashSet<String>();
        topicSet.add("kafka_streaming_topic");
        // Kafka parameters (no ZooKeeper involved)
        HashMap<String, String> kafkaParam = new HashMap<String, String>();
        kafkaParam.put("metadata.broker.list", "hadoop000:9092");

        // direct input stream; offsets are tracked by Spark Streaming itself
        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParam, topicSet);

        // split each line on spaces
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public Iterator<String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                return Lists.newArrayList(SPACE.split(stringStringTuple2._2)).iterator();
            }
        });

        // word => (word, 1)
        JavaPairDStream<String, Integer> mapToPair = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // count each word within the current batch
        JavaPairDStream<String, Integer> wordCounts = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}

3. updateStateByKey for real-time state updates: a running total of everything seen since the job started
import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.*;
import java.util.regex.Pattern;

/**
 * Spark Streaming + Kafka word count in Java (direct API) that keeps
 * a running total per word via updateStateByKey.
 */
public class KafkaUpdateByKey {

    public static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        // updateStateByKey needs a checkpoint directory to persist the running state
        ssc.checkpoint("/home/hadoop/data/checkpoint");

        // topics to subscribe to
        Set<String> topicSet = new HashSet<String>();
        topicSet.add("kafka_streaming_topic");
        // Kafka parameters (no ZooKeeper involved)
        HashMap<String, String> kafkaParam = new HashMap<String, String>();
        kafkaParam.put("metadata.broker.list", "hadoop000:9092");

        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParam, topicSet);

        // split each line on spaces
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public Iterator<String> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                return Lists.newArrayList(SPACE.split(stringStringTuple2._2)).iterator();
            }
        });

        // word => (word, 1)
        JavaPairDStream<String, Integer> mapToPair = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // count each word within the batch, then fold the batch counts into the running state
        JavaPairDStream<String, Integer> wordCounts = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> integers, Optional<Integer> integerOptional) throws Exception {
                // start from the previous total (if any) and add this batch's counts
                Integer updateValue = 0;
                if (integerOptional.isPresent()) {
                    updateValue = integerOptional.get();
                }
                for (Integer integer : integers) {
                    updateValue += integer;
                }
                return Optional.of(updateValue);
            }
        });

        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
4. Testing
Package the project as a jar and submit it to Spark. The --packages option downloads the specified dependency from a remote repository at submit time; if the jar has already been downloaded, you can point to it locally with --jars instead (see the variant after the command below).
bin/spark-submit --name KafkaUpdateByKey --master local[2] --class KafkaUpdateByKey \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
  /home/hadoop/app/myjars/kafka-spark-updateState1.0-SNAPSHOT.jar
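If the connector has already been downloaded, a --jars submit might look like the sketch below; the assembly jar and its local path are assumptions (the assembly variant is used here because it bundles the Kafka client classes). The standard kafka-console-producer can then feed test lines into the topic while the job prints counts every five seconds.

# assumed local path; spark-streaming-kafka-0-8-assembly bundles the Kafka client classes
bin/spark-submit --name KafkaUpdateByKey --master local[2] --class KafkaUpdateByKey \
  --jars /home/hadoop/app/myjars/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar \
  /home/hadoop/app/myjars/kafka-spark-updateState1.0-SNAPSHOT.jar

# send test input to the topic (old-style console producer, matching the 0.8 integration)
bin/kafka-console-producer.sh --broker-list hadoop000:9092 --topic kafka_streaming_topic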