1. 程式人生 > 實用技巧 >spark學習進度17(Catalyst優化器、dataset介紹、dataframe介紹)

spark學習進度17(Catalyst優化器、dataset介紹、dataframe介紹)

RDD 和 SparkSQL 執行時的區別

RDD的執行流程

大致執行步驟

先將RDD解析為由Stage組成的DAG, 後將Stage轉為Task直接執行

問題

任務會按照程式碼所示執行, 依賴開發者的優化, 開發者的會在很大程度上影響執行效率

解決辦法

建立一個元件, 幫助開發者修改和優化程式碼, 但是這在RDD上是無法實現的

SparkSQL提供了什麼?

RDD不同,SparkSQLDatasetSQL並不是直接生成計劃交給叢集執行, 而是經過了一個叫做Catalyst的優化器, 這個優化器能夠自動幫助開發者優化程式碼

也就是說, 在SparkSQL

中, 開發者的程式碼即使不夠優化, 也會被優化為相對較好的形式去執行Catalyst

為了解決過多依賴Hive的問題,SparkSQL使用了一個新的SQL優化器替代Hive中的優化器, 這個優化器就是Catalyst, 整個SparkSQL的架構大致如下

  1. API層簡單的說就是Spark會通過一些API接受SQL語句

  2. 收到SQL語句以後, 將其交給Catalyst,Catalyst負責解析SQL, 生成執行計劃等

  3. Catalyst的輸出應該是RDD的執行計劃

  4. 最終交由叢集執行

總結

SparkSQLRDD不同的主要點是在於其所操作的資料是結構化的, 提供了對資料更強的感知和分析能力, 能夠對程式碼進行更深層的優化, 而這種能力是由一個叫做Catalyst

的優化器所提供的

Catalyst的主要運作原理是分為三步, 先對SQL或者Dataset的程式碼解析, 生成邏輯計劃, 後對邏輯計劃進行優化, 再生成物理計劃, 最後生成程式碼到叢集中以RDD的形式執行

Dataset 的特點

目標

  1. 理解Dataset是什麼

  2. 理解Dataset的特性

Dataset是什麼?

@Test
  def dataset1(): Unit = {
    // 1. 建立 SparkSession
    val spark = new sql.SparkSession.Builder()
      .master("local[6]
") .appName("dataset1") .getOrCreate() // 2. 匯入隱式轉換 import spark.implicits._ // 3. 演示 val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15))) val dataset = sourceRDD.toDS() // Dataset 支援強型別的 API dataset.filter( item => item.age > 10 ).show() // Dataset 支援弱型別 API dataset.filter( 'age > 10 ).show() dataset.filter( $"age" > 10 ).show() // Dataset 可以直接編寫 SQL 表示式 dataset.filter("age > 10").show() }

問題1:People是什麼?

People是一個強型別的類

問題2: 這個Dataset中是結構化的資料嗎?

非常明顯是的, 因為People物件中有結構資訊, 例如欄位名和欄位型別

問題3: 這個Dataset能夠使用類似SQL這樣宣告式結構化查詢語句的形式來查詢嗎?

當然可以, 已經演示過了

問題4:Dataset是什麼?

Dataset是一個強型別, 並且型別安全的資料容器, 並且提供了結構化查詢API和類似RDD一樣的命令式API

Dataset的底層是什麼?

Dataset最底層處理的是物件的序列化形式, 通過檢視Dataset生成的物理執行計劃, 也就是最終所處理的RDD, 就可以判定Dataset底層處理的是什麼形式的資料

val dataset: Dataset[People] = spark.createDataset(Seq(People("zhangsan", 9), People("lisi", 15)))
val internalRDD: RDD[InternalRow] = dataset.queryExecution.toRdd

dataset.queryExecution.toRdd這個API可以看到Dataset底層執行的RDD, 這個RDD中的範型是InternalRow,InternalRow又稱之為Catalyst Row, 是Dataset底層的資料結構, 也就是說, 無論Dataset的範型是什麼, 無論是Dataset[Person]還是其它的, 其最底層進行處理的資料結構都是InternalRow

所以,Dataset的範型物件在執行之前, 需要通過Encoder轉換為InternalRow, 在輸入之前, 需要把InternalRow通過Decoder轉換為範型物件

@Test
  def dataset2(): Unit = {
    // 1. 建立 SparkSession
    val spark = new sql.SparkSession.Builder()
      .master("local[6]")
      .appName("dataset1")
      .getOrCreate()

    // 2. 匯入隱式轉換
    import spark.implicits._

    // 3. 演示
    val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
    val dataset = sourceRDD.toDS()

//    dataset.explain(true)
    // 無論Dataset中放置的是什麼型別的物件, 最終執行計劃中的RDD上都是 InternalRow
    val executionRdd: RDD[InternalRow] = dataset.queryExecution.toRdd
  }

可以獲取Dataset對應的RDD表示

Dataset中, 可以使用一個屬性rdd來得到它的RDD表示, 例如Dataset[T] → RDD[T]

def dataset3(): Unit = {
    // 1. 建立 SparkSession
    val spark = new sql.SparkSession.Builder()
      .master("local[6]")
      .appName("dataset1")
      .getOrCreate()

    // 2. 匯入隱式轉換
    import spark.implicits._

    // 3. 演示
//    val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
//    val dataset = sourceRDD.toDS()
    val dataset: Dataset[Person] = spark.createDataset(Seq(Person("zhangsan", 10), Person("lisi", 15)))

    //    dataset.explain(true)
    // 無論Dataset中放置的是什麼型別的物件, 最終執行計劃中的RDD上都是 InternalRow
    // 直接獲取到已經分析和解析過的 Dataset 的執行計劃, 從中拿到 RDD
    val executionRdd: RDD[InternalRow] = dataset.queryExecution.toRdd

    // 通過將 Dataset 底層的 RDD[InternalRow] 通過 Decoder 轉成了和 Dataset 一樣的型別的 RDD
    val typedRdd: RDD[Person] = dataset.rdd

    println(executionRdd.toDebugString)
    println()
    println()
    println(typedRdd.toDebugString)
  }

使用Dataset.rddDataset轉為RDD的形式
Dataset的執行計劃底層的RDD

可以看到(1)對比(2)對了兩個步驟, 這兩個步驟的本質就是將Dataset底層的InternalRow轉為RDD中的物件形式, 這個操作還是會有點重的, 所以慎重使用rdd屬性來轉換DatasetRDD

總結

  1. Dataset是一個新的Spark元件, 其底層還是RDD

  2. Dataset提供了訪問物件中某個特定欄位的能力, 不用像RDD一樣每次都要針對整個物件做操作

  3. DatasetRDD不同, 如果想把Dataset[T]轉為RDD[T], 則需要對Dataset底層的InternalRow做轉換, 是一個比價重量級的操作

DataFrame 的作用和常見操作

目標

  1. 理解DataFrame是什麼

  2. 理解DataFrame的常見操作

DataFrameSparkSQL中一個表示關係型資料庫中的函式式抽象, 其作用是讓Spark處理大規模結構化資料的時候更加容易. 一般DataFrame可以處理結構化的資料, 或者是半結構化的資料, 因為這兩類資料中都可以獲取到Schema資訊. 也就是說DataFrame中有Schema資訊, 可以像操作表一樣操作DataFrame.

DataFrame由兩部分構成, 一是row的集合, 每個row物件表示一個行, 二是描述DataFrame結構的Schema.

DataFrame支援SQL中常見的操作, 例如:select,filter,join,group,sort,join

@Test
  def dataframe1(): Unit = {
    // 1. 建立 SparkSession
    val spark = SparkSession.builder()
      .appName("dataframe1")
      .master("local[6]")
      .getOrCreate()

    // 2. 建立 DataFrame
    import spark.implicits._

    val dataFrame: DataFrame = Seq(Person("zhangsan", 15), Person("lisi", 20)).toDF()

    // 3. 看看 DataFrame 可以玩出什麼花樣
    // select name from ... t where t.age > 10
    dataFrame.where('age > 10)
      .select('name)
      .show()
  }

DataFrame如何建立:

  @Test
  def dataframe2(): Unit = {
    val spark = SparkSession.builder()
      .appName("dataframe1")
      .master("local[6]")
      .getOrCreate()

    import spark.implicits._

    val personList = Seq(Person("zhangsan", 15), Person("lisi", 20))

    // 1. toDF
    val df1 = personList.toDF()
    val df2 = spark.sparkContext.parallelize(personList).toDF()

    // 2. createDataFrame
    val df3 = spark.createDataFrame(personList)

    // 3. read
    val df4 = spark.read.csv("dataset/BeijingPM20100101_20151231_noheader.csv")
    df4.show()
  }

DataFrame介紹:

 @Test
  def dataframe3(): Unit = {
    // 1. 建立 SparkSession
    val spark = SparkSession.builder()
      .master("local[6]")
      .appName("pm analysis")
      .getOrCreate()

    import spark.implicits._

    // 2. 讀取資料集
    val sourceDF: DataFrame = spark.read
      .option("header", value = true)//把第一行弄成header
      .csv("dataset/BeijingPM20100101_20151231.csv")

    // 檢視 DataFrame 的 Schema 資訊, 要意識到 DataFrame 中是有結構資訊的, 叫做 Schema
    sourceDF.printSchema()

    // 3. 處理
    //     1. 選擇列
    //     2. 過濾掉 NA 的 PM 記錄
    //     3. 分組 select year, month, count(PM_Dongsi) from ... where PM_Dongsi != NA group by year, month
    //     4. 聚合
    // 4. 得出結論
//    sourceDF.select('year, 'month, 'PM_Dongsi)
//      .where('PM_Dongsi =!= "NA")
//      .groupBy('year, 'month)
//      .count()
//      .show()

    // 是否能直接使用 SQL 語句進行查詢
    // 1. 將 DataFrame 註冊為臨表
    sourceDF.createOrReplaceTempView("pm")

    // 2. 執行查詢
    val resultDF = spark.sql("select year, month, count(PM_Dongsi) from pm where PM_Dongsi != 'NA' group by year, month")

    resultDF.show()

    spark.stop()
  }

Dataset 和 DataFrame 的異同

目標

  1. 理解DatasetDataFrame之間的關係

DataFrame就是Dataset

根據前面的內容, 可以得到如下資訊

  1. Dataset中可以使用列來訪問資料,DataFrame也可以

  2. Dataset的執行是優化的,DataFrame也是

  3. Dataset具有命令式API, 同時也可以使用SQL來訪問,DataFrame也可以使用這兩種不同的方式訪問

所以這件事就比較蹊蹺了, 兩個這麼相近的東西為什麼會同時出現在SparkSQL中呢?

確實, 這兩個元件是同一個東西,DataFrameDataset的一種特殊情況, 也就是說DataFrameDataset[Row]的別名

DataFrameDataset所表達的語義不同

第一點:DataFrame表達的含義是一個支援函式式操作的, 而Dataset表達是是一個類似RDD的東西,Dataset可以處理任何物件

第二點:DataFrame中所存放的是Row物件, 而Dataset中可以存放任何型別的物件

val spark: SparkSession = new sql.SparkSession.Builder()
  .appName("hello")
  .master("local[6]")
  .getOrCreate()

import spark.implicits._

val df: DataFrame = Seq(People("zhangsan", 15), People("lisi", 15)).toDF()       

val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS() 
DataFrame 就是 Dataset[Row]
Dataset 的範型可以是任意型別

第三點:DataFrame的操作方式和Dataset是一樣的, 但是對於強型別操作而言, 它們處理的型別不同

DataFrame在進行強型別操作時候, 例如map運算元, 其所處理的資料型別永遠是Row

df.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 10) )(RowEncoder.apply(df.schema)).show()

但是對於Dataset來講, 其中是什麼型別, 它就處理什麼型別

ds.map( (item: People) => People(item.name, item.age * 10) ).show()

第三點:DataFrame只能做到執行時型別檢查,Dataset能做到編譯和執行時都有型別檢查

  1. DataFrame中存放的資料以Row表示, 一個Row代表一行資料, 這和關係型資料庫類似

  2. DataFrame在進行map等操作的時候,DataFrame不能直接使用Person這樣的Scala物件, 所以無法做到編譯時檢查

  3. Dataset表示的具體的某一類物件, 例如Person, 所以再進行map等操作的時候, 傳入的是具體的某個Scala物件, 如果呼叫錯了方法, 編譯時就會被檢查出來

val ds: Dataset[People] = Seq(People("zhangsan", 15), People("lisi", 15)).toDS()
ds.map(person => person.hello) 
這行程式碼明顯報錯, 無法通過編譯

 @Test
  def dataframe4(): Unit = {
    val spark = SparkSession.builder()
      .appName("dataframe1")
      .master("local[6]")
      .getOrCreate()

    import spark.implicits._

    val personList = Seq(Person("zhangsan", 15), Person("lisi", 20))

    // DataFrame 是弱型別的
    val df: DataFrame = personList.toDF()
    df.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 2) )(RowEncoder.apply(df.schema))
      .show()

    // DataFrame 所代表的弱型別操作是編譯時不安全
//    df.groupBy("name, school")

    // Dataset 是強型別的
    val ds: Dataset[Person] = personList.toDS()
    ds.map( (person: Person) => Person(person.name, person.age * 2) )
      .show()

    // Dataset 所代表的操作, 是型別安全的, 編譯時安全的
//    ds.filter( person => person.school )
  }

Row

 @Test
  def row(): Unit = {
    // 1. Row 如何建立, 它是什麼
    // row 物件必須配合 Schema 物件才會有 列名
    val p = Person("zhangsan", 15)
    val row = Row("zhangsan", 15)

    // 2. 如何從 Row 中獲取資料
    row.getString(0)
    row.getInt(1)

    // 3. Row 也是樣例類
    row match {
      case Row(name, age) => println(name, age)
    }

  }