
Scala: zipWithIndex and zipWithUniqueId Usage Explained

1. What They Are

As the names suggest, zipWithIndex zips each element with its index, while zipWithUniqueId zips each element with a unique ID. Their main behaviors are as follows:

1. def zipWithIndex(): RDD[(T, Long)]

This function pairs each element of the RDD with that element's ID (its index) within the RDD, producing key/value pairs.

2. def zipWithUniqueId(): RDD[(T, Long)]

This function pairs each element of the RDD with a unique ID, producing key/value pairs. The unique ID is generated as follows:
the first element of each partition gets that partition's index as its unique ID;
the Nth element of each partition gets (the previous element's unique ID) + (the total number of partitions of the RDD).

2. How to Use Them

// 1. zipWithIndex
scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at <console>:21
scala> rdd2.zipWithIndex().collect
res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))

// 2. zipWithUniqueId
scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at <console>:21
// rdd1 has two partitions
scala> rdd1.zipWithUniqueId().collect
res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
// total number of partitions: 2
// the first element of partition 0 gets ID 0; the first element of partition 1 gets ID 1
// the second element of partition 0 gets ID 0+2=2; the third gets ID 2+2=4
// the second element of partition 1 gets ID 1+2=3; the third gets ID 3+2=5
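
To confirm which elements actually sit in which partition, glom() gives a quick view (a hypothetical continuation of the same spark-shell session; the exact split depends on how makeRDD slices the sequence):

scala> rdd1.glom().collect
// e.g. Array(Array(A, B, C), Array(D, E, F)): partition 0 holds A,B,C and partition 1 holds D,E,F,
// which is consistent with the IDs 0,2,4 and 1,3,5 above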

3. Into the Source Code

1. zipWithIndex
  /**
   * Zips this RDD with its element indices. The ordering is first based on the partition index
   * and then the ordering of items within each partition. So the first item in the first
   * partition gets index 0, and the last item in the last partition receives the largest index.
   *
   * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
   * This method needs to trigger a spark job when this RDD contains more than one partitions.
   *
   * @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
   * elements in a partition. The index assigned to each element is therefore not guaranteed,
   * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
   * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
   */
  def zipWithIndex(): RDD[(T, Long)] = withScope {
    new ZippedWithIndexRDD(this)
  }

As the doc comment says, "The ordering is first based on the partition index" and "the last item in the last partition receives the largest index": the IDs follow the partitions. The method simply news up a ZippedWithIndexRDD, so let's click through into it.

/**
 * Represents an RDD zipped with its element indices. The ordering is first based on the partition
 * index and then the ordering of items within each partition. So the first item in the first
 * partition gets index 0, and the last item in the last partition receives the largest index.
 *
 * @param prev parent RDD
 * @tparam T parent RDD item type
 */
private[spark]
class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {

  /** The start index of each partition. */
  @transient private val startIndices: Array[Long] = {
    val n = prev.partitions.length
    if (n == 0) {
      Array.empty
    } else if (n == 1) {
      Array(0L)
    } else {
      prev.context.runJob(
        prev,
        Utils.getIteratorSize _,
        0 until n - 1 // do not need to count the last partition
      ).scanLeft(0L)(_ + _)
    }
  }

  override def getPartitions: Array[Partition] = {
    firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
  }

  override def getPreferredLocations(split: Partition): Seq[String] =
    firstParent[T].preferredLocations(split.asInstanceOf[ZippedWithIndexRDDPartition].prev)

  override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
    val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
    val parentIter = firstParent[T].iterator(split.prev, context)
    Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
  }
}

So the index really is assigned partition by partition: startIndices records where each partition's numbering begins, as the running sum of the sizes of the partitions before it.
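
A minimal plain-Scala sketch of that startIndices computation, assuming the two partitions of rdd2 above hold 2 and 3 elements (in Spark the sizes come from the runJob that counts every partition except the last):

val sizes = Seq(2L, 3L)                            // assumed per-partition element counts
val startIndices = sizes.init.scanLeft(0L)(_ + _)  // the last partition never needs counting
// startIndices == Seq(0, 2): partition 0 starts at global index 0, partition 1 at index 2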

2. zipWithUniqueId
  /**
   * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
   * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
   * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
   *
   * @note Some RDDs, such as those returned by groupBy(), do not guarantee order of
   * elements in a partition. The unique ID assigned to each element is therefore not guaranteed,
   * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
   * the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
   */
  def zipWithUniqueId(): RDD[(T, Long)] = withScope {
    val n = this.partitions.length.toLong
    this.mapPartitionsWithIndex { case (k, iter) =>
      Utils.getIteratorZipWithIndex(iter, 0L).map { case (item, i) =>
        (item, i * n + k)
      }
    }
  }

The doc comment already spells out the ID rule: items in the kth partition "will get ids k, n+k, 2*n+k, ..., where n is the number of partitions."
Even more noteworthy is this sentence: "won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]]", i.e. no Spark job is launched. Now look back at zipWithIndex's source:
prev.context.runJob. So that method actually kicks off a Spark job; I just want to hand out IDs in order, does it really have to start a job for that?
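
The formula (item, i * n + k) is easy to replay outside Spark. A minimal plain-Scala sketch, assuming the same 2-partition layout as the rdd1 example above (partition 0 holds A,B,C and partition 1 holds D,E,F):

val parts = Seq(Seq("A", "B", "C"), Seq("D", "E", "F"))  // assumed partition contents
val n = parts.length.toLong                              // number of partitions
val ids = parts.zipWithIndex.flatMap { case (part, k) =>
  part.zipWithIndex.map { case (item, i) => (item, i * n + k) }
}
// ids == Seq((A,0), (B,2), (C,4), (D,1), (E,3), (F,5)), matching the spark-shell output above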

4. Summary

Both methods attach an ID to every element of the RDD, but they differ in the following ways:

  1. The former assigns consecutive indices 0..N-1 that follow the partition order, so the exact assignments depend on how elements are laid out across partitions. The latter generates Long IDs by the rule "k, n+k, 2*n+k", so they are guaranteed unique within the RDD (presumably why it is named UniqueId), although gaps may appear.
  2. The latter is more efficient, because the former has to launch a runJob just to count the partition sizes (a short rule-of-thumb sketch follows after this list).
  3. They share one caveat, also noted in the docs: "Some RDDs, such as those returned by groupBy(), do not guarantee order of elements in a partition", so the assigned IDs are not guaranteed to be stable if the RDD is re-evaluated.
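
A minimal rule-of-thumb sketch (assumes a live SparkContext sc; the exact zipWithUniqueId output depends on how the elements end up split across partitions):

val data = sc.parallelize(Seq("a", "b", "c"), 2)

// Need consecutive indices 0..N-1 and can afford the extra counting job:
val indexed = data.zipWithIndex()      // (a,0), (b,1), (c,2)

// Only need distinct Long IDs and want to skip the extra job (gaps are acceptable):
val tagged = data.zipWithUniqueId()    // e.g. (a,0), (b,1), (c,3) with this partitioning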