spark2原理分析-RDD的Partitioner原理分析
概述
本文介紹了Spark分割槽的實現原理,並對其原始碼進行了分析。
分割槽器(Partitioner)的基本概念
關於Spark分割槽的基本概念和介紹,可以參考我的這篇文章:Spark2.0-RDD分割槽原理分析。這裡我們再回顧一下Spark分割槽的概念:
從概念上講,分割槽器(Partitioner)定義瞭如何分佈記錄,從而定義每個任務將處理哪些記錄。
從實現層面來說,Partitioner是一個帶有以下兩個方法的抽象類:numPartitions和getPartition。該類的定義如下:
abstract class Partitioner extends Serializable { def numPartitions: Int def getPartition(key: Any): Int }
- numPartitions:定義分割槽後RDD中的分割槽數。
- getPartition:定義從key到分割槽的整數索引的對映,其中應該傳送具有該key的記錄。
Spark提供的分割槽器(Partitioner)物件有兩種實現:HashPartitioner和RangePartitioner(在2.3中有更多的實現)。如果這些都不滿足需要,可以自定義分割槽程式。
HashPartitioner
當RDD沒有Partitioner時,會把HashPartitioner作為預設的Partitioner。它通過計算key的hashcode來對資料進行分割槽。該類的實現程式碼如下:
class HashPartitioner(partitions: Int) extends Partitioner { require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") // 確定的分割槽的數量 def numPartitions: Int = partitions // key到分割槽id的對映,這裡是通過取模的方式實現 def getPartition(key: Any): Int = key match { case null => 0 // 取模運算:hashcode%分割槽數 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) } // 重新定義equal函式,若是HashPartitioner且分割槽數相等,返回true override def equals(other: Any): Boolean = other match { case h: HashPartitioner => h.numPartitions == numPartitions case _ => false } // 把HashPartitioner的hashCode設定為分割槽數 override def hashCode: Int = numPartitions }
注意:傳給HashPartitioner(partitions: Int)的引數partitions不能為負。
HashPartitioner具有以下特點:
- HashPartitioner根據key的雜湊值(hashcode)確定子分割槽的索引位置。
- HashPartitioner需要一個分割槽引數,該引數確定輸出RDD中的分割槽數和雜湊函式中使用的分割槽數。若沒有指定該引數,Spark則使用SparkConf中spark.default.parallelism值的值來確定分割槽數。
- 若沒有設定預設並行度值(spark.default.parallelism引數的值),則Spark預設為RDD在其血緣(lineage)中具有的最大分割槽數。
- 在使用HashPartitioner的寬轉換(wide transform)(例如aggregateByKey)中,可選的分割槽數引數用作雜湊分割槽程式的引數。
RangePartitioner
RangePartitioner(範圍分割槽)將其key位於相同範圍內的記錄分配給給定分割槽。排序需要RangePartitioner,因為RangePartitioner能夠確保:通過對給定分割槽內的記錄進行排序,完成整個RDD的排序。
RangePartitioner首先通過取樣確定每個分割槽的範圍邊界:優化跨分割槽的記錄進行均勻分佈。然後,RDD中的每個記錄將被shuffled到其範圍界限內包括該key的分割槽。
高度不平衡的資料(即,某些key的許多值而不是其他key,如果key的分佈不均勻)會使取樣不準確,不均勻的分割槽可能導致下游任務比其他任務更慢,而導致整個任務變慢。
如果與某個關鍵字相關聯的所有記錄的重複key太多而被分配到一個執行器(executor),則範圍分割槽(如雜湊分割槽)可能會導致記憶體錯誤。與排序相關的效能問題通常是由範圍分割槽步驟的這些問題引起的。
使用Spark建立RangePartitioner不僅需要分割槽數量的引數,還需要實際的RDD,用來獲取樣本。 RDD必須是元組,並且key必須具有已定義的順序。
實際上,取樣需要部分評估RDD,從而導致執行圖(graph)中斷。 因此,範圍分割槽實際上既是轉換(transformation)操作又是action(動作)操作。 在範圍分割槽中取樣需要消耗資源,有一定成本,通常,RangePartitioner(範圍分割槽)比HashPartitioner(雜湊分割槽)更耗效能。由於key要求被排序,這樣就無法在元組的所有RDD上進行範圍分割槽。
因此,鍵/值操作(例如聚合)需要使用HashPartitioner作為預設值,這些操作需要每個key都位於同一臺機器上但不以特定方式排序的記錄。但是,也可以使用自定義分割槽程式或範圍分割槽程式執行這些方法。
RangePartitioner的實現:
成員變數 | 說明 |
---|---|
ascending | 定義分割槽資料的排序方式,預設是升序。定義如下:private var ascending: Boolean = true |
samplePointsPerPartitionHint | 每個分割槽具有的樣本數目。 |
partitions | 分割槽數,可以為0。當該引數為0時,表示對空RDD進行排序。 |
ordering | 排序需要用到的工具類,定義了:大於,小於等操作 |
rangeBounds | 儲存分割槽數的,帶上限的陣列 |
numPartitions | 該RDD的分割槽數量。 |
binarySearch | 獲取一個二分查詢的物件。由於key是可排序的,所以使用二分加快查詢效能。 |
getPartition | 獲取分割槽對應的索引id |
自定義分割槽器(Partitioner)
通過繼承Partitioner抽象類,可以定製自己的分割槽器。要定義自己的分割槽器可能需要實現以下函式:
成員名 | 說明 |
---|---|
numPartitions | 分割槽數 |
getPartition | key作為引數(與被分割槽的RDD型別相同),返回表示分割槽索引的整數,該分割槽指定具有該key的記錄所在的位置。 返回的整數必須介於零和分割槽數之間(在numPartitions定義)。 |
equals | 定義兩個分割槽是否相等的方法。 |
hashcode | 僅當重寫了equals方法時才需要此方法。 HashPartitioner的hashcode就是它的分割槽數。 RangePartitioner的hashcode是從範圍邊界派生的雜湊函式。 |
總結
本文介紹了RDD的Partitioner原理,並對其實現進行了簡要分析。