Spark: a Spark Streaming example with Kafka
阿新 · Published 2019-02-11
Spark Streaming reads data from Kafka using the direct (receiver-less) approach: KafkaUtils.createDirectStream is given the broker list and the set of topics, and each batch pulls its offset ranges straight from the brokers, with no ZooKeeper address needed.
Complete code (Java)
package com.chb.spark.streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaDirectWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("kafka-wordCount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(3));

        // Kafka parameters: the direct approach talks to the brokers directly,
        // so only metadata.broker.list is needed (no ZooKeeper address).
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "192.168.1.224:9092,192.168.1.225:9092,192.168.1.226:9092");

        // One or more topics can be consumed in parallel.
        Set<String> topics = new HashSet<String>();
        topics.add("wordcount");

        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                jssc,
                String.class,        // key type
                String.class,        // value type
                StringDecoder.class, // key decoder
                StringDecoder.class, // value decoder
                kafkaParams,
                topics
        );

        /**
         * Each record in lines is a Tuple2<String, String>; the key is usually null
         * and the value is the line itself. flatMap splits each line into words:
         * val words = lines.flatMap(_._2.split(" "))
         */
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(Tuple2<String, String> t) throws Exception {
                return Arrays.asList(t._2.split(" "));
            }
        });

        /**
         * Map each word to a pair:
         * word => (word, 1)
         * val pairs = words.map(x => (x, 1))
         */
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String t) throws Exception {
                return new Tuple2<String, Integer>(t, 1);
            }
        });

        /**
         * Sum the counts per word:
         * val wcs = pairs.reduceByKey(_ + _)
         * public interface Function2<T1, T2, R> extends Serializable {
         *     R call(T1 v1, T2 v2) throws Exception;
         * }
         */
        JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // print() is an action: it triggers a job per batch and prints the first 10 results.
        wcs.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.stop();
    }
}
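One thing worth adding to the Java job: the direct stream does not commit offsets to ZooKeeper, so if you want to log or store the consumed offsets per batch you have to read them from the stream yourself. The following is only a minimal sketch of that pattern, assuming the same Spark 1.x / spark-streaming-kafka (Kafka 0.8) API as the code above; the extra imports go at the top of the class and the foreachRDD block goes right after createDirectStream. Depending on the exact Spark 1.x version, foreachRDD may instead expect a Function<JavaPairRDD<String, String>, Void>.

// Extra imports for offset tracking:
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

// Inside main(), directly after createDirectStream and before the word-count transformations:
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
lines.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void call(JavaPairRDD<String, String> rdd) throws Exception {
        // Each batch RDD of a direct stream carries its Kafka offset ranges.
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        offsetRanges.set(ranges);
        for (OffsetRange o : ranges) {
            System.out.println(o.topic() + " partition " + o.partition()
                    + " offsets [" + o.fromOffset() + ", " + o.untilOffset() + ")");
        }
    }
});

Persisting these ranges somewhere durable (ZooKeeper, a database, etc.) is what allows a restarted job to resume from exactly where it left off.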
Scala code
package com.spark.stream.scala

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object WordCountKafka {
  def main(args: Array[String]): Unit = {
    // local[2]: use at least two threads so that receiving and processing do not starve each other.
    val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
    val sc = new StreamingContext(sparkConf, Durations.seconds(10))

    // Kafka parameters: the direct approach does not need a ZooKeeper address,
    // only the broker list.
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "node1:9092,node2:9092,node3:9092"
    )

    // The topics to read; the direct stream can consume several topics in parallel.
    val topics = Set[String]("wordcount20160423")

    // Kafka hands back key/value pairs; only the value (the line) needs to be split.
    val linerdd = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, topics)

    val wordrdd = linerdd.flatMap { _._2.split(" ") }
    val resultrdd = wordrdd.map { x => (x, 1) }.reduceByKey { _ + _ }
    // resultrdd.map(x => println(x._1 + "--" + x._2))
    resultrdd.print()

    sc.start()
    sc.awaitTermination()
    sc.stop()
  }
}
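To give either job something to count, push a few lines into the topic. The quickest way is kafka-console-producer.sh with the same broker list and topic name; the class below is an equivalent minimal sketch in Java, assuming the kafka-clients library is on the classpath (the class name and the sample lines are only illustrative).

package com.chb.spark.streaming;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WordCountProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same brokers as the Java streaming job above.
        props.put("bootstrap.servers", "192.168.1.224:9092,192.168.1.225:9092,192.168.1.226:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        String[] lines = {"hello spark", "hello kafka", "spark streaming word count"};
        for (String line : lines) {
            // The key is left null, matching the streaming jobs, which only use the value.
            producer.send(new ProducerRecord<String, String>("wordcount", line));
        }
        producer.close();
    }
}

Each line sent this way shows up as word counts printed by print() in the next batch: every 3 seconds for the Java job, every 10 seconds for the Scala one.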