Spark Streaming 整合 SparkSQL處理流式計算
阿新 • • 發佈:2018-11-28
Spark Streaming之所以成為現在主流的流處理開發計算框架,不僅僅是因為它具有流處理和批處理的能力及支援離線和實時計算雙重特點,更重要的是Spark具有良好的生態,它不僅可以整合Hadoop生態的Hive,使用Hive on Spark進行離線分析,整合Yarn模式,使用Spark on Yarn進行資源排程,更有自身的Spark SQL及GraphX和machine learning進行更高層次的研究分析。
今天給大家分享下Spark Streaming整合SparkSQL處理流式計算
程式碼如下:
import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.{Seconds, StreamingContext} object SparkStreamingAndSparkSQL { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount") val ssc = new StreamingContext(sparkConf, Seconds(1)) val lines = ssc.socketTextStream("node1",6666) val words = lines.flatMap(_.split(" ")) words.foreachRDD(rdd=>{ val spark = SparkSessionSingleton.getInstance(sparkConf) import spark.implicits._ val RDDToDF = rdd.map(t=>Record(t)).toDF RDDToDF.createOrReplaceTempView("wordtest") val wordcountDF = spark.sql("select word,count(1) as total from wordtest group by word") wordcountDF.show() }) ssc.start() ssc.awaitTermination() } } case class Record(word:String) object SparkSessionSingleton { private var instance: SparkSession = _ def getInstance(sparkConf: SparkConf): SparkSession = { if (instance == null) { instance = SparkSession .builder .config(sparkConf) .getOrCreate() } instance } }
在啟動程式之前,先開啟socket輸入埠
下面是列印在控制檯的計數輸出
通過以上的輸出可以看到,這個程式是分批次計數的,而不是將所有的單詞計數,那麼我們怎樣實現所有批次單詞的計數呢? 今天先分享到這裡,回頭再補充