Spark SQL與DataSet
Spark SQL的架構圖
Spark SQL是用於結構化資料處理的Spark模組。與基本的Spark RDD API不同,Spark SQL提供的介面為Spark提供了有關資料結構和正在執行的計算的更多資訊。在內部,Spark SQL使用此額外資訊來執行額外的優化
Spark SQL執行計劃生成和優化都由Catalyst完成
DataSet是分散式資料集合。Dataset是Spark 1.6中新增的一個新介面,它提供了RDD的優勢(強型別,使用強大的lambda函式的能力)以及Spark SQL優化執行引擎的優點。資料集可以被構造從JVM物件,然後使用功能性的轉換(操作map,flatMap,filter等等)。
DataFrame
RDD 優點
- JVM物件組成的分散式資料集合
- 不可變並且有容錯能力
- 可處理機構化和非結構化的資料
- 支援函式式轉換
RDD缺點
- 沒有Schema
- 使用者自己優化程式
- 從不同的資料來源讀取資料非常困難
- 合併多個數據源中的資料也非常困難
DataFrame API
- Row物件組成的分散式資料集
- 不可變並且有容錯能力
- 處理結構化資料
- 自帶優化器Catalyset,可自動優化程式
- Data source API
DataFrame讓Spark對結構化資料有了處理能力
DataFrame的缺點:
1.編譯時不能型別轉化安全檢查,執行時才能確定是否有問題
2.對於物件支援不友好,rdd內部資料直接以java物件儲存,dataframe記憶體儲存的是row物件而不能是自定義物件
DateSet的優點:
1.DateSet整合了RDD和DataFrame的優點,支援結構化和非結構化資料
2.和RDD一樣,支援自定義物件儲存
3.和DataFrame一樣,支援結構化資料的sql查詢
4.採用堆外記憶體儲存,gc友好
5.型別轉化安全,程式碼友好
foreach 在Executor端遍歷
cache
persist //持久化
printSchema
toDF
unpersist //清除持久化的
建立SparkSession 命名為spark 下面使用
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
利用反射機制推斷RDD模式
在利用反射機制推斷RDD模式時,需要首先定義一個case class,因為,只有case class才能被Spark隱式地轉換為DataFrame
程式碼片段
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._ //匯入包,支援把一個RDD隱式轉換為一個DataFrame
//定義一個case class
case class Person(name: String, age: Long)
//轉換成DataFrame
val peopleDF = spark.sparkContext
.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes =>Person(attributes(0),attributes(1).trim.toInt))
.toDF()
//必須註冊為臨時表才能供下面的查詢使用
peopleDF.createOrReplaceTempView("people")
val personsRDD = spark.sql("select name,age from people where age > 20")
//最終生成一個DataFrame,下面是系統執行返回的資訊
//personsRDD: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
personsRDD.map(t => "Name: "+t(0)+ ","+"Age: "+t(1)).show() //DataFrame中的每個元素都是一行記錄,包含name和age兩個欄位,分別用t(0)和t(1)來獲取值
//下面是系統執行返回的資訊
+------------------+
| value|
+------------------+
|Name:Michael,Age:29|
| Name:Andy,Age:30|
+------------------+
當無法提前定義case class時,就需要採用程式設計方式定義RDD模式。
程式碼片段
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//生成欄位
val fields = Array(StructField("name",StringType,true), StructField("age",IntegerType,true))
val schema = StructType(fields)
//從上面資訊可以看出,schema描述了模式資訊,模式中包含name和age兩個欄位
//shcema就是“表頭”
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
//對peopleRDD 這個RDD中的每一行元素都進行解析
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0),attributes(1).trim.toInt))
//上面得到的rowRDD就是“表中的記錄”
//下面把“表頭”和“表中的記錄”拼裝起來
val peopleDF = spark.createDataFrame(rowRDD, schema)
//必須註冊為臨時表才能供下面查詢使用
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT name,age FROM people")
results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).show()
+--------------------+
| value|
+--------------------+
|name: Michael,age:29|
| name: Andy,age:30|
| name: Justin,age:19|
下面的spark代表SparkSession
讀寫json資料
val peopleDF = spark.read.format("json").load("/people.json")
或者
val peopleDF = spark.read.json("/people.json")
peopleDF.select("name", "age").write.format("json").save("namesAndAges.json")
讀寫parquet資料
val peopleDF = spark.read.format("parquet").load("/people.parquet")
或者
val peopleDF = spark.read.parquet("/people.parquet")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
讀取text資料
spark.read.text(…) // 返 回 Dataset<Row>
或者
spark.read.textFile(…) // 返 回 Dataset<String>
讀取寫jdbc資料 如:mysql
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/spark")
.option("driver","com.mysql.jdbc.Driver").
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
Spark SQL調優
DataFrame快取
dataFrame.cache()
Reduce task數目: spark.sql.shuffle.partitions (預設是200)可以根據自己的情況設定partitions個數
讀資料時每個Partition大小:spark.sql.files.maxPartitionBytes(預設128MB)
小檔案合併讀: spark.sql.files.openCostInBytes (預設是4194304 (4 MB) )
廣播小表大小: spark.sql.autoBroadcastJoinThreshold(預設是10485760 (10 MB))