1. 程式人生 > >Apache Spark 2.0三種API的傳說:RDD、DataFrame和Dataset

Apache Spark 2.0三種API的傳說:RDD、DataFrame和Dataset

sensor json數據 query 答案 內存 table 引擎 library spark

Apache Spark吸引廣大社區開發者的一個重要原因是:Apache Spark提供極其簡單、易用的APIs,支持跨多種語言(比如:Scala、Java、Python和R)來操作大數據。

本文主要講解Apache Spark 2.0中RDD,DataFrame和Dataset三種API;它們各自適合的使用場景;它們的性能和優化;列舉使用DataFrame和DataSet代替RDD的場景。文章大部分聚焦DataFrame和Dataset,因為這是Apache Spark 2.0的API統一的重點。

Apache Spark 2.0統一API的主要動機是:追求簡化Spark。通過減少用戶學習的概念和提供結構化的數據進行處理。除了結構化,Spark也提供higher-level抽象和API作為特定領域語言(DSL)。

彈性數據集(RDD)

RDD是Spark建立之初的核心API。RDD是不可變分布式彈性數據集,在Spark集群中可跨節點分區,並提供分布式low-level API來操作RDD,包括transformation和action。

那什麽時候用RDD呢?

使用RDD的一般場景:

  • 你需要使用low-level的transformation和action來控制你的數據集;
  • 你的數據集非結構化,比如:流媒體或者文本流;
  • 你想使用函數式編程來操作你的數據,而不是用特定領域語言(DSL)表達;
  • 你不在乎schema,比如,當通過名字或者列處理(或訪問)數據屬性不在意列式存儲格式;
  • 你放棄使用DataFrame和Dataset來優化結構化和半結構化數據集。

RDD在Apache Spark 2.0中慘遭拋棄?

你可能會問:RDD是不是成為“二等公民”了?或者是不是幹脆以後不用了? 答案當然是NO! 通過後面的描述你會得知:Spark用戶可以在RDD,DataFrame和Dataset三種數據集之間無縫轉換,而且只需要使用超級簡單的API方法。

DataFrame

DataFrame與RDD相同之處,都是不可變分布式彈性數據集。不同之處在於,DataFrame的數據集都是按指定列存儲,即結構化數據。類似於傳統數據庫中的表。DataFrame的設計是為了讓大數據處理起來更容易。DataFrame允許開發者把結構化數據集導入DataFrame,並做了higher-level的抽象;DataFrame提供特定領域的語言(DSL)API來操作你的數據集。 在Spark2.0中,DataFrame API將會和Dataset API合並,統一數據處理API。由於這個統一“有點急”,導致大部分Spark開發者對Dataset的high-level和type-safe API並沒有什麽概念。
技術分享圖片

Dataset

在Spark 2.0中,Dataset具有兩個完全不同的API特征:強類型API和弱類型API,見下表。DataFrame是特殊的Dataset,其每行是一個弱類型JVM object。相對應地,Dataset是強類型JVM object的集合,通過Scala的case class或者Java class。 強類型API和弱類型API
LanguageMain Abstraction
Scala Dataset[T] & DataFrame (alias for Dataset[Row])
Java Dataset<T>
Python* DataFrame
R* DataFrame
Note:Python和R沒有編譯時type-safety,所以只提供弱類型的API:DataFrame。

Dataset API的優勢

對於Spark開發者而言,你將從Spark 2.0的DataFrame和Dataset統一的API獲得以下好處: 1. 靜態類型和運行時類型安全 考慮靜態類型和運行時類型安全,SQL有很少的限制而Dataset限制很多。例如,Spark SQL查詢語句,你直到運行時才能發現語法錯誤(syntax error),代價較大。然後DataFrame和Dataset在編譯時就可捕捉到錯誤,節約開發時間和成本。 Dataset API都是lambda函數和JVM typed object,任何typed-parameters不匹配即會在編譯階段報錯。因此使用Dataset節約開發時間。
技術分享圖片
2. High-level抽象以及結構化和半結構化數據集的自定義視圖 DataFrame是Dataset[Row]的特例,把結構化數據集視圖用於半結構化數據集。例如,有個海量IoT設備事件數據集,用JSON格式表示。JSON是一個半結構化數據格式,這裏可以自定義一個Dataset:Dataset[DeviceIoTData]。
    {  
        "device_id": 198164,  
        "device_name": "sensor-pad-198164owomcJZ",  
        "ip": "80.55.20.25",  
        "cca2": "PL",  
        "cca3": "POL",  
        "cn": "Poland",  
        "latitude": 53.08,  
        "longitude": 18.62,  
        "scale": "Celsius",  
        "temp": 21,  
        "humidity": 65,  
        "battery_level": 8,  
        "c02_level": 1408,  
        "lcd": "red",  
        "timestamp": 1458081226051  
    }  
用Scala為JSON數據DeviceIoTData定義case class。
    case class DeviceIoTData (battery_level: Long, c02_level: Long, cca2: String,   
    cca3: String, cn: String, device_id: Long, device_name: String, humidity: Long,   
    ip: String, latitude: Double, lcd: String, longitude: Double,   
    scale:String, temp: Long, timestamp: Long)  
緊接著,從JSON文件讀取數據
    // read the json file and create the dataset from the   
    // case class DeviceIoTData  
    // ds is now a collection of JVM Scala objects DeviceIoTData  
      
    val ds = spark.read.json("/databricks-public-datasets/data/iot/iot_devices.json").as[DeviceIoTData]  
這個時候有三個事情會發生:
  • Spark讀取JSON文件,推斷出其schema,創建一個DataFrame;
  • Spark把數據集轉換DataFrame -> Dataset[Row],泛型Row object,因為這時還不知道其確切類型;
  • Spark進行轉換:Dataset[Row] -> Dataset[DeviceIoTData],DeviceIoTData類的Scala JVM object。
3. 簡單易用的API 雖然結構化數據會給Spark程序操作數據集帶來挺多限制,但它卻引進了豐富的語義和易用的特定領域語言。大部分計算可以被Dataset的high-level API所支持。例如,簡單的操作agg,select,avg,map,filter或者groupBy即可訪問DeviceIoTData類型的Dataset。 使用特定領域語言API進行計算是非常簡單的。例如,使用filter()map()創建另一個Dataset。
    // Use filter(), map(), groupBy() country, and compute avg()   
    // for temperatures and humidity. This operation results in   
    // another immutable Dataset. The query is simpler to read,   
    // and expressive  
      
     val dsAvgTmp = ds.filter(d => {d.temp > 25}).map(d => (d.temp, d.humidity, d.cca3)).groupBy($"_3").avg()   
      
    //display the resulting dataset  
      
    display(dsAvgTmp)  

4. 性能和優化

使用DataFrame和Dataset API獲得空間效率和性能優化的兩個原因:

首先,DataFrame和Dataset API是建立在Spark SQL引擎之上,它會使用Catalyst優化器來生成優化過的邏輯計劃和物理查詢計劃。R,Java,Scala或者Python的DataFrame/Dataset API使得查詢都進行相同的代碼優化以及空間和速度的效率提升。

技術分享圖片

其次,Spark作為編譯器可以理解Dataset類型的JVM object,它能映射特定類型的JVM object到Tungsten內存管理,使用Encoder。Tungsten的Encoder可以有效的序列化/反序列化JVM object,生成字節碼來提高執行速度。

什麽時候使用DataFrame或者Dataset?

  • 你想使用豐富的語義,high-level抽象,和特定領域語言API,那你可以使用DataFrame或者Dataset;
  • 你處理的半結構化數據集需要high-level表達,filter,map,aggregation,average,sum,SQL查詢,列式訪問和使用lambda函數,那你可以使用DataFrame或者Dataset;
  • 你想利用編譯時高度的type-safety,Catalyst優化和Tungsten的code生成,那你可以使用DataFrame或者Dataset;
  • 你想統一和簡化API使用跨Spark的Library,那你可以使用DataFrame或者Dataset;
  • 如果你是一個R使用者,那你可以使用DataFrame或者Dataset;
  • 如果你是一個Python使用者,那你可以使用DataFrame或者Dataset。
你可以無縫地把DataFrame或者Dataset轉化成一個RDD,只需簡單的調用.rdd:
    // select specific fields from the Dataset, apply a predicate  
    // using the where() method, convert to an RDD, and show first 10  
    // RDD rows  
      
    val deviceEventsDS = ds.select($"device_name", $"cca3", $"c02_level").where($"c02_level" > 1300)  
    // convert to RDDs and take the first 10 rows  
      
    val eventsRDD = deviceEventsDS.rdd.take(10)  

總結

通過上面的分析,什麽情況選擇RDD,DataFrame還是Dataset已經很明顯了。RDD適合需要low-level函數式編程和操作數據集的情況;DataFrame和Dataset適合結構化數據集,使用high-level和特定領域語言(DSL)編程,空間效率高和速度快。


Apache Spark 2.0三種API的傳說:RDD、DataFrame和Dataset