1. 程式人生 > >RDD與DataFrame之間的轉換

RDD與DataFrame之間的轉換

RDD轉換為DataFrame

方法1:

1. 需要import spark.implicits._
2. case class + toDF建立DataFrame

//use case class Person
  case class Person(name:String,age:Int)
  def rddToDFCase(sparkSession : SparkSession):DataFrame = {
    //匯入隱飾操作,否則RDD無法呼叫toDF方法
    import sparkSession.implicits._
    val peopleRDD = sparkSession.sparkContext
      .textFile("file:/E:/scala_workspace/z_spark_study/people.txt",2)
      .map( x => x.split(",")).map( x => Person(x(0),x(1).trim().toInt)).toDF()
    peopleRDD
  }

注意:split將字串分類為多個String,這裡age為Int型別,因此需要將分列後的第二個元素toInt,trim()函式用於去除頭尾空格。

這裡,需要知道要轉換的型別,本例中轉換為int型別,如果沒辦法轉換成需要的型別,比如RDD[Array[String]],那麼就要用第二種方法。

方法2:

1. 從原始RDD中生成RDD的Rows
2. step1中的Rows的structure,生成相應的StructType,表示成schema
3. 將schema對應到Rows上,通過SparkSession提供的createDataFrame函式生成DataFrame
Row代表RDD中的一行資料

//StructType and convert RDD to DataFrame
  def rddToDF(sparkSession : SparkSession):DataFrame = {
    //設定schema結構
    val schema = StructType(
      Seq(
        StructField("name",StringType,true)          
        ,StructField("age",IntegerType,true)
      )
    )
    val rowRDD = sparkSession.sparkContext
      .textFile("file:/E:/scala_workspace/z_spark_study/people.txt",2)
      .map( x => x.split(",")).map( x => Row(x(0),x(1).trim().toInt))  
    sparkSession.createDataFrame(rowRDD,schema)
  }

將DataFrame轉換成RDD

利用.rdd