Visualizing the stock market structure of sklearn
SparkSQL
發展過程
解決的問題
-
Spark SQL
使用Hive
解析SQL
生成AST
語法樹, 將其後的邏輯計劃生成, 優化, 物理計劃都自己完成, 而不依賴Hive
-
執行計劃和優化交給優化器
Catalyst
-
內建了一套簡單的
SQL
解析器, 可以不使用HQL
, 此外, 還引入和DataFrame
這樣的DSL API
, 完全可以不依賴任何Hive
的元件 -
Shark
只能查詢檔案,Spark SQL
可以直接降查詢作用於RDD
, 這一點是一個大進步
適用場景
定義 | 特點 | 舉例 | |
---|---|---|---|
結構化資料 |
有固定的 |
有預定義的 |
關係型資料庫的表 |
半結構化資料 |
沒有固定的 |
沒有固定的 |
指一些有結構的檔案格式, 例如 |
非結構化資料 |
沒有固定 |
沒有固定 |
指文件圖片之類的格式 |
-
Spark
的RDD
主要用於處理 非結構化資料 和 半結構化資料 -
SparkSQL
主要用於處理 結構化資料
SparkSession
SparkContext在讀取檔案的時候,讀取出來的是 RDD, 不包含 Schema(結構化)資訊。所以出現了SparkSession作為SparkSQL的入口點,包括了 SQLContext
HiveContext
, SparkContext
等元件的功能
val spark: SparkSession = new sql.SparkSession.Builder() .appName("hello") .master("local[6]") .getOrCreate()
Catalyst優化器
SparkSQL
大部分情況用於處理結構化資料和半結構化資料, 所以 SparkSQL
可以獲知資料的 Schema
, 從而根據其 Schema
來進行優化
SparkSQL整體架構
-
API
層簡單的說就是Spark
會通過一些API
接受SQL
語句 -
收到
SQL
Catalyst
,Catalyst
負責解析SQL
, 生成執行計劃等 -
Catalyst
的輸出應該是RDD
的執行計劃 -
最終交由叢集執行
簡單優化過程
Catalyst
的主要運作原理是分為三步, 先對 SQL
或者 Dataset
的程式碼解析, 生成邏輯計劃, 後對邏輯計劃進行優化, 再生成物理計劃, 最後生成程式碼到叢集中以 RDD
的形式執行
Dataset
-
Dataset
是一個新的Spark
元件, 其底層還是RDD
-
Dataset
提供了訪問物件中某個特定欄位的能力, 不用像RDD
一樣每次都要針對整個物件做操作 -
Dataset
和RDD
不同, 如果想把Dataset[T]
轉為RDD[T]
, 則需要對Dataset
底層的InternalRow
做轉換, 是一個比價重量級的操作
DataFrame
-
DataFrame
是一個類似於關係型資料庫表的函式式元件 -
DataFrame
一般處理結構化資料和半結構化資料 -
DataFrame
具有資料物件的 Schema 資訊 -
可以使用命令式的
API
操作DataFrame
, 同時也可以使用SQL
操作DataFrame
-
DataFrame
可以由一個已經存在的集合直接建立, 也可以讀取外部的資料來源來建立
Dataset 和 DataFrame 的異同
DataFrame
是Dataset
的一種特殊情況, 也就是說DataFrame
是Dataset[Row]
的別名。DataFrame
和Dataset
所表達的語義不同
DataFrame
表達的含義是一個支援函式式操作的表
, 而Dataset
表達是是一個類似RDD
的東西,Dataset
可以處理任何物件DataFrame
中所存放的是Row
物件, 而Dataset
中可以存放任何型別的物件DataFrame
的操作方式和Dataset
是一樣的, 但是對於強型別操作而言, 它們處理的型別不同DataFrame
只能做到執行時型別檢查,Dataset
能做到編譯和執行時都有型別檢查
程式碼解析:
def dataframe4(): Unit = { val spark = SparkSession.builder() .appName("dataframe1") .master("local[6]") .getOrCreate() import spark.implicits._ val personList = Seq(Person("zhangsan", 15), Person("lisi", 20)) // DataFrame 是弱型別的 val df: DataFrame = personList.toDF() df.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 2) )(RowEncoder.apply(df.schema)) .show() // DataFrame 所代表的弱型別操作是編譯時不安全 // df.groupBy("name, school") // Dataset 是強型別的 val ds: Dataset[Person] = personList.toDS() ds.map( (person: Person) => Person(person.name, person.age * 2) ) .show() // Dataset 所代表的操作, 是型別安全的, 編譯時安全的 // ds.filter( person => person.school ) }