1. 程式人生 > >Spak—— sparkCore原始碼解析之RangePartitioner原始碼

Spak—— sparkCore原始碼解析之RangePartitioner原始碼

  

分割槽過程概覽

RangePartitioner分割槽執行原理:

計算總體的資料抽樣大小sampleSize,計算規則是:至少每個分割槽抽取20個數據或者最多1M的資料量。

  1. 根據sampleSize和分割槽數量計算每個分割槽的資料抽樣樣本數量最大值sampleSizePrePartition
  2. 根據以上兩個值進行水塘抽樣,返回RDD的總資料量,分割槽ID和每個分割槽的取樣資料。
  3. 計算出資料量較大的分割槽通過RDD.sample進行重新抽樣。
  4. 通過抽樣陣列 candidates: ArrayBuffer[(K, wiegth)]計算出分割槽邊界的陣列BoundsArray
  5. 在取資料時,如果分割槽數小於128則直接獲取,如果大於128則通過二分法,獲取當前Key屬於那個區間,返回對應的BoundsArray下標即為partitionsID

RangePartitioner

class RangePartitioner(partitions,rdd) {
// 1. 計算樣本大小
 val sampleSize = math.min(20.0 * partitions, 1e6)
// 2. 計算樣本最大值
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
// 3. 進行抽樣,返回總資料量,分割槽ID和樣本資料
val (numItems, sketched) = RangePartitioner.sketch(
    rdd.map(_._1), sampleSizePerPartition)
// 4. 是否需要二次取樣
val imbalancedPartitions = 	mutable.Set.empty[Int]
//  5. 儲存樣本資料的集合buffer:包含資料和權重
val candidates = ArrayBuffer.empty[(K, Float)]
 if (fraction * n > sampleSizePerPartition) {
	// 記錄需要重新取樣的RDD的ID
	imbalancedPartitions += idx 
}else{
// 5. 計算樣本權重
	val weight = (
	  // 取樣資料的佔比
		n.toDouble / sample.length).toFloat 
            for (key <- sample) {
			// 記錄取樣資料key和權重
              candidates += ((key, weight))
            }
	}
// 6. 對於資料分佈不均衡的RDD分割槽,重新資料抽樣
if (imbalancedPartitions.nonEmpty) {
	// 利用rdd的sample抽樣函式API進行資料抽樣
    val reSampled = imbalanced.sample(
		withReplacement = false, fraction, seed).collect()
}
// 7. 生成邊界陣列
RangePartitioner.determineBounds(candidates, partitions)
}

rangeBounds

 // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      //  This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      //  給定總的資料抽樣大小,最多1M的資料量(10^6),最少20倍的RDD分割槽數量,也就是每個RDD分割槽至少抽取20條資料
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      // RDD各分割槽中的資料量可能會出現傾斜的情況,乘於3的目的就是保證資料量小的分割槽能夠取樣到足夠的資料,而對於資料量大的分割槽會進行第二次取樣
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      // 從rdd中抽樣得到的資料,返回值:(總資料量, Array[分割槽id,當前分割槽的資料量,當前分割槽抽取的資料])
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        // 如果總的資料量為0(RDD為空),那麼直接返回一個空的陣列
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        // 計算是否需要重新取樣:如果分割槽包含的資料量遠遠大於平均取樣的資料量則重新進行分割槽
        // 樣本佔比:計算總樣本數量和總記錄數的佔比,佔比最大為1.0
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        //  儲存樣本資料的集合buffer:包含資料和權重
        val candidates = ArrayBuffer.empty[(K, Float)]
        // 儲存資料分佈不均衡的分割槽id(資料量超過fraction比率的分割槽)
        val imbalancedPartitions = mutable.Set.empty[Int]
        // 遍歷抽樣資料
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            //  樣本數量佔比乘以當前RDD的總行數大於預設的每個RDD最大抽取數量,說明這個RDD的資料量比較大,需要取樣更多的資料:eg: 0.2*100=20<60;0.2*20000=2000>60
            // 如果樣本佔比乘以當前分割槽中的資料量大於之前計算的每個分割槽的抽象資料大小,那麼表示當前分割槽抽取的資料太少了,該分割槽資料分佈不均衡,需要重新抽取
            imbalancedPartitions += idx // 記錄需要重新取樣的RDD的ID
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat // 取樣資料的佔比,RDD越大,權重越大
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        // 對於資料分佈不均衡的RDD分割槽,重新進行資料抽樣
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          // 利用rdd的sample抽樣函式API進行資料抽樣
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        // 將最終的抽樣資料計算出分割槽邊界陣列返回,邊界數組裡面存放的是RDD裡面資料的key值,
        // 比如array[0,10,20,30..]表明:key值在0到10的在第一個RDD,key值在10到20的在第二個RDD
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }

sketch

  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      /*水塘抽樣:返回抽樣資料和RDD的總資料量*/
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    // 計算所有RDD的總資料量
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }

determineBounds

 /** 依據候選中的權重劃分分割槽,權重值可以理解為該Key值所代表的元素數目 返回一個數組,長度為partitions - 1,第i個元素作為第i個分割槽內元素key值的上界
   *  Determines the bounds for range partitioning from candidates with weights indicating how many
   *  items each represents. Usually this is 1 over the probability used to sample this candidate.
   *
   * @param candidates unordered candidates with weights 抽樣資料,包含了每個樣本的權重
   * @param partitions number of partitions 分割槽數量
   * @return selected bounds
   */
  def determineBounds[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    //依據Key進行排序,升序,所以按區間分割槽後,各個分割槽是有序的
    val ordered = candidates.sortBy(_._1)
    // 取樣資料總數
    val numCandidates = ordered.size
    // //計算出權重和
    val sumWeights = ordered.map(_._2.toDouble).sum
    // 計算出步長:權重總數相當於預計資料總量,除以分割槽數就是每個分割槽的數量,得到的值即是按區間分割的區間步長
    val step = sumWeights / partitions
    var cumWeight = 0.0
    // 初始化target值為區間大小
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    // 遍歷取樣資料
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      // 計算取樣資料在當前RDD中的位置,如果大於區間大小則:記錄邊界KEY值
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values. // 相同key值處於相同的Partition中,key值不同可以進行分割
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key //記錄邊界
          target += step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }

getPartition

// 根據RDD的key值返回對應的分割槽id。從0開始
  def getPartition(key: Any): Int = {
    // 強制轉換key型別為RDD中原本的資料型別
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      // 如果分割槽資料小於等於128個,那麼直接本地迴圈尋找當前k所屬的分割槽下標
      // ordering.gt(x,y):如果x>y,則返回true
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // 如果分割槽數量大於128個,那麼使用二分查詢方法尋找對應k所屬的下標;
      // 但是如果k在rangeBounds中沒有出現,實質上返回的是一個負數(範圍)或者是一個超過rangeBounds大小的數(最後一個分割槽,比所有資料都大)
      // Determine which binary search method to use only once.
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    //  根據資料排序是升序還是降序進行資料的排列,預設為升序
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }