Apache Spark 2.0三種API的傳說:RDD、DataFrame和Dataset
Apache Spark吸引廣大社區開發者的一個重要原因是:Apache Spark提供極其簡單、易用的APIs,支持跨多種語言(比如:Scala、Java、Python和R)來操作大數據。
本文主要講解Apache Spark 2.0中RDD,DataFrame和Dataset三種API;它們各自適合的使用場景;它們的性能和優化;列舉使用DataFrame和DataSet代替RDD的場景。文章大部分聚焦DataFrame和Dataset,因為這是Apache Spark 2.0的API統一的重點。
Apache Spark 2.0統一API的主要動機是:追求簡化Spark。通過減少用戶學習的概念和提供結構化的數據進行處理。除了結構化,Spark也提供higher-level抽象和API作為特定領域語言(DSL)。
彈性數據集(RDD)
RDD是Spark建立之初的核心API。RDD是不可變分布式彈性數據集,在Spark集群中可跨節點分區,並提供分布式low-level API來操作RDD,包括transformation和action。
那什麽時候用RDD呢?
使用RDD的一般場景:
- 你需要使用low-level的transformation和action來控制你的數據集;
- 你的數據集非結構化,比如:流媒體或者文本流;
- 你想使用函數式編程來操作你的數據,而不是用特定領域語言(DSL)表達;
- 你不在乎schema,比如,當通過名字或者列處理(或訪問)數據屬性不在意列式存儲格式;
- 你放棄使用DataFrame和Dataset來優化結構化和半結構化數據集。
RDD在Apache Spark 2.0中慘遭拋棄?
你可能會問:RDD是不是成為“二等公民”了?或者是不是幹脆以後不用了? 答案當然是NO! 通過後面的描述你會得知:Spark用戶可以在RDD,DataFrame和Dataset三種數據集之間無縫轉換,而且只需要使用超級簡單的API方法。DataFrame
DataFrame與RDD相同之處,都是不可變分布式彈性數據集。不同之處在於,DataFrame的數據集都是按指定列存儲,即結構化數據。類似於傳統數據庫中的表。DataFrame的設計是為了讓大數據處理起來更容易。DataFrame允許開發者把結構化數據集導入DataFrame,並做了higher-level的抽象;DataFrame提供特定領域的語言(DSL)API來操作你的數據集。 在Spark2.0中,DataFrame API將會和Dataset API合並,統一數據處理API。由於這個統一“有點急”,導致大部分Spark開發者對Dataset的high-level和type-safe API並沒有什麽概念。Dataset
在Spark 2.0中,Dataset具有兩個完全不同的API特征:強類型API和弱類型API,見下表。DataFrame是特殊的Dataset,其每行是一個弱類型JVM object。相對應地,Dataset是強類型JVM object的集合,通過Scala的case class或者Java class。 強類型API和弱類型APILanguage | Main Abstraction |
---|---|
Scala | Dataset[T] & DataFrame (alias for Dataset[Row]) |
Java | Dataset<T> |
Python* | DataFrame |
R* | DataFrame |
Dataset API的優勢
對於Spark開發者而言,你將從Spark 2.0的DataFrame和Dataset統一的API獲得以下好處: 1. 靜態類型和運行時類型安全 考慮靜態類型和運行時類型安全,SQL有很少的限制而Dataset限制很多。例如,Spark SQL查詢語句,你直到運行時才能發現語法錯誤(syntax error),代價較大。然後DataFrame和Dataset在編譯時就可捕捉到錯誤,節約開發時間和成本。 Dataset API都是lambda函數和JVM typed object,任何typed-parameters不匹配即會在編譯階段報錯。因此使用Dataset節約開發時間。2. High-level抽象以及結構化和半結構化數據集的自定義視圖 DataFrame是Dataset[Row]的特例,把結構化數據集視圖用於半結構化數據集。例如,有個海量IoT設備事件數據集,用JSON格式表示。JSON是一個半結構化數據格式,這裏可以自定義一個Dataset:Dataset[DeviceIoTData]。
{ "device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", "ip": "80.55.20.25", "cca2": "PL", "cca3": "POL", "cn": "Poland", "latitude": 53.08, "longitude": 18.62, "scale": "Celsius", "temp": 21, "humidity": 65, "battery_level": 8, "c02_level": 1408, "lcd": "red", "timestamp": 1458081226051 }用Scala為JSON數據DeviceIoTData定義case class。
case class DeviceIoTData (battery_level: Long, c02_level: Long, cca2: String, cca3: String, cn: String, device_id: Long, device_name: String, humidity: Long, ip: String, latitude: Double, lcd: String, longitude: Double, scale:String, temp: Long, timestamp: Long)緊接著,從JSON文件讀取數據
// read the json file and create the dataset from the // case class DeviceIoTData // ds is now a collection of JVM Scala objects DeviceIoTData val ds = spark.read.json("/databricks-public-datasets/data/iot/iot_devices.json").as[DeviceIoTData]這個時候有三個事情會發生:
- Spark讀取JSON文件,推斷出其schema,創建一個DataFrame;
- Spark把數據集轉換DataFrame -> Dataset[Row],泛型Row object,因為這時還不知道其確切類型;
- Spark進行轉換:Dataset[Row] -> Dataset[DeviceIoTData],DeviceIoTData類的Scala JVM object。
// Use filter(), map(), groupBy() country, and compute avg() // for temperatures and humidity. This operation results in // another immutable Dataset. The query is simpler to read, // and expressive val dsAvgTmp = ds.filter(d => {d.temp > 25}).map(d => (d.temp, d.humidity, d.cca3)).groupBy($"_3").avg() //display the resulting dataset display(dsAvgTmp)
4. 性能和優化
使用DataFrame和Dataset API獲得空間效率和性能優化的兩個原因:
首先,DataFrame和Dataset API是建立在Spark SQL引擎之上,它會使用Catalyst優化器來生成優化過的邏輯計劃和物理查詢計劃。R,Java,Scala或者Python的DataFrame/Dataset API使得查詢都進行相同的代碼優化以及空間和速度的效率提升。
其次,Spark作為編譯器可以理解Dataset類型的JVM object,它能映射特定類型的JVM
object到Tungsten內存管理,使用Encoder。Tungsten的Encoder可以有效的序列化/反序列化JVM
object,生成字節碼來提高執行速度。
什麽時候使用DataFrame或者Dataset?
- 你想使用豐富的語義,high-level抽象,和特定領域語言API,那你可以使用DataFrame或者Dataset;
- 你處理的半結構化數據集需要high-level表達,filter,map,aggregation,average,sum,SQL查詢,列式訪問和使用lambda函數,那你可以使用DataFrame或者Dataset;
- 你想利用編譯時高度的type-safety,Catalyst優化和Tungsten的code生成,那你可以使用DataFrame或者Dataset;
- 你想統一和簡化API使用跨Spark的Library,那你可以使用DataFrame或者Dataset;
- 如果你是一個R使用者,那你可以使用DataFrame或者Dataset;
- 如果你是一個Python使用者,那你可以使用DataFrame或者Dataset。
// select specific fields from the Dataset, apply a predicate // using the where() method, convert to an RDD, and show first 10 // RDD rows val deviceEventsDS = ds.select($"device_name", $"cca3", $"c02_level").where($"c02_level" > 1300) // convert to RDDs and take the first 10 rows val eventsRDD = deviceEventsDS.rdd.take(10)
總結
通過上面的分析,什麽情況選擇RDD,DataFrame還是Dataset已經很明顯了。RDD適合需要low-level函數式編程和操作數據集的情況;DataFrame和Dataset適合結構化數據集,使用high-level和特定領域語言(DSL)編程,空間效率高和速度快。
Apache Spark 2.0三種API的傳說:RDD、DataFrame和Dataset