DStream、RDD、DataFrame 的相互轉換、spark 比 MapReduce 快的原因
阿新 • • 發佈:2022-03-16
DStream、RDD、DataFrame 的相互轉換、spark 比 MapReduce 快的原因
目錄
- DStream、RDD、DataFrame 的相互轉換
- spark 比 MapReduce 快的原因
DStream、RDD、DataFrame 的相互轉換
DStream → RDD → DataFrame
package com.shujia.stream import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.{Duration, Durations, StreamingContext} object Demo4DStoRDD { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession .builder() .master("local[2]") .appName("stream") .config("spark.sql.shuffle.partitions", 1) .getOrCreate() //需要加上隱式轉換 -- RDD → DataFrame import spark.implicits._ val sc: SparkContext = spark.sparkContext val ssc = new StreamingContext(sc, Durations.seconds(5)) val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888) /** * DStream 底層是不斷重複計算的rdd, * 可以將DStream轉換成RDD來使用 * * foreachRDD相當於一個迴圈,每隔5秒執行一次,rdd的資料是當前batch接收到的資料 * */ linesDS.foreachRDD((rdd: RDD[String]) => { /*val kvRDD: RDD[(String, Int)] = rdd.flatMap(_.split(",")).map((_, 1)) val countRDD: RDD[(String, Int)] = kvRDD.reduceByKey(_ + _) countRDD.foreach(println)*/ /** * RDD可以轉換成DF * */ val linesDF: DataFrame = rdd.toDF("line") //註冊一張表 linesDF.createOrReplaceTempView("lines") val countDF: DataFrame = spark.sql( """ |select word,count(1) as c from ( |select explode(split(line,',')) as word from lines |) as a group by word | """.stripMargin) countDF.show() }) ssc.start() ssc.awaitTermination() ssc.stop() } }
RDD → DStream
package com.shujia.stream import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.{Durations, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object Demo5RDDtoDS { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession .builder() .master("local[2]") .appName("stream") .config("spark.sql.shuffle.partitions", 1) .getOrCreate() import spark.implicits._ val sc: SparkContext = spark.sparkContext val ssc = new StreamingContext(sc, Durations.seconds(5)) val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888) /** * transform:傳入一個RDD,返回RDD並將之構建成DStream * * 將 DS 轉換成rdd之後,rdd沒有 有狀態運算元 ,所以不能進行全域性累加 * transform:將 DS 轉換成RDD,使用rdd的 API,處理完之後返回一個新的rdd * */ val tfDS: DStream[(String, Int)] = linesDS.transform((rdd: RDD[String]) => { val countRDD: RDD[(String, Int)] = rdd .flatMap(_.split(",")) .map((_, 1)) .reduceByKey(_ + _) //返回一個rdd,得到一個新的DS countRDD }) tfDS.print() ssc.start() ssc.awaitTermination() ssc.stop() } }
spark 比 MapReduce 快的原因
1、當對同一個rdd多次使用的時候可以將這個rdd快取起來
2、spark -- 粗粒度的資源排程,MapReduce -- 細粒度的資源排程
3、DAG有向無環圖
兩次shuffle的中間結果不需要落地
spark沒有MapReduce穩定,因為spark用記憶體較多