Spark Streaming integrated with Spark SQL: a wordcount example
Example source code:
package cn.ysjh

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SparkStreamingSql {

  def main(args: Array[String]): Unit = {
    val cf: SparkConf = new SparkConf().setAppName("SparkStreamingSql").setMaster("local[2]")
    val streaming: StreamingContext = new StreamingContext(cf, Seconds(5))

    val lines = streaming.socketTextStream("192.168.220.134", 6789)
    val words = lines.flatMap(_.split(" "))

    // Convert each RDD of the words DStream to a DataFrame and run a SQL query on it
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class], then to a DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register the DataFrame as a temporary view
      wordsDataFrame.createOrReplaceTempView("words")

      // Count the words in the table with SQL and print the result
      val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    streaming.start()
    streaming.awaitTermination()
  }

  // Case class used to convert the RDD to a DataFrame
  case class Record(word: String)

  // Lazily instantiated singleton instance of SparkSession
  object SparkSessionSingleton {

    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          .getOrCreate()
      }
      instance
    }
  }
}
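As an aside, the SQL step above can equally be written with the DataFrame API. Here is a minimal sketch of the equivalent aggregation; it would sit inside the same foreachRDD block, and wordCountsDF is just an illustrative name:

// Equivalent word count using the DataFrame API instead of spark.sql(...)
val wordCountsDF = wordsDataFrame
  .groupBy("word")
  .count()                              // yields columns: word, count
  .withColumnRenamed("count", "total")  // match the column name used in the SQL version
wordCountsDF.show()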
As this shows, the data received by Spark Streaming is registered as a table, after which Spark SQL can run a whole series of operations against it. This pattern is used very heavily in production.
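Because each batch is exposed as an ordinary temp view, any Spark SQL statement can run against it, not just a plain group-by. Below is a minimal sketch, placed inside the same foreachRDD block; the query itself is illustrative, keeping only non-empty words and reporting the ten most frequent ones:

// Illustrative: a richer query against the same "words" temp view
val topWordsDataFrame = spark.sql(
  """select word, count(*) as total
    |from words
    |where length(word) > 0
    |group by word
    |order by total desc
    |limit 10""".stripMargin)
topWordsDataFrame.show()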
Running the example:
Netcat is again used here to produce socket data for testing, as shown below.
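A minimal way to drive the test, assuming netcat is available on the source host (192.168.220.134 in the code above): start a listener on port 6789 and type space-separated words into it:

nc -lk 6789

Every 5 seconds the application prints a batch header of the form ========= <time> ========= followed by the word/total table produced by wordCountsDataFrame.show().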