1. 程式人生 > >spark原始碼解讀4之SortByKey

spark原始碼解讀4之SortByKey

spark原始碼解讀系列環境:spark-2.0.1 (20161103github下載版)

1.理解

1.1 需求

使用spark的時候會經常使用sortBykey,比如wordCount後需要排序,可以使用sortBy,也可以先map然後再sortByKey,soerBy也是呼叫SortByKey

1.2 原始碼

SortByKey:org.apache.spark.rdd.OrderedRDDFunctions#sortByKey

  /**
   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
   * order of the keys).
   */
  // TODO: this currently doesn't work on P other than Tuple2!
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)] = self.withScope
  {
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  }

sortBy:org.apache.spark.rdd.RDD#sortBy

  /**
   * Return this RDD sorted by the given key function.
   */
  def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    this.keyBy[K](f)
        .sortByKey(ascending, numPartitions)
        .values
  }

1.3 分析

1.3.1 sortByKey之資料partitioner類RangePartitioner

sortByKey使用了RangePartitioner,這個在前面的博文“spark原始碼解讀1之Partitioner”中已經有初步分析。RangePartitioner能很大程度上避免hash出現數據的資料分佈不均勻的情況

RangePartitioner會在determineBounds對邊界進行排序,用的是scala.collection.SeqLike#sorted ,呼叫的是java.util.Arrays#sort(T[], java.util.Comparator

1.3.2 ShuffleRDD

new ShuffledRDD並且返回,即為排序好的soetByKey的結果

1.3.2.1 partitions_屬性
new ShuffledRDD的資料儲存在partitions_屬性中,這個繼承自父類RDD,final方法partitions會給partitions_賦值,呼叫的是getPartitions方法,然後zipWithIndex

partitions原始碼:

  final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
        partitions_.zipWithIndex.foreach { case (partition, index) =>
          require(partition.index == index,
            s"partitions($index).partition == ${partition.index}, but it should equal $index")
        }
      }
      partitions_
    }
  }

1.3.2.2 getPartitions方法

getPartitions在ShuffledRDD重寫了:

 override def getPartitions: Array[Partition] = {
    Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
  }

part.numPartitions實際為1.3.1中傳入的RangePartitioner的屬性:

def numPartitions: Int = rangeBounds.length + 1

而rangeBounds則是用水塘抽樣演算法(Reservoir Sampling)建立的邊界範圍,

 1 = 632826677
 0 = -841013005
rangeBounds = {int[2]@5390} 

getPartitions後是確定RDD的partition數量和index

只有當進行取數操作時,比如top(k)然後顯示,資料才會劃分到partitions_的每個values下

debug資料copy:

"WrappedArray$ofRef" size = 333
values = {[email protected]} "WrappedArray$ofRef" size = 333
 0 = {[email protected]} "(-1813557161,-1212512531)"
 1 = {[email protected]} "(-1144323740,933490971)"
 2 = {[email protected]} "(-12508600,-329995331)"
 3 = {[email protected]} "(-1570574142,-743284380)"
 5 = {[email protected]} "(-532362478,1106605038)"
 4 = {[email protected]} "(249668146,-1487774671)"
 6 = {[email protected]} "(-146176592,666226908)"

本地debug的程式碼是:

  test("large array") {
    val rand = new scala.util.Random()
    val pairArr = Array.fill(1000) {
      (rand.nextInt(), rand.nextInt())
    }
    val pairs = sc.parallelize(pairArr, 3)
    val sorted = pairs.sortByKey()
    sorted.count()
    sorted.top(3).foreach(println)
    assert(sorted.partitions.size === 3)
    assert(sorted.collect() === pairArr.sortBy(_._1))
  }

ShuffledRDD的partitions_對應的是三個ParallelCollectionPartition,這個是RDD的依賴關係得到的,ParallelCollectionPartition類重寫了getPartitions方法,所以

  override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  }

裡面partition 的排序方法沒找到,不知道邏輯,需要後續去學習RDD和DAG、Stage等程式碼。

2.程式碼:

sortByKey使用:org.apache.spark.rdd.SortingSuite

  test("sortByKey") {
    val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)
    assert(pairs.sortByKey().collect() === Array((0, 0), (1, 0), (2, 0), (3, 0)))
  }

sortBy:org.apache.spark.rdd.RDDSuite

 test("sortBy") {
    val data = sc.parallelize(Seq("5|50|A", "4|60|C", "6|40|B"))

    val col1 = Array("4|60|C", "5|50|A", "6|40|B")
    val col2 = Array("6|40|B", "5|50|A", "4|60|C")
    val col3 = Array("5|50|A", "6|40|B", "4|60|C")

    assert(data.sortBy(_.split("\|")(0)).collect() === col1)
    assert(data.sortBy(_.split("\|")(1)).collect() === col2)
    assert(data.sortBy(_.split("\|")(2)).collect() === col3)
  }

3.結果:

3.1 TimSort有待學習
3.2 RangePartitioner只是確定numPartitions和getPartition(key: Any),partition內部如何排序沒有看到

參考

【1】http://spark.apache.org/
【2】http://spark.apache.org/docs/1.5.2/programming-guide.html
【3】https://github.com/xubo245/SparkLearning
【4】book:《深入理解spark核心思想與原始碼分析》
【5】book:《spark核心原始碼分析和開發實戰》
【6】http://blog.csdn.net/u014393917/article/details/50602047