Spark SQL詳解
轉自:https://mp.weixin.qq.com/s/SGhYBxGd5qCVfeM70DRFTw
發家史
熟悉spark sql的都知道,spark sql是從shark發展而來。Shark為了實現Hive相容,在HQL方面重用了Hive中HQL的解析、邏輯執行計劃翻譯、執行計劃優化等邏輯,可以近似認為僅將物理執行計劃從MR作業替換成了Spark作業(輔以記憶體列式儲存等各種和Hive關係不大的優化);
同時還依賴Hive Metastore和Hive SerDe(用於相容現有的各種Hive儲存格式)。
Spark SQL在Hive相容層面僅依賴HQL parser、Hive Metastore和Hive SerDe。也就是說,從HQL被解析成抽象語法樹(AST)起,就全部由Spark SQL接管了。執行計劃生成和優化都由Catalyst負責。藉助Scala的模式匹配等函式式語言特性,利用Catalyst開發執行計劃優化策略比Hive要簡潔得多。
Spark SQL
spark sql提供了多種介面:
1. 純Sql 文字
2. dataset/dataframe api
當然,相應的,也會有各種客戶端:
sql文字,可以用thriftserver/spark-sql
編碼,Dataframe/dataset/sql
Dataframe/Dataset API簡介
Dataframe/Dataset也是分散式資料集,但與RDD不同的是其帶有schema資訊,類似一張表。
可以用下面一張圖詳細對比Dataset/dataframe和rdd的區別:
Dataset是在spark1.6引入的,目的是提供像RDD一樣的強型別、使用強大的lambda函式,同時使用spark sql的優化執行引擎。到spark2.0以後,DataFrame變成型別為Row的Dataset,即為:
type DataFrame = Dataset[Row]
所以,很多移植spark1.6及之前的程式碼到spark2+的都會報錯誤,找不到dataframe類。
基本操作
val df = spark.read.json(“file:///opt/meitu/bigdata/src/main/data/people.json”) df.show() import spark.implicits._ df.printSchema() df.select("name").show() df.select($"name", $"age" + 1).show() df.filter($"age" > 21).show() df.groupBy("age").count().show() spark.stop()
分割槽分桶 排序
分桶排序儲存hive表
df.write.bucketBy(42,“name”).sortBy(“age”).saveAsTable(“people_bucketed”)
分割槽以parquet輸出到指定目錄
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
分割槽分桶儲存到hive表
df.write .partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("users_partitioned_bucketed")
cube rullup pivot
cube
sales.cube("city", "year”).agg(sum("amount")as "amount”) .show()
rull up
sales.rollup("city", "year”).agg(sum("amount")as "amount”).show()
pivot 只能跟在groupby之後
sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("amount")as "amount”).show()
SQL程式設計
Spark SQL允許使用者提交SQL文字,支援一下三種手段編寫sql文字:
1. spark 程式碼
2. spark-sql的shell
3. thriftserver
支援Spark SQL自身的語法,同時也相容HSQL。
1. 編碼
要先宣告構建SQLContext或者SparkSession,這個是SparkSQL的編碼入口。早起的版本使用的是SQLContext或者HiveContext,spark2以後,建議使用的是SparkSession。
1. SQLContext
new SQLContext(SparkContext)
2. HiveContext
new HiveContext(spark.sparkContext)
3. SparkSession
不使用hive元資料:
val spark = SparkSession.builder()
.config(sparkConf) .getOrCreate()
使用hive元資料
val spark = SparkSession.builder()
.config(sparkConf) .enableHiveSupport().getOrCreate()
使用
val df =spark.read.json("examples/src/main/resources/people.json")
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people").show()
2. spark-sql指令碼
spark-sql 啟動的時候類似於spark-submit 可以設定部署模式資源等,可以使用
bin/spark-sql –help 檢視配置引數。
需要將hive-site.xml放到${SPARK_HOME}/conf/目錄下,然後就可以測試
show tables;
select count(*) from student;
3. thriftserver
thriftserver jdbc/odbc的實現類似於hive1.2.1的hiveserver2,可以使用spark的beeline命令來測試jdbc server。
安裝部署
1). 開啟hive的metastore
bin/hive --service metastore
2). 將配置檔案複製到spark/conf/目錄下
3). thriftserver
sbin/start-thriftserver.sh --masteryarn --deploy-mode client
對於yarn只支援client模式
4). 啟動bin/beeline
5). 連線到thriftserver
!connect jdbc:hive2://localhost:10001
使用者自定義函式
1. UDF
定義一個udf很簡單,例如我們自定義一個求字串長度的udf。
val len = udf{(str:String) => str.length}
spark.udf.register("len",len)
val ds =spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.createOrReplaceTempView("employees")
ds.show()
spark.sql("select len(name) from employees").show()
2. UserDefinedAggregateFunction
定義一個UDAF
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
object MyAverageUDAF extends UserDefinedAggregateFunction {
//Data types of input arguments of this aggregate function
definputSchema:StructType = StructType(StructField("inputColumn", LongType) :: Nil)
//Data types of values in the aggregation buffer
defbufferSchema:StructType = {
StructType(StructField("sum", LongType):: StructField("count", LongType) :: Nil)
}
//The data type of the returned value
defdataType:DataType = DoubleType
//Whether this function always returns the same output on the identical input
defdeterministic: Boolean = true
//Initializes the given aggregation buffer. The buffer itself is a `Row` that inaddition to
// standard methods like retrieving avalue at an index (e.g., get(), getBoolean()), provides
// the opportunity to update itsvalues. Note that arrays and maps inside the buffer are still
// immutable.
definitialize(buffer:MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
//Updates the given aggregation buffer `buffer` with new input data from `input`
defupdate(buffer:MutableAggregationBuffer, input: Row): Unit ={
if(!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0)+ input.getLong(0)
buffer(1) = buffer.getLong(1)+ 1
}
}
// Mergestwo aggregation buffers and stores the updated buffer values back to `buffer1`
defmerge(buffer1:MutableAggregationBuffer, buffer2: Row): Unit ={
buffer1(0) = buffer1.getLong(0)+ buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1)+ buffer2.getLong(1)
}
//Calculates the final result
defevaluate(buffer:Row): Double =buffer.getLong(0).toDouble /buffer.getLong(1)
}
使用UDAF
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.createOrReplaceTempView("employees")
ds.show()
spark.udf.register("myAverage", MyAverageUDAF)
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
3. Aggregator
定義一個Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverageAggregator extends Aggregator[Employee, Average, Double] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the output of the reduction
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
使用
spark.udf.register("myAverage2", MyAverageAggregator)
import spark.implicits._
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json").as[Employee]
ds.show()
val averageSalary = MyAverageAggregator.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
資料來源
1. 通用的laod/save函式
可支援多種資料格式:json, parquet, jdbc, orc, libsvm, csv, text
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
預設的是parquet,可以通過spark.sql.sources.default,修改預設配置。
2. Parquet 檔案
val parquetFileDF =spark.read.parquet("people.parquet")
peopleDF.write.parquet("people.parquet")
3. ORC 檔案
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.write.mode("append").orc("/opt/outputorc/")
spark.read.orc("/opt/outputorc/*").show(1)
4. JSON
ds.write.mode("overwrite").json("/opt/outputjson/")
spark.read.json("/opt/outputjson/*").show()
5. Hive 表
spark 1.6及以前的版本使用hive表需要hivecontext。
Spark2開始只需要建立sparksession增加enableHiveSupport()即可。
val spark = SparkSession
.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate()
spark.sql("select count(*) from student").show()
6. JDBC
寫入mysql
wcdf.repartition(1).write.mode("append").option("user", "root")
.option("password", "[email protected]#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())
從mysql裡讀
val fromMysql = spark.read.option("user", "root")
.option("password", "[email protected]#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())
7. 自定義資料來源
自定義source比較簡單,首先我們要看看source載入的方式
指定的目錄下,定義一個DefaultSource類,在類裡面實現自定義source。就可以實現我們的目標。
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
class DefaultSource extends DataSourceV2 with ReadSupport {
def createReader(options: DataSourceOptions) = new SimpleDataSourceReader()
}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
class SimpleDataSourceReader extends DataSourceReader {
def readSchema() = StructType(Array(StructField("value", StringType)))
def createDataReaderFactories = {
val factoryList = new java.util.ArrayList[DataReaderFactory[Row]]
factoryList.add(new SimpleDataSourceReaderFactory())
factoryList
}
}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
class SimpleDataSourceReaderFactory extends
DataReaderFactory[Row] with DataReader[Row] {
def createDataReader = new SimpleDataSourceReaderFactory()
val values = Array("1", "2", "3", "4", "5")
var index = 0
def next = index < values.length
def get = {
val row = Row(values(index))
index = index + 1
row
}
def close() = Unit
}
使用
val simpleDf = spark.read
.format("bigdata.spark.SparkSQL.DataSources")
.load()
simpleDf.show()
優化器及執行計劃
1. 流程簡介
整體流程如下:
總體執行流程如下:從提供的輸入API(SQL,Dataset, dataframe)開始,依次經過unresolved邏輯計劃,解析的邏輯計劃,優化的邏輯計劃,物理計劃,然後根據cost based優化,選取一條物理計劃進行執行.
簡單化成四個部分:
1). analysis
Spark 2.0 以後語法樹生成使用的是antlr4,之前是scalaparse。
2). logical optimization
常量合併,謂詞下推,列裁剪,boolean表示式簡化,和其它的規則
3). physical planning
eg:SortExec
4). Codegen
codegen技術是用scala的字串插值特性生成原始碼,然後使用Janino,編譯成java位元組碼。Eg: SortExec
2. 自定義優化器
1). 實現
繼承Rule[LogicalPlan]
2). 註冊
spark.experimental.extraOptimizations= Seq(MultiplyOptimizationRule)
3). 使用
selectExpr("amountPaid* 1")
3. 自定義執行計劃
主要是實現過載count函式的功能
1). 物理計劃:
繼承SparkLan實現doExecute方法
2). 邏輯計劃
繼承SparkStrategy實現apply
3). 註冊到Spark執行策略:
spark.experimental.extraStrategies =Seq(countStrategy)
4). 使用
spark.sql("select count(*) fromtest")