1. 程式人生 > >Spark RDD之Partitioner

Spark RDD之Partitioner

概述

Partitioner是shuffle過程中key重分割槽時的策略,即計算key決定k-v屬於哪個分割槽,Transformation是寬依賴的運算元時,父RDD和子RDD之間會進行shuffle操作,shuffle涉及到網路開銷,由於父RDD和子RDD中的partition是多對多關係,所以容易造成partition中資料分配不均勻,導致資料的傾斜。

Shuffle

在MapReduce框架中,Shuffle是連線Map和和Reduce之間的橋樑,Map的輸出要用到Reduce中必須經過Shuffle這個環節,Spark作為MapReduce框架的一種實現,自然也實現了shuffle的邏輯。

Shuffle描述的是一個過程,表現多對多的依賴關係,是Map和Reduce兩個階段的紐帶,每個Reduce Task都會從Map Task產生的資料裡讀取其中的一片資料,如果Map Task的數目為m,Reduce Task數目為n (暫時認為Task的數目為partition的數目,因為partition的數目在不同的rdd中partition的個數也許會不同),極端情況下可能會觸發m*n個數據複製通道。因為Shuffle通常分為兩部分:Map階段的資料準備和Reduce階段的資料複製。Map階段需根據Reduce階段的Task數量來決定每個Map Task輸出的資料分片數目。

Shuffle的寫操作是將MapTask操作後的資料寫入到磁碟中,Spark中shuffle輸出的ShuffleMapTask會為每個ReduceTask建立對應的bucket,ShuffleMapTask產生的結果會根據設定的partitioner中不同的key值得到對應的bucketId,然後寫入到相應的bucket(同一個bucket儲存的key值應該一致)中去。Shuffle的讀操作由Socket或Netty完成,讀出資料後如何切分partition暫不討論。==BucketId其實也就是下游RDD的partitionId(或許不是Id,但是可以標識分割槽),但是bucketId針對的是單個MapTask,partition針對的是整個RDD,bucket進行聚合後其實就是下游RDD中的partition。每個ShuffleMapTask的輸出結果可能包含所有的ReduceTask需要的資料,所以每個ShuffleMapTask建立bucket的數目是和ReduceTask數目相等的。

Shuffle實現分為HashShuffleManager和SortShuffleManager,也可以自定義。綜上,Partitioner在shuffle階段發揮作用,依據Partitioner的邏輯計算key,得出對應的k-v屬於哪個分割槽。

Partitioner定義

抽象類Partitioner定義了兩個抽象方法numPartitions和getPartition。getPartition方法根據輸入的k-v對的key值返回一個Int型資料。

abstract class Partitioner extends Serializable {
  def numPartitions:
Int def getPartition(key: Any): Int }

該抽象類伴生了一個Partitioner物件如下,主要包含defaultPartitioner函式,該函式定義了Partitioner的預設選擇策略。如果設定了spark.default.parallelism,則使用該值作為預設partitions,否則使用上游RDD中partitions最大的數作為預設partitions。過濾出上游RDD中包含partitioner的RDD,選擇包含有最大partitions並且isEligible的RDD,將該RDD中的partitioner設定為分割槽策略,否則返回一個帶有預設partitions數的HashPartitioner作為Partitioner。Partition個數應該和partition個數最多的上游RDD一致,不然可能會導致OOM異常。

object Partitioner {
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val rdds = (Seq(rdd) ++ others)
    val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))

    val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
      Some(hasPartitioner.maxBy(_.partitions.length))
    } else {
      None
    }

    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
      rdd.context.defaultParallelism
    } else {
      rdds.map(_.partitions.length).max
    }

    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
        defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
      hasMaxPartitioner.get.partitioner.get
    } else {
      new HashPartitioner(defaultNumPartitions)
    }
  }
  
  private def isEligiblePartitioner(
     hasMaxPartitioner: RDD[_],
     rdds: Seq[RDD[_]]): Boolean = {
    val maxPartitions = rdds.map(_.partitions.length).max
    log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
  }
}

Partitioner實現類

Partitioner主要有兩個實現類:HashPartitioner和RangePartitioner,上面提到過HashPartitioner是預設的重分割槽方式。

HashPartitioner

numPartitions方法返回傳入的分割槽數,getPartition方法使用key的hashCode值對分割槽數取模得到PartitionId,寫入到對應的bucket中。因為Arrays的hashCodes值並不依賴於arrays的內容,導致hash函式將無法根據key值進行重分割槽,所以HashPartitioner不支援RDD[Array[_]] 或 RDD[(Array[_], _)]。

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

RangePartitioner

RangePartitioner繼承了Partitoner,Partitioner中定義了兩個抽象方法numPartitions和getPartition,前者是重分割槽後的partition數,後者是獲取某個key值重新分割槽後的partitionId,我們著重關注RangePartitioner類中這兩個函式的實現。但我們先對rangeBounds這一函式進行分析,通過分析原始碼可以發現,該函式首先設定了一個樣本大小(預設為每個partition包含20條資料),再對父RDD中的每個partition需要抽取的樣本數進行計算,呼叫工具類中的reservoir sampling演算法對每個partition進行分別抽樣。每個分割槽的記錄數*fraction如果大於該partition中設定的樣本數(這是由於不同的分割槽中包含的資料量不同,資料量較大的分割槽中抽樣數將會大於平均值),則用imbalancedPartitions儲存,並重新抽樣以確保每個分割槽中都有足夠數量的樣本。最後計算權重——分割槽記錄總數/分割槽樣本數,並呼叫determineBounds方法求分割槽分隔符。rangeBounds函式返回了一個數組,該陣列中儲存的是重分割槽的分隔符,簡而言之,該方法的作用是儘可能均衡地將原有的資料進行重分割槽,並返回用於重分割槽的分隔符。例如資料為"a c", “a b”, “b c”, “b d”, “c d”,通過以上函式可能會將b、c、d作為分隔符,所有key為a的將會被重分割槽到同一個partition中(c和d類似)。

class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20)
  extends Partitioner {

  def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
    this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
  }
  
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      Array.empty
    } else {
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        Array.empty
      } else {
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
  }
}

接下來關注RangePartitioner類中兩個重要函式——numPartitions和getPartition的實現。首先我們觀察numPartitions,它的返回值為陣列rangeBounds的長度加上1,這很容易理解,如果我們重分割槽的分隔符有n個,那麼我們重分割槽後的partition便是n+1個。getPartition先判斷間隔符的個數,如果小於128則直接遍歷比較key和分隔符得到PartitionId,否則使用二分查詢,並對邊界條件進行了判斷,最後根據構造RangePartitioner時傳入的ascending引數確定是升序或降序返回PartitionId。

  def numPartitions: Int = rangeBounds.length + 1

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      partition = binarySearch(rangeBounds, k)
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

總結RangePartitioner的過程為以下幾步:

  • 使用reservoir Sample方法對每個Partition進行分別抽樣
  • 對資料量大(大於sampleSizePerPartition)的分割槽進行重新抽樣
  • 由權重資訊計算出分割槽分隔符rangeBounds
  • 由rangeBounds計算分割槽數和key的所屬分割槽

總結

綜上所述,Partitioner主要的作用是在shuffle過程中對資料的partition進行重新分割槽,其主要實現的函式為:

  1. 獲得重新分割槽的分割槽個數;
  2. 針對某個k-v對根據其中的key,將它按特定的方法重新分割槽。

Partitioner的具體實現類HashPartitioner和RangePartitioner對以上兩個方法進行了重寫。HashPartitioner主要是通過對原partition中資料的key值進行hash後,根據key的hash值將放入bucket中,再將不同partition中的bucket合併實現重新分割槽;RangePartitioner是先根據所有partition中資料的分佈情況,儘可能均勻地構造出重分割槽的分隔符,再將資料的key值根據分隔符進行重新分割槽。HashPartitioner是大部分transformation的預設實現,sortBy、sortByKey使用RangePartitioner實現,也可以自定義Partitioner。以下是自定義Partitioner的一個例子,該Partitioner以key的長度進行分割槽:

class CustomPartitioner(partitions: Int) extends Partitioner {

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[String]
    return k.length() & (partitions -1)
  }
}

具體的呼叫如下:

    val data = sc.parallelize(List("a c", "a b", "bb c", "bb d", "c d"), 2)
    data.flatMap(_.split(" ")).map((_, 1))
      .partitionBy(new CustomPartitioner(2)).reduceByKey(_ + _)
        .collect()

我們可以看到,Partitioner並不作用於reduceByKey而是作用於map,是因為它是對shuffle之前的partition進行重分割槽策略的定義,在reduceByKey過程中,會呼叫當前RDD的dep(父RDD)中定義的Partitioner進行重分割槽。