1. 程式人生 > 其它 >Spark權威指南(中文版)----第11章 Datasets(1)

Spark權威指南(中文版)----第11章 Datasets(1)

Datasets是結構化api的基本型別。我們已經使用過DataFrames,它是Row型別的Datasets,可以跨Spark的不同語言使用。Datasets是一種嚴格意義上的Java虛擬機器(JVM)語言特性,僅適用於Scala和Java。使用Datasets,您可以定義資料集中每行包含的物件。在Scala中,這將是一個case類物件,它本質上定義了一個您可以使用的模式,在Java中,您將定義一個Java Bean。有經驗的使用者經常將Datasets稱為Spark中的“型別化api集”。有關更多資訊,請參見第4章。在第4章中,我們討論了Spark的型別,如StringType、BigIntType、StructType等。這些特定於Spark的型別對映到每個Spark語言(如String、Integer和Double)中可用的型別。當您使用DataFrame API時,您不建立strings或integers,而是通過操作Row物件來為您操作資料。事實上,如果您使用Scala或Java,所有“DataFrames”實際上都是Row型別的Datasets。為了有效地支援特定領域的物件,需要一個稱為“Encoder”的特殊概念。編碼器將特定於域的型別T對映到Spark的內部型別系統。例如,給定一個Person類有兩個欄位,name (string)和age (int),編碼器指導Spark在執行時生成程式碼,將Person物件序列化為二進位制結構。當使用DataFrames或“標準”結構化api時,這個二進位制結構將是Row。當我們想要建立我們自己的特定於域的物件時,我們在Scala中指定一個case class,或者在Java中指定一個JavaBean。Spark將允許我們以分散式方式操作這個物件(代替Row)。當您使用Dataset API時,對於它所觸及的每一行,域指定型別,Spark將Spark行格式轉換為您指定的物件(case類或Java類)。這種轉換會減慢操作速度,但可以提供更大的靈活性。您將注意到效能上的下降,但這與您在Python中看到的使用者定義函式(UDF)之類的東西的量級大不相同,因為效能成本不像切換程式語言那樣極端,但是要記住這一點。

11.1.什麼時候使用Datasets

您可能會想,如果我在使用Datasets時要付出效能代價,為什麼還要使用它們呢?如果我們必須把原因壓縮成一個標準列表,這裡有幾個原因:

  • 當您希望執行的操作無法使用DataFrame操作來表示時

  • 當您希望或需要型別安全,並且願意接受效能成本來實現它時

讓我們更詳細地探討這些。有一些操作不能使用我們在前幾章中看到的結構化api來表示。雖然這些不是特別常見,但是您可能希望將大量的業務邏輯編碼到一個特定的函式中,而不是用SQL或DataFrames。這是Datasets的適當用法。此外,Dataset API是型別安全的。對其型別無效的操作(例如兩個字串型別相減)將在編譯時而不是執行時失敗。如果正確性和型別安全程式碼是您的最高優先順序,那麼以犧牲一些效能為代價,這對您來說可能是一個很好的選擇。這並不能保護您免受畸形資料的影響,但是可以讓您更優雅地處理和組織資料。您可能希望使用Datasets的另一個潛在時間是,您希望重用單節點工作負載和Spark工作負載之間對整個行進行的各種轉換。如果您對Scala有一些經驗,您可能會注意到Spark的api反映了Scala序列型別,但是它們是以分散式方式執行的。事實上,Scala的發明者馬丁·奧德斯基(Martin Odersky)在2015年的Spark歐洲峰會上就說過這句話。因此,使用Datasets的一個優點是,如果將所有資料和轉換定義為可接受case classes,那麼在分散式和本地工作模式中重用它們就很簡單了。此外,當您將DataFrames收集到本地磁碟時,它們將是正確的類和型別,有時會使進一步的操作更加容易。可能最流行的用例是同時使用DataFrames和Datasets,在與工作負載最相關的效能和型別安全之間進行手動權衡。當您想要收集資料到驅動程式並使用單節點庫操作它時,可能處於一個大型的、基於DataFrames的提取、轉換和載入(ETL)轉換的末尾,或者,當您需要在Spark SQL中執行過濾和進一步操作之前執行逐行解析時,它可能處於轉換的開始。

11.2.建立Datasets

建立Datasets有點像手工操作,需要您提前知道並定義模式schemas。

11.2.1.In Java: Encoders

JavaEncoders相當簡單,您只需指定您的類,然後當您遇到DataFrame(Dataset<Row>)時,您將對其進行編碼:

11.2.2.In Scala: Case Classes

要在Scala中建立Datasets,需要定義一個Scala case class。case class是一個常規類,它具有以下特徵:

  • 不可變的

  • 可通過模式匹配進行分解

  • 允許基於結構而不是引用進行比較

  • 易於使用和操作

這些特性使得它對於資料分析非常有價值,因為對caseclass進行推理非常容易。可能最重要的特性是caseclass是不可變的,並且允許按結構而不是按值進行比較。Scala文件是這樣描述caseclass的:

  • 不可變性使您無需跟蹤事物在何時何地發生了變化

  • 按值比較允許您將例項作為原始值進行比較—不再存在關於類的例項是否按值或引用進行比較的不確定性

  • 模式匹配簡化了分支邏輯,從而減少了錯誤,提高了程式碼的可讀性。

這些優點也在Spark中得到了應用。

要開始建立Dataset,讓我們為其中一個數據集定義一個case class:

現在我們定義了一個case class,它將表示資料集中的一條記錄。更簡潔地說,我們現在有了一個飛行Dataset。這沒有為我們定義任何方法,只是模式。當我們讀取資料時,我們會得到一個DataFrame。然而,我們只是使用as方法將其轉換為我們指定的行型別:

11.3.Actions

儘管我們可以看到Dataset的強大功能,但重要的是要理解像collect、take和count這樣的操作是否適用於我們正在使用的資料集或資料流:

您還會注意到,當我們實際訪問其中一個case class時,我們不需要進行任何型別強制,我們只需要指定case class的屬性名稱並返回,不僅是期望值,還有期望的型別:

11.4.Transformations

Dataset上的轉換與我們在DataFrames上看到的轉換相同。您在本節中讀到的任何轉換都適用於Dataset,我們鼓勵您瀏覽相關聚合或連線的特定部分。除了這些轉換之外,資料集還允許我們指定比單獨在DataFrames上執行的更復雜和強型別的轉換,因為我們要操作原始Java虛擬機器(JVM)型別。為了演示這個原始物件操作,讓我們過濾剛剛建立的Dataset。

11.4.1.Filtering

讓我們看一個簡單的例子,建立一個簡單的函式,該函式接受一個Flight並返回一個布林值,該值描述起點和終點是否相同。這不是一個UDF(至少以Spark SQL定義UDF的方式),而是一個泛型函式。

提示

在下面的示例中,您將注意到我們將建立一個函式來定義這個過濾器。這與我們在書中迄今所做的工作有很大的不同。通過指定一個函式,我們強制Spark對Dataset中的每一行計算這個函式的值。這可能是非常資源密集型的。對於簡單的過濾器,總是首選編寫SQL表示式。這將大大降低過濾資料的成本,同時仍然允許您稍後將其作為Dataset進行操作:

我們現在可以將這個函式傳遞到filter方法中,指定對於每一行,它都應該驗證這個函式返回true,並且在這個過程中會相應地過濾我們的Dataset:

結果是:

正如我們前面看到的,這個函式根本不需要在Spark程式碼中執行。與udf類似,我們可以在Spark中使用它之前,使用它並在本地機器上的資料上測試它。

例如,這個資料集足夠小,我們可以收集到驅動程式(作為一個Flights陣列),我們可以在上面操作和執行完全相同的過濾操作:

結果是:

我們可以看到我們得到了和之前完全一樣的答案。

11.4.2.Mapping

Filtering是一個簡單的轉換,但有時需要將一個值對映到另一個值。在前面的示例中,我們對函式執行了這個操作:它接受一個flight並返回一個Boolean值,但在其他時候,我們可能需要執行一些更復雜的操作,比如提取一個值、比較一組值或類似的操作。

最簡單的例子是操作Dataset,以便從每一行中提取一個值。這可以在DataFrame上有效地執行,比如對Dataset進行select。讓我們提取目的地:

注意,最後得到的資料集型別為String。這是因為Spark已經知道這個結果應該返回的JVM型別,並且允許我們從編譯時檢查中獲益,如果由於某種原因,它是無效的。我們可以收集這個,並得到一個字串陣列到Driver程式:

這可能感覺微不足道,沒有必要;我們可以在DataFrames上正確地完成大部分工作。事實上,我們建議您這樣做,因為這樣做可以獲得很多好處。您將獲得一些優勢,比如程式碼生成,這對於任意使用者定義的函式都是不可能的。然而,這對於更復雜的逐行操作非常有用。

11.5.Joins

如前所述,連線的應用與它們對DataFrames的應用相同。然而,Datasets還提供了更復雜的方法joinWith。joinWith大致相當於一個co-group(在RDD術語中),您基本上會在一個Datasets中得到兩個巢狀Datasets。每一列表示一個Datasets,可以相應地操作這些資料集。當您需要在連線中維護更多資訊或對整個結果執行一些更復雜的操作(如高階對映或過濾器)時,這將非常有用。

讓我們建立一個航班元資料集來演示joinWith:

注意,我們最終得到了一個鍵值對的資料集,其中每一行表示一個航班和航班元資料。當然,我們可以將它們作為Datasets或具有複雜型別的DataFrame查詢:

我們可以像以前一樣收集它們:

當然,“常規”連線也可以很好地工作,儘管您會注意到,在本例中,我們最終得到了一個DataFrame(因此丟失了JVM型別資訊)。

我們總是可以定義另一個Dataset來重新獲得它。同樣重要的是,DataFrame和Dataset進行join沒有任何問題——我們最終得到了相同的結果:

11.6.Grouping和Aggregations

分組和聚合遵循與我們在前面的聚合章節中看到的相同的基本標準,所以groupBy rollup和cube仍然適用,但是這些返回的是DataFrame,而不是Dataset (您丟失了型別資訊):

這通常沒什麼大不了的,但是如果您想保留型別資訊,可以執行其他分組和聚合。groupByKey方法就是一個很好的例子。這允許您根據資料集中的Dataset進行分組,並返回一個型別化Dataset。但是,這個函式不接受特定的列名,而是接受一個函式。這使得您可以指定更復雜的分組函式,這些函式更類似於以下內容:

儘管這提供了靈活性,但這是一種折衷,因為現在我們引入了JVM型別和不能由Spark優化的函式。這意味著您將看到效能差異,我們可以在檢查explain計劃時看到這一點。在下面的程式碼中,你可以看到我們在DataFrame中添加了一個新的列(我們函式的結果),然後對它執行分組:

當我們對一個Dataset執行一個key值的分組後,我們可以對key Value Dataset進行操作,key Value Dataset的函式會將分組操作為原始物件:

我們甚至可以建立新的操作,並定義如何減少組:

這應該是足夠直接的理解,這是一個更昂貴的過程後立即聚合掃描,特別是因為它最終在相同的最終結果:

這應該只在使用者定義編碼和有意義的地方使用Datasets。這可能是大資料管道的起點,也可能是終點。

11.7.結束語

在本章中,我們介紹了資料集的基礎知識,並提供了一些令人鼓舞的示例。雖然篇幅很短,但這一章實際上教會了你所有你需要知道的關於資料集的基本知識,以及如何使用它們。將它們看作是高階結構化api和底層RDD api之間的混合是有幫助的,這是第12章的主題。