SparkSQL的反射機制和自定義建立DataFrame
反射機制
1.RDD[Person]-----(case:反射機制)------>DataFrameF[ROW]---->DataSet[Person]
RDD DF DS
Person ["name","age","address"] {Person:("name","age","address")}
Person ["name","age","address"] {Person:("name","age","address")}
Person ["name","age","address"] {Person:("name","age","address")}
Person ["name","age","address"] {Person:("name","age","address")}
Person ["name","age","address"] {Person:("name","age","address")}
2.RDD-->DataFrame-->DataSet
a.RDD-->DataFrame: sparksession.createDataFrame
b.RDD-->DataSet: sparksession.createDataSet
c.DF,DS-->RDD: DF.rdd-->RDD[ROW];DS.rdd-->RDD[Person]
d.DataFrame-->DataSet: sparksession.createDataSet(df.rdd)
e.DataSet-->Datafrmae: DS.toDF()
自定義建立DataFrame |
總共分3步:
1.從原來的RDD建立一個Row格式的RDD
2.建立與RDD中Rows結構匹配的StructType,通過該StructType建立表示RDD的Schema
3.通過SparkSession提供的createDataFrame方法建立DataFrame,方法引數為RDD的Schema
案例:
def main(args: Array[String]): Unit = { val sparksession = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate() import sparksession.implicits._ val rdd = sparksession.sparkContext.textFile("file:///d:/測試資料/users.txt") //step1:從原來的RDD建立一個Row格式的RDD val rdd_row = rdd.map(x=>x.split(" ")).map(x=>Row(x(0),x(1).toInt,x(2))) //step2:建立與RDD中Rows結構匹配的StructType,通過該StructType建立表示RDD的Schema // val schemaString = "name age address" // // Generate the schema based on the string of schema // val fields = schemaString.split(" ") // .map(fieldName => StructField(fieldName, StringType, nullable = true)) val fields = List( StructField("name", StringType, nullable = true), StructField("age", IntegerType, nullable = true), StructField("address", StringType, nullable = true) ) val schema = StructType(fields) //step3.通過SparkSession提供的createDataFrame方法建立DataFrame,方法引數為RDD的Schema val rdd_df = sparksession.createDataFrame(rdd_row,schema) rdd_df.show }
SparkSQL的執行流程 |
1.SQL執行過程
select f1,f2,f3 from table_name where condition
Step1-Parse(解析):
首先,根據SQL語法搜素關鍵字(select、from、where、group by等等),標誌出projection、DataSource、filter
Step2-Bind(繫結):
通過解析階段的相關內容(projection、DataSource、filter),校驗DataSource、filed合法性;如果校驗失敗,拋異常。
Step3-optimize(優化):
通過資料庫對當前DataSource進行的統計資料分析,執行相應的優化措施。
Step3-Execute(執行):
開啟物理執行,將邏輯計劃轉化為相對應的Task。
2.執行計劃實質:看做成tree(樹),樹節點上通過Rule物件儲存節點資訊。
SparkSQL tree節點分一個幾類:
a.一元節點:filter、count等
b.二元節點:join等
c.葉子節點:載入外部資料等;