1. 程式人生 > >Spark-SQL學習筆記_總結和拓展

Spark-SQL學習筆記_總結和拓展

一、Spark-SQL應用場景

1.資料檔案即席查詢 Ad-hoc

    普通查詢:定製化查詢

2.對流資料檔案採用SQL分析   Spark-Streaming+Spark-SQL

3.使用SQL完成ETL開發

  1> 格式轉換  ==>列式儲存 parquet/json==>parquet

  2> 日誌資料清洗,把日誌一部分內容抽取出來做分析統計

        2.1> 新增欄位 ip=>城市

   2.2> 日期欄位 轉換成分割槽欄位

   2.3> 剔除不需要欄位

4.與外部資料來源做互動查詢操作 外部資料來源API!!!

  傳統:通過sqoop把資料抽取到資料平臺上去

           然後註冊成Hive表,進行統計分析

  進階:Spark-SQL

           把外部資料來源中的表註冊成DataFrame

           通過DataFrame API進行統計分析

5.叢集查詢效能拓展

     spark on yarn 開發

     spark local模式 測試

  二、載入資料

  1.直接載入到dataframe/dataset

  2.載入到RDD進行轉換

  3.從本地或雲端(HDFS/S3)載入資料

  1>將資料載入成RDD

  val masterLog = sc.textFile("file://")

   val workerLog = sc.textFile("file://

")

   val allLog=sc.textFile("file://*out*")

   masterLog.count    

   masterLog.collect

   masterLog.collect.foreach(println)

   workerLog.count

   allLog.count

   問題:使用SQL進行查詢?

   2>RDD==>DataFrame(DataFrame程式設計)

   import org.apache.spark.sql.Row

   val masterRDD = masterLog.map(x => Row(x))

   import org.apache.spark.sql.types._

   val schemaString = "line"

   val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))

   val schema = StructType(fields)

   val masterDF = spark.createDataFrame(masterRDD, schema)

   masterDF.show(false)

   masterDF.printSchema

   3>RDD==>DataFrame(sql方式)

   masterDF.createOrReplaceTempView(master_logs)

   spark.sql("select * from master_logs limit 10").show(false)

   三、外部資料來源 (json/parquet)

   DataFrame程式設計

   val usersDF=spark.read.format("parquet").load("file://users.parquet")

   DataFrame sql

   spark.sql("").show(false)

   spark.sql("select * from parquet.``")

   Drill 大資料處理框架

   四、從雲端讀取資料(HDFS/s3a/s3n)

   val hdfsRDD=sc.textFile("hdfs://path/file")

   val s3RDD=sc.textFile("s3a://bucket/object")

   spark.read.format("text").load("")

   五、DataFrame vs SQL

   1> DataFrame=RDD+schema

   2> dataset in Row =DataFrame

   3> DataFrame over rdd=Catalyst優化+schema

   4> DataFrame text json parquet ...

   5> DataFrame=SQLAPI +DataFrame API (Catalyst優化)

   六、schema

   inferred 隱式 顯示

   資料倉庫開發:

   1> spark操作hive

   2> 列式儲存

   3>自動推導無需指定schema資訊

   七、Save Results

   SavaMode(Spark官網)

   Spark預設snappy壓縮格式-parquet

   1> ErrorIfExists

   2> Append

   3> Ignore

   4> Overwrite

   val df=spark.read.format("json").

   load("file://people.json")

   df.show

   df.select("name").write.format("parquet")

   .mode("overwrite").save("file:///home/hadoop/data/overwrite")

  spark.read.format("parquet").load("file:///home/hadoop/data/overwrite")

   八、處理json資料

     支援schema資訊自動推導

   val  json

=spark.read.format("json").load("file://test.json")

   json.show

   json.createOrReplaceTempView("json_table")

   spark.sql("select * from json_table").show

   spark.sql("select name,nums[1] from json_table").show

   spark.sql("select name,explode(nums)from json_table").show

   spark.sql("select name,address.city,address.state from json_table2").show

   九、Spark-SQL中SQL覆蓋度

   1> 1.6(支援SQL一半)

                不支援TPC-DS

   2> 2.0(支援SQL2003,支援TPC—DS)

               支援子查詢

               支援向量化 一次讀1024行

               外部資料來源

                rdbms JDBC jars

                parquet Phoenix csv avro

  十、補充

           JDBC資料來源API原始碼解析

          1>  abstract class BaseRelation  

               定義schema資訊

          2> trait RelationProvider 

               建立BaseRelation

            Scan    Scan==>RDD-Row

            val df =spark.read.format("").option("path","file:///").load()