1. 程式人生 > >srotByKey為什麼會觸發job

srotByKey為什麼會觸發job

大家都只知道srotByKey是一個transformation運算元,而transformation型別的運算元是不觸發Job的,但是有心的人會注意到,在我們呼叫sortByKey這個運算元時 ,可以從UI介面看到,sortByKey竟然會觸發Job
其實sortByKey 在執行時會對分割槽中的資料進行取樣,把取樣的資料再進行收集(collect),那麼此時就會觸發一個Job,具體的原因可以往下看原始碼

以下是原始碼分析
如果又哪裡寫錯了歡迎糾正
參考文章
https://blog.csdn.net/u014393917/article/details/50602047

可以先大概的跟一邊程式碼
(1)呼叫sortByKey時,會new一個RangePartitioner類
在這裡插入圖片描述


(2)點進RangePartitioner中,會初始化一個rangeBounds的陣列
再裡面呼叫了sketch方法(在類中,在方法外,相當於是構造方法中的)
在這裡插入圖片描述
(3)點進sketch方法中,這裡呼叫了一個collect方法,點進這個collect方法中,可以發現是rdd的方法,會呼叫runJob
在這裡插入圖片描述
上面是大概看了一遍,有興趣的可以再向下看原始碼的分析

  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)] = self.withScope
  {
// new 了一個RangePartitioner, 傳入分割槽數, //呼叫方法的rdd, 排序規則, //點入RangePartitiner方法中 val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) }

這裡直接取了一部分程式碼,是在new RangePartitioner 初始化時會執行的

class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  //初始化一個rangeBounds, 陣列型別
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {  //如果分割槽的數小於等於1, 則建立一個空陣列
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      //這是我們需要大致平衡輸出分割槽的樣本量,上限為1M。
      //一個大約的分割槽的樣本量,最多不超過1e6(1000000)個大約1M,
      // 預設是分割槽個數的20倍.如果這個分割槽太多時,只取1e6的個數.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      //假設輸入分割槽大體上是平衡的,並且有點過取樣。
      //對樣本數* 3/ 分割槽數 ,去天花板,再取證,得到的是每個分割槽取樣的數
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      //在每個分割槽中去 sampleSizePerpartition 個樣, 注意sketch方法中使用了collect方法,會觸發一個job
      //這裡可以直接看下一部分程式碼,看sketch方法,再下面
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      /*這裡返回的sketched方法是一個數組, 陣列的長度是rdd的partitions的個數,
     陣列中每一個元素是一個Iterator(partitionid,這個partition中總的資料條數,Array[key](
     長度是樣本個數,或者一個小於樣本個數的值(這種情況表示partition的資料不夠樣本個數))),
*/

      if (numItems == 0L) {  //如果獲取的樣本數==0
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        //如果一個分割槽包含的專案數量遠遠超過平均數量,那麼我們將重新對其進行取樣
        //sampleSize是上面定義的樣本數量 通過取樣的個數除上總的資料的條數,得到一個分數值.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        //這個candidates中儲存有用於計算排序的key的候選人資訊
        val candidates = ArrayBuffer.empty[(K, Float)]
        //儲存超過平均值太多的分割槽
        val imbalancedPartitions = mutable.Set.empty[Int]
        //對呼叫sktched方法後返回的樣本進行foreach
        sketched.foreach { case (idx, n, sample) =>
          //分值 * 每個分割槽樣本個數 如果 大於 設定的sampleSizePerPartition 值
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx //新增到imbalancedPartitions,需要重新取樣
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {//如果需要重新抽樣的集合不為空
          // Re-sample imbalanced partitions with the desired sampling probability.
          //據需要重新進行取樣的partition生成一個PartitionPruningRDD例項.
          // 這個例項中只計算需要進行重新取樣的partition.傳入引數中的imbalancedPartitions.contains用於過濾partition
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          //重新抽樣,這裡的seed類似於種子取隨機數的感覺
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()

          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }

sketch方法:

  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    //使用mapPartitionsWtithIndex方法,獲取分割槽號和每個元素
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => // idx ,分割槽號, iter每個分割槽中的資料是一個迭代器
      //
      val seed = byteswap32(idx ^ (shift << 16))

      //reservoirSampleAndCount 還返回輸入大小的儲存庫抽樣實現, 返回一個樣本(陣列型別),和輸入的長度
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(  
        iter, sampleSizePerPartition, seed)  //意思大概就是從每個分割槽中取一些樣本 
      Iterator((idx, n, sample))    //迭代器中是 分割槽號,每個分割槽的樣本數,樣本(陣列型別)
    }.collect()  //先收集每個分割槽的樣本的,collect方法會產生一個job ,這裡是把取到的樣本後得到的rdd進行收集,點進collect方法會發現有一個runJob
    val numItems = sketched.map(_._2).sum  // 這個numItems應該是總共的樣本數
    (numItems, sketched) //最後把樣本數,一個收集好的樣本Array[key]型別返回
  }