別再人云亦云了!!!你真的搞懂了RDD、DF、DS的區別嗎?
阿新 • • 發佈:2021-02-21
幾年前,包括最近,我看了各種書籍、教程、官網。但是真正能夠把RDD、DataFrame、DataSet解釋得清楚一點的、論據多一點少之又少,甚至有的人號稱Spark專家,但在這一塊根本說不清楚。還有國內的一些書籍,小猴真的想問一聲:Are you OK?書名別再叫精通xxx技術了,請改名為 xxx技術從入門到放棄。這樣可以有效避免耽誤別人學習,不好嗎?
大家都在告訴我們結論,但其實,小猴作為一名長期混跡於開源社群、並仍在一線大資料開發的技術人,深諳技術文化之一:
> **To experience** | 去經歷
這是我要提倡的技術文化之一。之前有人把Experience譯為體驗,但在小猴的技術世界裡,Experience更多的是自己去經歷,而不能跟團去旅遊一樣,那樣你只能是一個外包而已,想要做到卓越,就得去經歷。技術,只有去經歷才會有成長。
## 目錄
[TOC]
## RDD、DataFrame、DataSet介紹
我們每天都在基於框架開發,對於我們來說,一套易於使用的API太重要了。對於Spark來說,有三套API。
![image-20210201000858671](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img_2021_01/20210201000858.png)
分別是:
* RDD
* DataFrame
* DataSet
三套的API,開發人員就要學三套。不過,從Spark 2.2開始,DataFrame和DataSet的API已經統一了。而編寫Spark程式的時候,RDD已經慢慢退出我們的視野了。
但Spark既然提供三套API,我們到底什麼時候用RDD、什麼時候用DataFrame、或者DataSet呢?我們先來了解下這幾套API。
### RDD
#### RDD的概念
* RDD是Spark 1.1版本開始引入的。
* RDD是Spark的基本資料結構。
* RDD是Spark的彈性分散式資料集,它是不可變的(Immutable)。
* RDD所描述的資料分佈在叢集的各個節點中,基於RDD提供了很多的轉換的並行處理操作。
* RDD具備容錯性,在任何節點上出現了故障,RDD是能夠進行容錯恢復的。
* **RDD專注的是How!**就是如何處理資料,都由我們自己來去各種運算元來實現。
#### 什麼時候使用RDD?
* **應該避免使用RDD!**
#### RDD的短板
* 叢集間通訊都需要將JVM中的物件進行序列化和反序列化,RDD開銷較大
* 頻繁建立和銷燬物件會增加GC,GC的效能開銷較大
![image-20210201231436680](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img_2021_01/20210201231436.png)
> **Spark 2.0開始,RDD不再是一等公民**
>
> 從Apache Spark 2.0開始,RDD已經被降級為二等公民,RDD已經被棄用了。而且,我們一會就會發現,DataFrame/DataSet是可以和RDD相互轉換的,DataFrame和DataSet也是建立在RDD上。
### DataFrame
#### DataFrame概念
* DataFrame是從Spark 1.3版本開始引入的。
* 通過DataFrame可以簡化Spark程式的開發,讓Spark處理結構化資料變得更簡單。DataFrame可以使用SQL的方式來處理資料。例如:業務分析人員可以基於編寫Spark SQL來進行資料開發,而不僅僅是Spark開發人員。
* DataFrame和RDD有一些共同點,也是不可變的分散式資料集。但與RDD不一樣的是,DataFrame是有schema的,有點類似於關係型資料庫中的**表**,每一行的資料都是一樣的,因為。有了schema,這也表明了DataFrame是比RDD提供更高層次的抽象。
* DataFrame支援各種資料格式的讀取和寫入,例如:CSV、JSON、AVRO、HDFS、Hive表。
* DataFrame使用Catalyst進行優化。
* DataFrame專注的是**What!**,而不是How!
#### DataFrame的優點
* 因為DataFrame是有統一的schema的,所以序列化和反序列無需儲存schema。這樣節省了一定的空間。
* DataFrame儲存在off-heap(堆外記憶體)中,由作業系統直接管理(RDD是JVM管理),可以將資料直接序列化為二進位制存入off-heap中。操作資料也是直接操作off-heap。
#### DataFrane的短板
* DataFrame不是型別安全的
* API也不是面向物件的
### Apache Spark 2.0 統一API
從Spark 2.0開始,DataFrame和DataSet的API合併在一起,實現了跨庫統一成為一套API。這樣,開發人員的學習成本就降低了。只需要學習一個High Level的、型別安全的DataSet API就可以了。——這對於Spark開發人員來說,是一件好事。
上圖我們可以看到,從Spark 2.0開始,Dataset提供了兩組不同特性的API:
* 非型別安全
* 型別安全
其中非型別安全就是DataSet[Row],我們可以對Row中的欄位取別名。這不就是DataFrame嗎?而型別安全就是JVM物件的集合,型別就是scala的樣例類,或者是Java的實體類。
有Spark 2.0原始碼為證:
```scala
package object sql {
// ...
type DataFrame = Dataset[Row]
}
```
https://github.com/IloveZiHan/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/package.scala
也就是說,每當我們用導DataFrame其實就是在使用Dataset。
> 針對Python或者R,不提供型別安全的DataSet,只能基於DataFrame API開發。
#### 什麼時候使用DataFrame
### DataSet
* DataSet是從Spark 1.6版本開始引入的。
* DataSet具有RDD和DataFrame的優點,既提供了更有效率的處理、以及型別安全的API。
* DataSet API都是基於Lambda函式、以及JVM物件來進行開發,所以在編譯期間就可以快速檢測到錯誤,節省開發時間和成本。
* DataSet使用起來很像,但它的執行效率、空間資源效率都要比RDD高很多。可以很方便地使用DataSet處理結構化、和非結構資料。
#### DataSet API的優點
![image-20210201231136055](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img_2021_01/20210201231136.png)
* DataSet結合了RDD和DataFrame的優點。
* 當序列化資料時,Encoder生成的位元組碼可以直接與堆互動,實現對資料按需訪問,而無需反序列化整個物件。
##### 型別安全
寫過Java或者C#的同學都會知道,一旦在程式碼中型別使用不當,編譯都編譯不過去。日常開發中,我們更多地是使用泛型。因為一旦我們使用非型別安全的型別,軟體的維護週期一長,如果集合中放入了一些不合適的型別,就會出現嚴重的故障。這也是為什麼Java、C#還有C++都要去支援泛型的原因。
在Spark中也會有型別安全的問題。而且,一旦在執行時出現型別安全問題,會影響整個大規模計算作業。這種作業的錯誤排除難度,要比單機故障排查起來更復雜。如果在執行時期間就能發現問題,這很美好啊。
DataFrame中編寫SQL進行資料處理分析,在編譯時是不做檢查的,只有在Spark程式執行起來,才會檢測到問題。
| | SQL | DataFrame | Dataset |
| -------- | ------ | --------- | ------- |
| 語法錯誤 | 執行時 | 編譯時 | 編譯時 |
| 解析錯誤 | 執行時 | 執行時 | 編譯時 |
##### 對結構化和半結構化資料的High Level抽象
例如:我們有一個較大的網站流量日誌JSON資料集,可以很容易的使用DataSet[WebLog]來處理,強型別操作可以讓處理起來更加簡單。
##### 以RDD更易用的API
DataSet引入了更豐富的、更容易使用的API操作。這些操作是基於High Level抽象的,而且基於實體類的操作,例如:進行groupBy、agg、select、sum、avg、filter等操作會容易很多。
##### 效能優化
使用DataFrame和DataSet API在效能和空間使用率上都有大幅地提升。
1. DataFrame和DataSet API是基於Spark SQL引擎之上構建的,會使用Catalyst生成優化後的邏輯和物理執行計劃。尤其是無型別的DataSet[Row](DataFrame),它的速度更快,很適合互動式查詢。
2. 由於Spark能夠理解DataSet中的JVM物件型別,所以Spark會將將JVM物件對映為Tungsten的內部記憶體方式儲存。而Tungsten編碼器可以讓JVM物件更有效地進行序列化和反序列化,生成更緊湊、更有效率的位元組碼。
![RDD儲存效率 VS DataSet儲存效率](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img_2021_01/20210131233908.png)通過上圖可以看到,DataSet的空間儲存效率是RDD的4倍。RDD要使用60GB的空間,而DataSet只需要使用不到15GB就可以了。
## Youtube視訊分析案例
### 資料集
去Kaggle下載youtube地址:
```xml
https://www.kaggle.com/datasnaek/youtube-new?select=USvideos.csv
```
每個欄位的含義都有說明。
### Maven開發環境準備
```xml
```
### RDD開發
```scala
/**
* Spark RDD處理示例
*/
object RddAnalysis {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD Process").setMaster("local[*]")
val sc = new SparkContext(conf)
// 讀取本地檔案建立RDD
val youtubeVideosRDD = {
sc.textFile("""E:\05.git_project\dataset\youtube""")
}
// 統計不同分類Youtube視訊的喜歡人數、不喜歡人數
// 1. 新增行號
// 建立計數器
val rownumAcc = sc.longAccumulator("rownum")
// 帶上行號
youtubeVideosRDD.map(line => {
rownumAcc.add(1)
rownumAcc.value -> line
})
// 過濾掉第一行
.filter(_._1 != 1)
// 去除行號
.map(_._2)
// 過濾掉非法的資料
.filter(line => {
val fields = line.split("\001")
val try1 = scala.util.Try(fields(8).toLong)
val try2 = scala.util.Try(fields(9).toLong)
if(try1.isFailure || try2.isFailure)
false
else
true
})
// 讀取三個欄位(視訊分類、喜歡的人數、不喜歡的人數
.map(line => {
// 按照\001解析CSV
val fields = line.split("\001")
// 取第4個(分類)、第8個(喜歡人數)、第9個(不喜歡人數)
// (分類id, 喜歡人數, 不喜歡人數)
(fields(4), fields(8).toLong, fields(9).toLong)
})
// 按照分類id分組
.groupBy(_._1)
.map(t => {
val result = t._2.reduce((r1, r2) => {
(r1._1, r1._2 + r2._2, r1._3 + r2._3)
})
result
})
.foreach(println)
}
}
```
執行結果如下:
```shell
("BBC Three",8980120,149525)
("Ryan Canty",11715543,80544)
("Al Jazeera English",34427,411)
("FBE",9003314,191819)
("Sugar Pine 7",1399232,81062)
("Rob Scallon",11652652,704748)
("CamilaCabelloVEVO",19077166,1271494)
("Grist",3133,37)
```
程式碼中做了一些資料的過濾,然後進行了分組排序。如果Spark都要這麼來寫的話,業務人員幾乎是沒法寫了。著程式碼完全解釋了How,而不是What。每一個處理的細節,都要我們自己親力親為。實現起來臃腫。
#### 檢視下基於RDD的DAG
開啟瀏覽器,輸入:localhost:4040,來看下DAG。
![image-20210203225714246](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img/2021Q1/20210203225714.png)
DAG非常的直觀,按照shuffle分成了兩個Stage來執行。Stage中依次執行了每個Operator。程式沒有經過任何優化。我把每一個操作都和DAG上的節點對應了起來。
### DataFrame開發
```scala
object DataFrameAnalysis {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Youtube Analysis")
.master("local[*]")
.config("spark.sql.shuffle.partitions",1)
.getOrCreate()
import spark.sqlContext.implicits._
// 讀取CSV
val youtubeVideoDF = spark.read.option("header", true).csv("""E:\05.git_project\dataset\USvideos.csv""")
import org.apache.spark.sql.functions._
// 按照category_id分組聚合
youtubeVideoDF.select($"category_id", $"likes".cast(LongType), $"dislikes".cast(LongType))
.where($"likes".isNotNull)
.where( $"dislikes".isNotNull)
.groupBy($"category_id")
.agg(sum("likes"), sum("dislikes"))
.show()
}
}
```
大家可以看到,現在實現方式非常的簡單,而且清晰。
#### 檢視下基於DataFrame的執行計劃與DAG
![image-20210203230500292](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img/2021Q1/20210203230500.png)
但我們執行上面的Spark程式時,其實運行了兩個JOB。
![image-20210203230807773](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img/2021Q1/20210203230807.png)
下面這個是第一個Job的DAG。我們看到只有一個Stage。這個DAG我們看得不是特別清楚做了什麼,因為Spark SQL是做過優化的,我們需要檢視Query的詳細資訊,才能看到具體執行的工作。
![image-20210203233802280](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img/2021Q1/20210203233802.png)
第一個Job的詳細執行資訊如下:
![image-20210203231757894](https://pick-bed2021.oss-cn-beijing.aliyuncs.com/img/2021Q1/20210203231757.png)
哦,原來這個JOB掃描了所有的行,然後執行了一個Filter過濾操作。再檢視下查詢計劃:
```shell
== Parsed Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
+- Filter (length(trim(value#6, None)) > 0)
+- Project [value#0 AS value#6]
+- Project [value#0]
+- Relation[value#0] text
== Analyzed Logical Plan ==
value: string
GlobalLimit 1
+- LocalLimit 1
+- Filter (length(trim(value#6, None)) > 0)
+- Project [value#0 AS value#6]
+- Project [value#0]
+- Relation[value#0] text
== Optimized Logical Plan ==
GlobalLimit 1
+- LocalLimit 1
+- Filter (length(trim(value#0, None)) > 0)
+- Relation[value#0] text
== Physical Plan ==
CollectLimit 1
+- *(1) Filter (length(trim(value#0, None)) > 0)
+- FileScan text [value#0] Batched: false, DataFilters: [(length(trim(value#0, None)) > 0)], Format: Text, Location: InMemoryFileIndex[file:/E:/05.git_project/dataset/USvideos.csv], PartitionFilters: [], PushedFilters: [], ReadSchema