1. 程式人生 > 其它 >DStream、RDD、DataFrame 的相互轉換、spark 比 MapReduce 快的原因

DStream、RDD、DataFrame 的相互轉換、spark 比 MapReduce 快的原因

DStream、RDD、DataFrame 的相互轉換、spark 比 MapReduce 快的原因

目錄

DStream、RDD、DataFrame 的相互轉換

DStream → RDD → DataFrame

package com.shujia.stream

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}

object Demo4DStoRDD {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("stream")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    //需要加上隱式轉換 -- RDD → DataFrame 
    import spark.implicits._

    val sc: SparkContext = spark.sparkContext

    val ssc = new StreamingContext(sc, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    /**
      * DStream 底層是不斷重複計算的rdd,
      * 可以將DStream轉換成RDD來使用
      *
      * foreachRDD相當於一個迴圈,每隔5秒執行一次,rdd的資料是當前batch接收到的資料
      *
      */

    linesDS.foreachRDD((rdd: RDD[String]) => {

      /*val kvRDD: RDD[(String, Int)] = rdd.flatMap(_.split(",")).map((_, 1))
      val countRDD: RDD[(String, Int)] = kvRDD.reduceByKey(_ + _)
      countRDD.foreach(println)*/

      /**
        * RDD可以轉換成DF
        *
        */

      val linesDF: DataFrame = rdd.toDF("line")

      //註冊一張表
      linesDF.createOrReplaceTempView("lines")

      val countDF: DataFrame = spark.sql(
        """
          |select word,count(1) as c from (
          |select explode(split(line,',')) as word from lines
          |) as a group by word
          |
        """.stripMargin)

      countDF.show()

    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}

RDD → DStream

package com.shujia.stream

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object Demo5RDDtoDS {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("stream")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._

    val sc: SparkContext = spark.sparkContext

    val ssc = new StreamingContext(sc, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    /**
      * transform:傳入一個RDD,返回RDD並將之構建成DStream
      *
      * 將 DS 轉換成rdd之後,rdd沒有 有狀態運算元 ,所以不能進行全域性累加
      * transform:將 DS 轉換成RDD,使用rdd的 API,處理完之後返回一個新的rdd
      *
      */

    val tfDS: DStream[(String, Int)] = linesDS.transform((rdd: RDD[String]) => {


      val countRDD: RDD[(String, Int)] = rdd
        .flatMap(_.split(","))
        .map((_, 1))
        .reduceByKey(_ + _)

      //返回一個rdd,得到一個新的DS
      countRDD
    })

    tfDS.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}

spark 比 MapReduce 快的原因

1、當對同一個rdd多次使用的時候可以將這個rdd快取起來

2、spark -- 粗粒度的資源排程,MapReduce -- 細粒度的資源排程

3、DAG有向無環圖

兩次shuffle的中間結果不需要落地

spark沒有MapReduce穩定,因為spark用記憶體較多