1. 程式人生 > >SparkSQL的反射機制和自定義建立DataFrame

SparkSQL的反射機制和自定義建立DataFrame

反射機制

1.RDD[Person]-----(case:反射機制)------>DataFrameF[ROW]---->DataSet[Person]
  RDD DF DS
    Person ["name","age","address"] {Person:("name","age","address")}
    Person ["name","age","address"] {Person:("name","age","address")}
    Person ["name","age","address"] {Person:("name","age","address")}


    Person ["name","age","address"] {Person:("name","age","address")}
    Person ["name","age","address"] {Person:("name","age","address")}
2.RDD-->DataFrame-->DataSet
  a.RDD-->DataFrame: sparksession.createDataFrame
  b.RDD-->DataSet: sparksession.createDataSet
  c.DF,DS-->RDD: DF.rdd-->RDD[ROW];DS.rdd-->RDD[Person]

  d.DataFrame-->DataSet: sparksession.createDataSet(df.rdd)
  e.DataSet-->Datafrmae: DS.toDF()

自定義建立DataFrame

 

 

  總共分3步:

    1.從原來的RDD建立一個Row格式的RDD

    2.建立與RDD中Rows結構匹配的StructType,通過該StructType建立表示RDD的Schema
    3.通過SparkSession提供的createDataFrame方法建立DataFrame,方法引數為RDD的Schema

  案例:

def main(args: Array[String]): Unit = {
        val sparksession = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
        import sparksession.implicits._
        val rdd = sparksession.sparkContext.textFile("file:///d:/測試資料/users.txt")
        //step1:從原來的RDD建立一個Row格式的RDD
        val rdd_row = rdd.map(x=>x.split(" ")).map(x=>Row(x(0),x(1).toInt,x(2)))
        //step2:建立與RDD中Rows結構匹配的StructType,通過該StructType建立表示RDD的Schema
    //    val schemaString = "name age address"
    //    // Generate the schema based on the string of schema
    //    val fields = schemaString.split(" ")
    //      .map(fieldName => StructField(fieldName, StringType, nullable = true))
        val fields = List(
                StructField("name", StringType, nullable = true),
                StructField("age", IntegerType, nullable = true),
                StructField("address", StringType, nullable = true)
                 )
        val schema = StructType(fields)
        //step3.通過SparkSession提供的createDataFrame方法建立DataFrame,方法引數為RDD的Schema
        val rdd_df = sparksession.createDataFrame(rdd_row,schema)
        rdd_df.show
      }

 

 

 

 

SparkSQL的執行流程

 

 

  1.SQL執行過程

    select f1,f2,f3 from table_name where condition

    Step1-Parse(解析):
      首先,根據SQL語法搜素關鍵字(select、from、where、group by等等),標誌出projection、DataSource、filter
    Step2-Bind(繫結):
      通過解析階段的相關內容(projection、DataSource、filter),校驗DataSource、filed合法性;如果校驗失敗,拋異常。
    Step3-optimize(優化):
      通過資料庫對當前DataSource進行的統計資料分析,執行相應的優化措施。
    Step3-Execute(執行):
      開啟物理執行,將邏輯計劃轉化為相對應的Task。

  2.執行計劃實質:看做成tree(樹),樹節點上通過Rule物件儲存節點資訊。

      SparkSQL tree節點分一個幾類:

        a.一元節點:filter、count等
        b.二元節點:join等
        c.葉子節點:載入外部資料等;