1. 程式人生 > 其它 >Spark權威指南(中文版)----第13章 高階RDD操作

Spark權威指南(中文版)----第13章 高階RDD操作

第12章探討了單一RDD操作的基礎。您學習瞭如何建立RDDs以及為什麼要使用它們。此外,我們還討論了map、filter、reduce以及如何建立函式來轉換單個RDD資料。本章將介紹高階的RDD操作,並關注鍵值RDDs,這是一種用於操作資料的強大抽象。我們還討論了一些更高階的主題,比如自定義分割槽,這是您可能希望首先使用RDDs的原因。通過使用自定義分割槽函式,您可以精確地控制資料如何在叢集中佈局,並相應地操作該分割槽。在我們開始之前,讓我們總結一下我們將要討論的主要話題:

  • 聚合和鍵值RDD
  • 自定義分割槽
  • RDD join操作

讓我們使用我們在上一章使用的資料集:

// in Scalaval myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"  .split(" ")
valwords=spark.sparkContext.parallelize(myCollection,2)

13.1.Key-Value RDDs

在RDDs上有許多方法(…ByKey結尾的方法)要求您將資料以鍵值格式表示。每當您在方法名稱中看到ByKey時,就意味著您只能在PairRDD型別上執行此操作。最簡單的方法是將當前的RDD對映到一個基本的鍵值結構。這意味著在RDD的每個記錄中有兩個值:

// in Scalawords.map(word=>(word.toLowerCase,1))
13.1.1.keyBy

前面的示例演示了建立key的簡單方法。但是,您還可以使用keyBy函式來實現相同的結果,指定一個從當前值建立key的函式。在本例中,你是在用單詞中的第一個字母作為key,將原始記錄作為key對應的值。

// in Scalavalkeyword=words.keyBy(word=>word.toLowerCase.toSeq(0).toString)
13.1.2.mapValues

在您擁有一組鍵值對之後,就可以開始操作它們了。如果我們有一個元組tuple,Spark會假設第一個元素是鍵,第二個元素是值。在這種格式下,您可以顯式地選擇map-over值(並忽略單個鍵)。當然,您可以手動進行操作,但是當您知道您要修改這些值時,mapValues可以幫助你防止錯誤:

// in Scalakeyword.mapValues(word=>word.toUpperCase).collect()
13.1.3.提取key和value

當我們使用鍵值對格式時,我們還可以使用以下方法提取的鍵或值:

// in Scalakeyword.keys.collect()keyword.values.collect()
13.1.4.lookup

您可能想要使用RDD的一個有趣的任務是查詢特定鍵的結果。如果我們查詢“s”,我們將得到與“Spark”和“Simple”相關的兩個值:

keyword.lookup("s").foreach(println)

13.2.Aggregations聚合

您可以在普通的RDDs或PairRDDs上執行聚合,這取決於您使用的方法。讓我們用我們的一些資料集來證明這一點:

// in Scalaval chars = words.flatMap(word => word.toLowerCase.toSeq)val KVcharacters = chars.map(letter => (letter, 1))def maxFunc(left:Int, right:Int) = math.max(left, right)def addFunc(left:Int, right:Int) = left + rightvalnums=sc.parallelize(1to30,5)

在您做完準備工作之後,您可以做一些類似countByKey的事情,它計算每個鍵值資料。

13.2.1.countByKey

您可以計算每個鍵的元素數量,將結果收集到本地map中。您還可以使用一個近似方法來實現這一點,這使得您可以在使用Scala或Java時指定一個超時和信心值:

// in Scalaval timeout = 1000L //millisecondsval confidence = 0.95KVcharacters.countByKey()KVcharacters.countByKeyApprox(timeout,confidence)
13.2.2.理解聚合的實現方式

有幾種方法可以建立鍵值PairRDDs。然而,這個實現對於工作穩定性來說非常重要。讓我們比較兩個基本的選擇,groupBy和reduce。

13.2.2.1.groupByKey

檢視API文件,您可能會認為groupByKey與每個分組的對映是總結每個鍵的計數的最佳方法:

// in ScalaKVcharacters.groupByKey().map(row=>(row._1,row._2.reduce(addFunc))).collect()

然而,對於大多數情況來說,這是解決問題的錯誤方法。這裡的基本問題是,在將函式應用到它們之前,每個執行器必須在記憶體中保留給定鍵的所有值。為什麼這個有問題?如果您有大量的鍵傾斜,一些分割槽可能會被一個給定鍵的大量值完全超載,您將會得到outofmemoryerror。這顯然不會引起我們當前資料集的問題,但是它會導致嚴重的問題。這不一定會發生,但會發生。

當groupByKey有意義時,就會出現用例。如果每個鍵的值都是一致的,並且知道它們將適合於給定的執行器,那麼您將會很好。當你這樣做的時候,很好地知道你到底在做什麼。新增用例的首選方法是:reduceByKey。

13.2.2.2.reduceByKey

因為我們執行的是一個簡單的計數,更穩定的方法是執行相同的flatMap,然後執行map將每個字母例項對映到數字1,然後使用求和函式執行一個reduceByKey來收集陣列。這個實現更穩定,因為reduce發生在每個分割槽中,不需要將所有東西都放在記憶體中。這大大提高了操作的速度和操作的穩定性:

KVcharacters.reduceByKey(addFunc).collect()

reduceByKey方法返回一個組(鍵)的RDD,並返回沒有進行排序的元素序列。

13.2.3. 其他聚合方法

我們發現,在當今的Spark中,使用者遇到這種工作負載(或需要執行這種操作)的情況非常少見。當您可以使用結構化api執行更簡單的聚合時,使用這些極低級別工具的理由並不多。這些函式基本上允許您非常具體地、非常低階地控制在機器叢集上如何執行給定的聚合。

13.2.3.1.aggregate

這個函式需要一個null和start起始值,然後要求您指定兩個不同的函式。在分割槽內的第一個聚合,跨分割槽的第二個聚合。開始值將用於兩個聚合級別:

// in Scalanums.aggregate(0)(maxFunc,addFunc)

此方法確實具有一些效能影響,因為它在driver端執行最終的聚合。如果從executors端返回到driver端的結果集太大,會導致driver端記憶體溢位。此時可以使用treeAggregate。它與aggregate (在使用者級)做相同的事情,但以另一種方式進行。它基本上是“下推”一些子聚合(從executor到executor建立樹),然後在驅動程式上執行最終的聚合。擁有多個級別可以幫助您確保在聚合過程中驅動程式不會耗盡記憶體。

// in Scalaval depth = 3nums.treeAggregate(0)(maxFunc,addFunc,depth)
13.2.3.2.aggregateByKey

這個函式和aggregate函式一樣,但是它不是按分割槽進行分割槽,而是按鍵執行。開始值和函式遵循相同的屬性:

/ /in ScalaKVcharacters.aggregateByKey(0)(addFunc,maxFunc).collect()
13.2.3.3.combineByKey

您可以指定一個組合器,而不是指定一個聚合函式。這個組合在給定的鍵上操作,並根據某個函式合併值。然後合併不同的組合輸出結果。我們還可以將輸出分割槽的數量指定為自定義輸出分割槽器:

// in Scalaval valToCombiner = (value:Int) => List(value)val mergeValuesFunc = (vals:List[Int], valToAppend:Int) => valToAppend :: valsval mergeCombinerFunc = (vals1:List[Int], vals2:List[Int]) => vals1 ::: vals2// now we define these as function variablesval outputPartitions = 6KVcharacters  .combineByKey(    valToCombiner,    mergeValuesFunc,    mergeCombinerFunc,    outputPartitions).collect()

13.3. CoGroup

CoGroup操作可以用來合併三個key-value RDD(Scala語言)。按key值合併。這實際上只是一個基於分組的RDD連線。

// in Scalaimport scala.util.Randomval distinctChars = words.flatMap(word => word.toLowerCase.toSeq).distinctval charRDD = distinctChars.map(c => (c, new Random().nextDouble()))val charRDD2 = distinctChars.map(c => (c, new Random().nextDouble()))val charRDD3 = distinctChars.map(c => (c, new Random().nextDouble()))charRDD.cogroup(charRDD2,charRDD3).take(5)

13.4.Joins

RDDs與我們在結構化API中看到的連線非常相似,儘管RDD的join,你可以進行更細粒度的控制。它們都遵循相同的基本格式:

  • 我們想要連線的兩個RDDs,
  • 它們應該輸出的輸出分割槽數量(可選)
  • 自定義分割槽函式(可選)

我們將在本章後面討論分割槽函式。

13.4.1. Inner Join

現在我們將演示一個內部連線。注意我們如何設定我們希望看到的輸出分割槽的數量:

// in Scalaval keyedChars = distinctChars.map(c => (c, new Random().nextDouble()))val outputPartitions = 10KVcharacters.join(keyedChars).count()KVcharacters.join(keyedChars,outputPartitions).count()

我們不會為其他連線提供一個示例,但是它們都遵循相同的基本格式。您可以在第8章的概念級別瞭解以下連線型別:

  • fullOuterJoin
  • leftOuterJoin
  • rightOuterJoin
  • Cartesian(這又是非常危險的!它不接受連線鍵,可以有一個巨大的輸出。
13.4.2. zips

最後一種連線實際上並不是一個連線,但是它確實合併了兩個RDDs,所以把它標記為連線是值得的。zip可以讓你“zip”兩個RDDs,假設它們的長度相同。這將建立一個PairRDD。兩個RDDs必須具有相同數量的分割槽和相同數量的元素:

// in Scalaval numRange = sc.parallelize(0 to 9, 2)words.zip(numRange).collect()# in PythonnumRange = sc.parallelize(range(10), 2)words.zip(numRange).collect()

這給了我們以下的結果,一個key-value陣列:

13.5. 分割槽控制(未完待續......)

使用RDDs,您可以控制資料在叢集中的物理分佈方式。其中一些方法與我們在結構化api中擁有的方法基本相同,但是關鍵的附加功能(結構化api中不存在)是指定分割槽函式的能力(正式的自定義Partitioner,稍後我們將討論基本方法)。

13.5.1.coalesce

coalesce有效地合併了同一worker上的分割槽,會避免重新分割槽時資料的shuffle。例如,我們的words RDD目前是兩個分割槽,我們可以使用coalesce將其壓縮為一個分割槽,而不會導致資料的shuffle:

13.5.2.repartition

repartition操作允許增加或減少重新資料分割槽,但在過程中執行跨節點的shuffle。在map和filter型別操作中,增加分割槽的數量可以提高並行度:

13.5.3.repartitionAndSortWithinPartitions

該操作使您能夠重新分割槽,並指定每個輸出分割槽的順序。我們將省略這個示例,因為它的文件很好,但是分割槽和key的比較邏輯都可以由使用者指定。

13.5.4. 自定義分割槽

這種能力是您希望使用RDDs的主要原因之一。自定義分割槽器在結構化api中不可用,因為它們實際上沒有邏輯對應項。它們是低層的實現細節,對作業能否成功執行具有重要影響。為這個操作激發自定義分割槽的典型例子是PageRank,我們試圖控制叢集上資料的佈局並避免shuffle。在我們的購物資料集中,這可能意味著按每個客戶ID進行分割槽(稍後我們將討論這個示例)。

簡而言之,自定義分割槽的唯一目標是均勻分佈叢集中的資料,以便解決資料傾斜等問題。

如果要使用自定義分割槽器,應該從結構化api下拉到RDDs,應用自定義分割槽器,然後將其轉換回DataFrame或Dataset。通過這種方式,您可以同時獲得兩個方面的優勢,只需要在需要的時候使用自定義分割槽。

要執行自定義分割槽,您需要實現自己的繼承Partitioner的類。只有當您對業務問題有大量的領域知識時才需要這樣做—如果您只是希望對一個值甚至一組值(列)進行分割槽,那麼在DataFrame API中進行分割槽是值得的。

讓我們來看一個例子:

Spark有兩個內建的分割槽器,您可以在RDD API中利用它們,一個用於離散值的HashPartitioner和一個RangePartitioner。這兩種方法分別適用於離散值和連續值。Spark的結構化api已經使用了這些,儘管我們可以在RDDs中使用相同的東西:

雖然雜湊和範圍分割槽器很有用,但它們還相當初級。有時,您需要執行一些非常底層的分割槽,因為您要處理非常大的資料和很大的key傾斜。key傾斜意味著一些key的值比其他key的值要多得多。您希望儘可能地打亂這些key,以提高並行性,並防止在執行過程中出現OutOfMemoryErrors。

一個例項可能是,當且僅當key匹配某種格式時,需要對更多key進行分割槽。例如,我們可能知道在您的資料集中有兩個客戶總是使分析變慢,我們需要比其他客戶id更深入地分解它們。事實上,這兩個客戶id存在嚴重的資料傾斜,需要單獨處理,而其他所有客戶id可以歸到大的分組。這顯然是一個有點誇張的例子,但是您也可能在您的資料中看到類似的情況:

執行此操作之後,您將看到每個分割槽中的結果計數。後兩個數字會有所不同,因為我們是隨機分佈的(當我們在Python中做同樣的操作時,您將看到),但是同樣的原則也適用:

此自定義key分發邏輯僅在RDD級別可用。當然,這是一個簡單的例子,但是它確實顯示了使用任意邏輯以物理方式在叢集周圍分佈資料的強大功能。

13.6.CustomSerialization自定義序列化器

最後一個值得討論的高階主題是Kryo序列化問題。您希望並行化(或函式)的任何物件都必須是可序列化的:

預設的序列化可能非常慢。Spark可以使用Kryo庫(版本2)更快地序列化物件。Kryo比Java序列化(通常高達10x)要快得多,壓縮率更高,但是它不支援所有可序列化的型別,並且要求您預先註冊將在程式中使用的類,以獲得最佳效能。

您可以通過在初始化job時,在SparkConf中設定引數"spark.serializer"的值為"org.apache.spark.serializer.KryoSerializer"的方式來使用Kryo(我們將在本書的下一部分討論這個問題)。此配置項配置的serializer,是用於在worker節點之間shuffle資料和將RDDs序列化到磁碟的序列化器。Kryo不是預設序列化器的唯一原因是其要求定製註冊,但是我們建議在任何網路密集型應用程式中嘗試它。從Spark 2.0.0開始,當使用簡單型別、簡單型別陣列或字串型別的RDDs進行shuffle時,我們在內部使用Kryo serializer。

Spark自動包含Kryo序列化器,用於Twitter chill庫的AllScalaRegistrar中包含的許多常用的Scala核心類。

要註冊自己的自定義類與Kryo,使用registerKryoClasses方法:

13.7. 結束語

在本章中,我們討論了許多關於RDDs的更高階的主題。特別值得注意的是關於自定義分割槽的部分,它允許您使用非常特定的函式來佈局資料。在第14章中,我們將討論Spark的另一個低階工具:分散式變數。