spark學習進度17(Catalyst優化器、dataset介紹、dataframe介紹)
RDD 和 SparkSQL 執行時的區別
RDD
的執行流程
-
大致執行步驟
-
先將
RDD
解析為由Stage
組成的DAG
, 後將Stage
轉為Task
直接執行
問題
-
任務會按照程式碼所示執行, 依賴開發者的優化, 開發者的會在很大程度上影響執行效率
解決辦法
-
建立一個元件, 幫助開發者修改和優化程式碼, 但是這在
RDD
上是無法實現的SparkSQL
提供了什麼?和
RDD
不同,SparkSQL
的Dataset
和SQL
並不是直接生成計劃交給叢集執行, 而是經過了一個叫做Catalyst
的優化器, 這個優化器能夠自動幫助開發者優化程式碼也就是說, 在
SparkSQL
為了解決過多依賴
Hive
的問題,SparkSQL
使用了一個新的SQL
優化器替代Hive
中的優化器, 這個優化器就是Catalyst
, 整個SparkSQL
的架構大致如下-
API
層簡單的說就是Spark
會通過一些API
接受SQL
語句 -
收到
SQL
語句以後, 將其交給Catalyst
,Catalyst
負責解析SQL
, 生成執行計劃等 -
Catalyst
的輸出應該是RDD
的執行計劃 -
最終交由叢集執行
總結
SparkSQL
和RDD
不同的主要點是在於其所操作的資料是結構化的, 提供了對資料更強的感知和分析能力, 能夠對程式碼進行更深層的優化, 而這種能力是由一個叫做Catalyst
Catalyst
的主要運作原理是分為三步, 先對SQL
或者Dataset
的程式碼解析, 生成邏輯計劃, 後對邏輯計劃進行優化, 再生成物理計劃, 最後生成程式碼到叢集中以RDD
的形式執行Dataset 的特點
目標
-
理解
Dataset
是什麼 -
理解
Dataset
的特性
Dataset
是什麼?@Test def dataset1(): Unit = { // 1. 建立 SparkSession val spark = new sql.SparkSession.Builder() .master("local[6]
問題1:
People
是什麼?People
是一個強型別的類問題2: 這個
Dataset
中是結構化的資料嗎?非常明顯是的, 因為
People
物件中有結構資訊, 例如欄位名和欄位型別問題3: 這個
Dataset
能夠使用類似SQL
這樣宣告式結構化查詢語句的形式來查詢嗎?當然可以, 已經演示過了
問題4:
Dataset
是什麼?Dataset
是一個強型別, 並且型別安全的資料容器, 並且提供了結構化查詢API
和類似RDD
一樣的命令式API
Dataset
的底層是什麼?Dataset
最底層處理的是物件的序列化形式, 通過檢視Dataset
生成的物理執行計劃, 也就是最終所處理的RDD
, 就可以判定Dataset
底層處理的是什麼形式的資料val dataset: Dataset[People] = spark.createDataset(Seq(People("zhangsan", 9), People("lisi", 15))) val internalRDD: RDD[InternalRow] = dataset.queryExecution.toRdd
dataset.queryExecution.toRdd
這個API
可以看到Dataset
底層執行的RDD
, 這個RDD
中的範型是InternalRow
,InternalRow
又稱之為Catalyst Row
, 是Dataset
底層的資料結構, 也就是說, 無論Dataset
的範型是什麼, 無論是Dataset[Person]
還是其它的, 其最底層進行處理的資料結構都是InternalRow
所以,
Dataset
的範型物件在執行之前, 需要通過Encoder
轉換為InternalRow
, 在輸入之前, 需要把InternalRow
通過Decoder
轉換為範型物件@Test def dataset2(): Unit = { // 1. 建立 SparkSession val spark = new sql.SparkSession.Builder() .master("local[6]") .appName("dataset1") .getOrCreate() // 2. 匯入隱式轉換 import spark.implicits._ // 3. 演示 val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15))) val dataset = sourceRDD.toDS() // dataset.explain(true) // 無論Dataset中放置的是什麼型別的物件, 最終執行計劃中的RDD上都是 InternalRow val executionRdd: RDD[InternalRow] = dataset.queryExecution.toRdd }
可以獲取
Dataset
對應的RDD
表示在
Dataset
中, 可以使用一個屬性rdd
來得到它的RDD
表示, 例如Dataset[T] → RDD[T]
def dataset3(): Unit = { // 1. 建立 SparkSession val spark = new sql.SparkSession.Builder() .master("local[6]") .appName("dataset1") .getOrCreate() // 2. 匯入隱式轉換 import spark.implicits._ // 3. 演示 // val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15))) // val dataset = sourceRDD.toDS() val dataset: Dataset[Person] = spark.createDataset(Seq(Person("zhangsan", 10), Person("lisi", 15))) // dataset.explain(true) // 無論Dataset中放置的是什麼型別的物件, 最終執行計劃中的RDD上都是 InternalRow // 直接獲取到已經分析和解析過的 Dataset 的執行計劃, 從中拿到 RDD val executionRdd: RDD[InternalRow] = dataset.queryExecution.toRdd // 通過將 Dataset 底層的 RDD[InternalRow] 通過 Decoder 轉成了和 Dataset 一樣的型別的 RDD val typedRdd: RDD[Person] = dataset.rdd println(executionRdd.toDebugString) println() println() println(typedRdd.toDebugString) }
使用 Dataset.rdd
將Dataset
轉為RDD
的形式Dataset
的執行計劃底層的RDD
可以看到
(1)
對比(2)
對了兩個步驟, 這兩個步驟的本質就是將Dataset
底層的InternalRow
轉為RDD
中的物件形式, 這個操作還是會有點重的, 所以慎重使用rdd
屬性來轉換Dataset
為RDD
總結
-
Dataset
是一個新的Spark
元件, 其底層還是RDD
-
Dataset
提供了訪問物件中某個特定欄位的能力, 不用像RDD
一樣每次都要針對整個物件做操作 -
Dataset
和RDD
不同, 如果想把Dataset[T]
轉為RDD[T]
, 則需要對Dataset
底層的InternalRow
做轉換, 是一個比價重量級的操作
DataFrame 的作用和常見操作
目標
-
理解
DataFrame
是什麼 -
理解
DataFrame
的常見操作
-
DataFrame
是SparkSQL
中一個表示關係型資料庫中表
的函式式抽象, 其作用是讓Spark
處理大規模結構化資料的時候更加容易. 一般DataFrame
可以處理結構化的資料, 或者是半結構化的資料, 因為這兩類資料中都可以獲取到Schema
資訊. 也就是說DataFrame
中有Schema
資訊, 可以像操作表一樣操作DataFrame
.DataFrame
由兩部分構成, 一是row
的集合, 每個row
物件表示一個行, 二是描述DataFrame
結構的Schema
.DataFrame
支援SQL
中常見的操作, 例如:select
,filter
,join
,group
,sort
,join
等
@Test def dataframe1(): Unit = { // 1. 建立 SparkSession val spark = SparkSession.builder() .appName("dataframe1") .master("local[6]") .getOrCreate() // 2. 建立 DataFrame import spark.implicits._ val dataFrame: DataFrame = Seq(Person("zhangsan", 15), Person("lisi", 20)).toDF() // 3. 看看 DataFrame 可以玩出什麼花樣 // select name from ... t where t.age > 10 dataFrame.where('age > 10) .select('name) .show() }
DataFrame如何建立:
@Test def dataframe2(): Unit = { val spark = SparkSession.builder() .appName("dataframe1") .master("local[6]") .getOrCreate() import spark.implicits._ val personList = Seq(Person("zhangsan", 15), Person("lisi", 20)) // 1. toDF val df1 = personList.toDF() val df2 = spark.sparkContext.parallelize(personList).toDF() // 2. createDataFrame val df3 = spark.createDataFrame(personList) // 3. read val df4 = spark.read.csv("dataset/BeijingPM20100101_20151231_noheader.csv") df4.show() }
DataFrame介紹:
@Test def dataframe3(): Unit = { // 1. 建立 SparkSession val spark = SparkSession.builder() .master("local[6]") .appName("pm analysis") .getOrCreate() import spark.implicits._ // 2. 讀取資料集 val sourceDF: DataFrame = spark.read .option("header", value = true)//把第一行弄成header .csv("dataset/BeijingPM20100101_20151231.csv") // 檢視 DataFrame 的 Schema 資訊, 要意識到 DataFrame 中是有結構資訊的, 叫做 Schema sourceDF.printSchema() // 3. 處理 // 1. 選擇列 // 2. 過濾掉 NA 的 PM 記錄 // 3. 分組 select year, month, count(PM_Dongsi) from ... where PM_Dongsi != NA group by year, month // 4. 聚合 // 4. 得出結論 // sourceDF.select('year, 'month, 'PM_Dongsi) // .where('PM_Dongsi =!= "NA") // .groupBy('year, 'month) // .count() // .show() // 是否能直接使用 SQL 語句進行查詢 // 1. 將 DataFrame 註冊為臨表 sourceDF.createOrReplaceTempView("pm") // 2. 執行查詢 val resultDF = spark.sql("select year, month, count(PM_Dongsi) from pm where PM_Dongsi != 'NA' group by year, month") resultDF.show() spark.stop() }
Dataset 和 DataFrame 的異同
目標
-
理解
Dataset
和DataFrame
之間的關係
DataFrame
就是Dataset
-
根據前面的內容, 可以得到如下資訊
-
Dataset
中可以使用列來訪問資料,DataFrame
也可以 -
Dataset
的執行是優化的,DataFrame
也是 -
Dataset
具有命令式API
, 同時也可以使用SQL
來訪問,DataFrame
也可以使用這兩種不同的方式訪問
所以這件事就比較蹊蹺了, 兩個這麼相近的東西為什麼會同時出現在
SparkSQL
中呢?確實, 這兩個元件是同一個東西,
DataFrame
是Dataset
的一種特殊情況, 也就是說DataFrame
是Dataset[Row]
的別名 -
DataFrame
和Dataset
所表達的語義不同-
第一點:
DataFrame
表達的含義是一個支援函式式操作的表
, 而Dataset
表達是是一個類似RDD
的東西,Dataset
可以處理任何物件第二點:
DataFrame
中所存放的是Row
物件, 而Dataset
中可以存放任何型別的物件-
val spark: SparkSession = new sql.SparkSession.Builder() .appName("hello") .master("local[6]") .getOrCreate() import spark.implicits._ val df: DataFrame = Seq(People("zhangsan", 15), People("lisi", 15)).toDF() val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS()
DataFrame 就是 Dataset[Row] Dataset 的範型可以是任意型別
第三點:
DataFrame
的操作方式和Dataset
是一樣的, 但是對於強型別操作而言, 它們處理的型別不同-
DataFrame
在進行強型別操作時候, 例如map
運算元, 其所處理的資料型別永遠是Row
df.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 10) )(RowEncoder.apply(df.schema)).show()
但是對於
Dataset
來講, 其中是什麼型別, 它就處理什麼型別ds.map( (item: People) => People(item.name, item.age * 10) ).show()
第三點:
DataFrame
只能做到執行時型別檢查,Dataset
能做到編譯和執行時都有型別檢查-
-
DataFrame
中存放的資料以Row
表示, 一個Row
代表一行資料, 這和關係型資料庫類似 -
DataFrame
在進行map
等操作的時候,DataFrame
不能直接使用Person
這樣的Scala
物件, 所以無法做到編譯時檢查 -
Dataset
表示的具體的某一類物件, 例如Person
, 所以再進行map
等操作的時候, 傳入的是具體的某個Scala
物件, 如果呼叫錯了方法, 編譯時就會被檢查出來
val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS() ds.map(person => person.hello)
這行程式碼明顯報錯, 無法通過編譯 -
-
@Test def dataframe4(): Unit = { val spark = SparkSession.builder() .appName("dataframe1") .master("local[6]") .getOrCreate() import spark.implicits._ val personList = Seq(Person("zhangsan", 15), Person("lisi", 20)) // DataFrame 是弱型別的 val df: DataFrame = personList.toDF() df.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 2) )(RowEncoder.apply(df.schema)) .show() // DataFrame 所代表的弱型別操作是編譯時不安全 // df.groupBy("name, school") // Dataset 是強型別的 val ds: Dataset[Person] = personList.toDS() ds.map( (person: Person) => Person(person.name, person.age * 2) ) .show() // Dataset 所代表的操作, 是型別安全的, 編譯時安全的 // ds.filter( person => person.school ) }
Row
@Test def row(): Unit = { // 1. Row 如何建立, 它是什麼 // row 物件必須配合 Schema 物件才會有 列名 val p = Person("zhangsan", 15) val row = Row("zhangsan", 15) // 2. 如何從 Row 中獲取資料 row.getString(0) row.getInt(1) // 3. Row 也是樣例類 row match { case Row(name, age) => println(name, age) } }
-
-