Spark中的分區方法詳解
轉自:https://blog.csdn.net/dmy1115143060/article/details/82620715
一、Spark數據分區方式簡要
在Spark中,RDD(Resilient Distributed Dataset)是其最基本的抽象數據集,其中每個RDD是由若幹個Partition組成。在Job運行期間,參與運算的Partition數據分布在多臺機器的內存當中。這裏可將RDD看成一個非常大的數組,其中Partition是數組中的每個元素,並且這些元素分布在多臺機器中。圖一中,RDD1包含了5個Partition,RDD2包含了3個Partition,這些Partition分布在4個節點中。
Spark包含兩種數據分區方式:HashPartitioner(哈希分區)和RangePartitioner(範圍分區)。一般而言,對於初始讀入的數據是不具有任何的數據分區方式的。數據分區方式只作用於<Key,Value>形式的數據。因此,當一個Job包含Shuffle操作類型的算子時,如groupByKey,reduceByKey etc,此時就會使用數據分區方式來對數據進行分區,即確定某一個Key對應的鍵值對數據分配到哪一個Partition中。在Spark Shuffle階段中,共分為Shuffle Write階段和Shuffle Read階段,其中在Shuffle Write階段中,Shuffle Map Task對數據進行處理產生中間數據,然後再根據數據分區方式對中間數據進行分區。最終Shffle Read階段中的Shuffle Read Task會拉取Shuffle Write階段中產生的並已經分好區的中間數據。圖2中描述了Shuffle階段與Partition關系。下面則分別介紹Spark中存在的兩種數據分區方式。
二、HashPartitioner(哈希分區)
1、HashPartitioner原理簡介
HashPartitioner采用哈希的方式對<Key,Value>鍵值對數據進行分區。其數據分區規則為 partitionId = Key.hashCode % numPartitions,其中partitionId代表該Key對應的鍵值對數據應當分配到的Partition標識,Key.hashCode表示該Key的哈希值,numPartitions表示包含的Partition個數。圖3簡單描述了HashPartitioner的數據分區過程。
2、HashPartitioner源碼詳解
-
- HashPartitioner源碼較為簡單,這裏不再進行詳細解釋。
class HashPartitioner(partitions: Int) extends Partitioner { require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") /** * 包含的分區個數 */ def numPartitions: Int = partitions /** * 獲得Key對應的partitionId */ def getPartition(key: Any): Int = key match { case null => 0 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) } override def equals(other: Any): Boolean = other match { case h: HashPartitioner => h.numPartitions == numPartitions case _ => false } override def hashCode: Int = numPartitions } def nonNegativeMod(x: Int, mod: Int): Int = { val rawMod = x % mod rawMod + (if (rawMod < 0) mod else 0) }
- HashPartitioner源碼較為簡單,這裏不再進行詳細解釋。
三、RangePartitioner(範圍分區)
1、RangePartitioner原理簡介
Spark引入RangePartitioner的目的是為了解決HashPartitioner所帶來的分區傾斜問題,也即分區中包含的數據量不均衡問題。HashPartitioner采用哈希的方式將同一類型的Key分配到同一個Partition中,因此當某一或某幾種類型數據量較多時,就會造成若幹Partition中包含的數據過大問題,而在Job執行過程中,一個Partition對應一個Task,此時就會使得某幾個Task運行過慢。RangePartitioner基於抽樣的思想來對數據進行分區。圖4簡單描述了RangePartitioner的數據分區過程。
2、RangePartitioner源碼詳解
① 確定采樣數據的規模:RangePartitioner默認對生成的子RDD中的每個Partition采集20條數據,樣本數據最大為1e6條。
// 總共需要采集的樣本數據個數,其中partitions代表最終子RDD中包含的Partition個數 val sampleSize = math.min(20.0 * partitions, 1e6)
② 確定父RDD中每個Partition中應當采集的數據量:這裏註意的是,對父RDD中每個Partition采集的數據量會在平均值上乘以3,這裏是為了後繼在進行判斷一個Partition是否發生了傾斜,當一個Partition包含的數據量超過了平均值的三倍,此時會認為該Partition發生了數據傾斜,會對該Partition調用sample算子進行重新采樣。
// 被采樣的RDD中每個partition應該被采集的數據,這裏將平均采集每個partition中數據的3倍 val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
③ 調用sketch方法進行數據采樣:sketch方法返回的結果為<采樣RDD的數據量,<partitionId, 分區數據量,分區采樣的數據量>>。在sketch方法中會使用水塘抽樣算法對待采樣的各個分區進行數據采樣,這裏采用水塘抽樣算法是由於實現無法知道每個Partition中包含的數據量,而水塘抽樣算法可以保證在不知道整體的數據量下仍然可以等概率地抽取出每條數據。圖4簡單描述了水塘抽樣過程。
// 使用sketch方法進行數據抽樣 val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition) /** * @param rdd 需要采集數據的RDD * @param sampleSizePerPartition 每個partition采集的數據量 * @return <采樣RDD數據總量,<partitionId, 當前分區的數據量,當前分區采集的數據量>> */ def sketch[K : ClassTag]( rdd: RDD[K], sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = { val shift = rdd.id val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => val seed = byteswap32(idx ^ (shift << 16)) // 使用水塘抽樣算法進行抽樣,抽樣結果是個二元組<Partition中抽取的樣本量,Partition中包含的數據量> val (sample, n) = SamplingUtils.reservoirSampleAndCount( iter, sampleSizePerPartition, seed) Iterator((idx, n, sample)) }.collect() val numItems = sketched.map(_._2).sum (numItems, sketched) }
④ 數據抽樣完成後,需要對不均衡的Partition重新進行抽樣,默認當Partition中包含的數據量大於平均值的三倍時,該Partition是不均衡的。當采樣完成後,利用樣本容量和RDD中包含的數據總量,可以得到整體的一個數據采樣率fraction。利用此采樣率對不均衡的Partition調用sample算子重新進行抽樣。
// 計算數據采樣率 val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0) // 存放采樣Key以及采樣權重 val candidates = ArrayBuffer.empty[(K, Float)] // 存放不均衡的Partition val imbalancedPartitions = mutable.Set.empty[Int] //(idx, n, sample)=> (partition id, 當前分區數據個數,當前partition的采樣數據) sketched.foreach { case (idx, n, sample) => // 當一個分區中的數據量大於平均分區數據量的3倍時,認為該分區是傾斜的 if (fraction * n > sampleSizePerPartition) { imbalancedPartitions += idx } // 在三倍之內的認為沒有發生數據傾斜 else { // 每條數據的采樣間隔 = 1/采樣率 = 1/(sample.size/n.toDouble) = n.toDouble/sample.size val weight = (n.toDouble / sample.length).toFloat // 對當前分區中的采樣數據,對每個key形成一個二元組<key, weight> for (key <- sample) { candidates += ((key, weight)) } } } // 對於非均衡的partition,重新采用sample算子進行抽樣 if (imbalancedPartitions.nonEmpty) { val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains) val seed = byteswap32(-rdd.id - 1) val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect() val weight = (1.0 / fraction).toFloat candidates ++= reSampled.map(x => (x, weight)) }
⑤ 確定各個Partition的Key範圍:使用determineBounds方法來確定每個Partition中包含的Key範圍,先對采樣的Key進行排序,然後計算每個Partition平均包含的Key權重,然後采用平均分配原則來確定各個Partition包含的Key範圍。如當前采樣Key以及權重為:<1, 0.2>, <2, 0.1>, <3, 0.1>, <4, 0.3>, <5, 0.1>, <6, 0.3>,現在將其分配到3個Partition中,則每個Partition的平均權重為:(0.2 + 0.1 + 0.1 + 0.3 + 0.1 + 0.3) / 3 = 0.36。此時Partition1 ~ 3分配的Key以及總權重為<Partition1, {1, 2, 3}, 0.4> <Partition2, {4, 5}, 0.4> <Partition1, {6}, 0.3>。
/** * @param candidates 未按采樣間隔排序的抽樣數據 * @param partitions 最終生成的RDD包含的分區個數 * @return 分區邊界 */ def determineBounds[K : Ordering : ClassTag]( candidates: ArrayBuffer[(K, Float)], partitions: Int): Array[K] = { val ordering = implicitly[Ordering[K]] // 對樣本按照key進行排序 val ordered = candidates.sortBy(_._1) // 抽取的樣本容量 val numCandidates = ordered.size // 抽取的樣本對應的采樣間隔之和 val sumWeights = ordered.map(_._2.toDouble).sum // 平均每個分區的步長 val step = sumWeights / partitions var cumWeight = 0.0 var target = step // 分區邊界值 val bounds = ArrayBuffer.empty[K] var i = 0 var j = 0 var previousBound = Option.empty[K] while ((i < numCandidates) && (j < partitions - 1)) { val (key, weight) = ordered(i) cumWeight += weight // 當前的采樣間隔小於target,繼續叠代,也即這些key應該放在同一個partition中 if (cumWeight >= target) { // Skip duplicate values. if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) { bounds += key target += step j += 1 previousBound = Some(key) } } i += 1 } bounds.toArray }
⑥ 計算每個Key所在Partition:當分區範圍長度在128以內,使用順序搜索來確定Key所在的Partition,否則使用二分查找算法來確定Key所在的Partition。
/** * 獲得每個Key所在的partitionId */ def getPartition(key: Any): Int = { val k = key.asInstanceOf[K] var partition = 0 // 如果得到的範圍不大於128,則進行順序搜索 if (rangeBounds.length <= 128) { // If we have less than 128 partitions naive search while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { partition += 1 } } // 範圍大於128,則進行二分搜索該key所在範圍,即可得到該key所在的partitionId else { // Determine which binary search method to use only once. partition = binarySearch(rangeBounds, k) // binarySearch either returns the match location or -[insertion point]-1 if (partition < 0) { partition = -partition-1 } if (partition > rangeBounds.length) { partition = rangeBounds.length } } if (ascending) { partition } else { rangeBounds.length - partition } }
Spark中的分區方法詳解