Spark-SQL學習筆記_總結和拓展
一、Spark-SQL應用場景
1.資料檔案即席查詢 Ad-hoc
普通查詢:定製化查詢
2.對流資料檔案採用SQL分析 Spark-Streaming+Spark-SQL
3.使用SQL完成ETL開發
1> 格式轉換 ==>列式儲存 parquet/json==>parquet
2> 日誌資料清洗,把日誌一部分內容抽取出來做分析統計
2.1> 新增欄位 ip=>城市
2.2> 日期欄位 轉換成分割槽欄位
2.3> 剔除不需要欄位
4.與外部資料來源做互動查詢操作 外部資料來源API!!!
傳統:通過sqoop把資料抽取到資料平臺上去
然後註冊成Hive表,進行統計分析
進階:Spark-SQL
把外部資料來源中的表註冊成DataFrame
通過DataFrame API進行統計分析
5.叢集查詢效能拓展
spark on yarn 開發
spark local模式 測試
二、載入資料
1.直接載入到dataframe/dataset
2.載入到RDD進行轉換
3.從本地或雲端(HDFS/S3)載入資料
1>將資料載入成RDD
val masterLog = sc.textFile("file://")
val workerLog = sc.textFile("file://
val allLog=sc.textFile("file://*out*")
masterLog.count
masterLog.collect
masterLog.collect.foreach(println)
workerLog.count
allLog.count
問題:使用SQL進行查詢?
2>RDD==>DataFrame(DataFrame程式設計)
import org.apache.spark.sql.Row
val masterRDD = masterLog.map(x => Row(x))
import org.apache.spark.sql.types._
val schemaString = "line"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val masterDF = spark.createDataFrame(masterRDD, schema)
masterDF.show(false)
masterDF.printSchema
3>RDD==>DataFrame(sql方式)
masterDF.createOrReplaceTempView(master_logs)
spark.sql("select * from master_logs limit 10").show(false)
三、外部資料來源 (json/parquet)
DataFrame程式設計
val usersDF=spark.read.format("parquet").load("file://users.parquet")
DataFrame sql
spark.sql("").show(false)
spark.sql("select * from parquet.``")
Drill 大資料處理框架
四、從雲端讀取資料(HDFS/s3a/s3n)
val hdfsRDD=sc.textFile("hdfs://path/file")
val s3RDD=sc.textFile("s3a://bucket/object")
spark.read.format("text").load("")
五、DataFrame vs SQL
1> DataFrame=RDD+schema
2> dataset in Row =DataFrame
3> DataFrame over rdd=Catalyst優化+schema
4> DataFrame text json parquet ...
5> DataFrame=SQLAPI +DataFrame API (Catalyst優化)
六、schema
inferred 隱式 顯示
資料倉庫開發:
1> spark操作hive
2> 列式儲存
3>自動推導無需指定schema資訊
七、Save Results
SavaMode(Spark官網)
Spark預設snappy壓縮格式-parquet
1> ErrorIfExists
2> Append
3> Ignore
4> Overwrite
val df=spark.read.format("json").
load("file://people.json")
df.show
df.select("name").write.format("parquet")
.mode("overwrite").save("file:///home/hadoop/data/overwrite")
spark.read.format("parquet").load("file:///home/hadoop/data/overwrite")
八、處理json資料
支援schema資訊自動推導
val json
=spark.read.format("json").load("file://test.json")
json.show
json.createOrReplaceTempView("json_table")
spark.sql("select * from json_table").show
spark.sql("select name,nums[1] from json_table").show
spark.sql("select name,explode(nums)from json_table").show
spark.sql("select name,address.city,address.state from json_table2").show
九、Spark-SQL中SQL覆蓋度
1> 1.6(支援SQL一半)
不支援TPC-DS
2> 2.0(支援SQL2003,支援TPC—DS)
支援子查詢
支援向量化 一次讀1024行
外部資料來源
rdbms JDBC jars
parquet Phoenix csv avro
十、補充
JDBC資料來源API原始碼解析
1> abstract class BaseRelation
定義schema資訊
2> trait RelationProvider
建立BaseRelation
Scan Scan==>RDD-Row
val df =spark.read.format("").option("path","file:///").load()