阿新 • Published: 2018-11-29
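Most people only know that sortByKey is a transformation operator, and that transformation operators do not trigger a Job. Attentive readers, however, will notice in the Spark UI that calling sortByKey does trigger a Job.
The reason is that sortByKey samples the data in each partition when it is called and then gathers those samples with collect, and that collect triggers a Job. The source code below shows exactly where this happens.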
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  // Create a RangePartitioner, passing in the number of partitions,
  // the RDD the method was called on, and the sort order.
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
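The important detail above is that new RangePartitioner(...) is called inside sortByKey itself, so whatever the constructor does happens immediately, before any action is invoked. The following toy model is not Spark code; EagerInitDemo, runSamplingJob, EagerPartitioner and LazyPartitioner are hypothetical names used only to illustrate how a field initialized in the constructor body does its work at construction time, in contrast to a lazy field.

```scala
object EagerInitDemo {
  // Counts how many times the pretend sampling job has run.
  var jobsRun: Int = 0

  // Hypothetical stand-in for "sample every partition and collect the result".
  def runSamplingJob(): Array[Int] = {
    jobsRun += 1
    Array(10, 20, 30)
  }

  // Mirrors the shape of RangePartitioner: the field is computed in the constructor.
  class EagerPartitioner {
    private val rangeBounds: Array[Int] = runSamplingJob()
    def numBounds: Int = rangeBounds.length
  }

  // For contrast only: a lazy field defers the work until it is first read.
  class LazyPartitioner {
    private lazy val rangeBounds: Array[Int] = runSamplingJob()
    def numBounds: Int = rangeBounds.length
  }

  def main(args: Array[String]): Unit = {
    val eager = new EagerPartitioner
    println(s"jobs after constructing EagerPartitioner: $jobsRun")
    val deferred = new LazyPartitioner
    println(s"jobs after constructing LazyPartitioner: $jobsRun")
    val n = deferred.numBounds + eager.numBounds
    println(s"jobs after first access (total bounds read = $n): $jobsRun")
  }
}
```

In this model the eager class runs its "job" as soon as it is constructed, which is the behaviour the rest of this post traces through the real RangePartitioner source.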
The excerpt below is the part of the code that runs when new RangePartitioner is initialized:
class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  // rangeBounds is an array and is initialized right here, in the constructor.
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) { // With at most one partition, the bounds are an empty array.
      Array.empty
    } else {
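      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // By default it is 20 times the number of partitions; with very many partitions it is capped at 1e6.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      // Multiply the sample size by 3, divide by the number of input partitions, take the ceiling
      // and convert to Int: this is the number of samples taken from each partition.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      // Take sampleSizePerPartition samples from each partition.
      // Note that sketch calls collect, which triggers a Job.
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      // The returned sketched is an array whose length is the number of partitions of rdd.
      if (numItems == 0L) { // No items were counted at all.
        Array.empty
      } else {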
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        // sampleSize is the target sample size defined above; dividing it by the total number
        // of items gives the sampling fraction.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          // If fraction * (number of items in this partition) exceeds sampleSizePerPartition ...
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx // ... mark the partition as imbalanced; it must be re-sampled.
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) { // Some partitions need to be re-sampled.
          // Re-sample imbalanced partitions with the desired sampling probability.
          // Only the partitions that need re-sampling are computed here;
          // imbalancedPartitions.contains is passed in as the partition filter.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }
  // ... (the rest of the class is omitted from this excerpt)
}
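To make the arithmetic in the constructor concrete, here is a small standalone sketch that reuses the same formulas on plain numbers, with no Spark dependency. RangeBoundsDemo and its method names are hypothetical. The determineBounds shown here is a simplified model of how weighted candidates could be turned into bounds, written for illustration; it should not be read as the exact Spark implementation, which is not shown in this post.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

object RangeBoundsDemo {
  // Target total sample size: 20 per output partition, capped at 1e6.
  def sampleSize(outputPartitions: Int): Double =
    math.min(20.0 * outputPartitions, 1e6)

  // Samples requested from each input partition (over-sampled by a factor of 3).
  def sampleSizePerPartition(outputPartitions: Int, inputPartitions: Int): Int =
    math.ceil(3.0 * sampleSize(outputPartitions) / inputPartitions).toInt

  // Sampling fraction relative to the total item count, never more than 1.0.
  def fraction(outputPartitions: Int, numItems: Long): Double =
    math.min(sampleSize(outputPartitions) / math.max(numItems, 1L), 1.0)

  // Indices of input partitions that hold far more than their share and need re-sampling.
  def imbalanced(outputPartitions: Int, counts: Seq[Long]): Seq[Int] = {
    val perPartition = sampleSizePerPartition(outputPartitions, counts.length)
    val f = fraction(outputPartitions, counts.sum)
    counts.zipWithIndex.collect { case (n, idx) if f * n > perPartition => idx }
  }

  // Simplified model (an assumption, not the Spark source): walk the sorted candidates
  // and emit a bound each time the cumulative weight reaches the next multiple of
  // totalWeight / partitions, skipping keys that would duplicate the previous bound.
  def determineBounds[K: Ordering: ClassTag](
      candidates: Seq[(K, Float)], partitions: Int): Array[K] = {
    val ord = implicitly[Ordering[K]]
    val ordered = candidates.sortBy(_._1).toVector
    val step = ordered.map(_._2.toDouble).sum / partitions
    val bounds = ArrayBuffer.empty[K]
    var cumWeight = 0.0
    var target = step
    var i = 0
    while (i < ordered.length && bounds.length < partitions - 1) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      if (cumWeight >= target && (bounds.isEmpty || ord.gt(key, bounds.last))) {
        bounds += key
        target += step
      }
      i += 1
    }
    bounds.toArray
  }

  def main(args: Array[String]): Unit = {
    // 4 output partitions, 4 input partitions, one of which is heavily skewed.
    val counts = Seq(100L, 100L, 100L, 100000L)
    println(s"sampleSize = ${sampleSize(4)}")
    println(s"sampleSizePerPartition = ${sampleSizePerPartition(4, counts.length)}")
    println(s"imbalanced partitions = ${imbalanced(4, counts)}")
    val candidates = (1 to 8).map(k => (k, 1.0f))
    println(s"bounds = ${determineBounds(candidates, 4).mkString(", ")}")
  }
}
```

With 4 output partitions the target sample size is 80 and each of the 4 input partitions is asked for 60 samples. The partition holding 100000 items is expected to contribute about 79.8 items at the overall sampling fraction, which exceeds 60, so it is the one flagged for re-sampling.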
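def sketch[K : ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => // idx is the partition index; iter is an iterator over that partition's data
    val seed = byteswap32(idx ^ (shift << 16))
    // reservoirSampleAndCount is a reservoir sampling implementation that also returns the
    // input size: it returns the sample (an array) together with the number of input items.
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed) // in short: take some samples from each partition
    Iterator((idx, n, sample)) // partition index, number of items in the partition, sample (an array)
  }.collect() // Collect the per-partition samples. collect produces a Job: step into collect and you will find a runJob call.
  val numItems = sketched.map(_._2).sum // total number of items across all partitions (the sum of the n values), not the number of samples
  (numItems, sketched) // return the total item count together with the collected per-partition sketches
}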
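The per-partition step inside sketch is reservoir sampling that also counts its input. The sketch below is a plain-Scala illustration of that idea under stated assumptions: ReservoirDemo is a hypothetical name, and it uses scala.util.Random rather than whatever random generator Spark's SamplingUtils uses, so it will not reproduce Spark's exact samples.

```scala
import scala.reflect.ClassTag
import scala.util.Random

object ReservoirDemo {
  // Returns up to k uniformly sampled items from the input, plus the total item count.
  def reservoirSampleAndCount[T: ClassTag](
      input: Iterator[T], k: Int, seed: Long): (Array[T], Long) = {
    val reservoir = new Array[T](k)
    // Fill the reservoir with the first k items.
    var i = 0
    while (i < k && input.hasNext) {
      reservoir(i) = input.next()
      i += 1
    }
    if (i < k) {
      // Fewer than k items in total: the sample is simply the whole input.
      (reservoir.take(i), i.toLong)
    } else {
      // Each later item replaces a random slot with probability k / (items seen so far).
      var count = i.toLong
      val rand = new Random(seed)
      while (input.hasNext) {
        val item = input.next()
        count += 1
        val replacementIndex = (rand.nextDouble() * count).toLong
        if (replacementIndex < k) {
          reservoir(replacementIndex.toInt) = item
        }
      }
      (reservoir, count)
    }
  }

  def main(args: Array[String]): Unit = {
    val (sample, n) = reservoirSampleAndCount((1 to 1000).iterator, 10, seed = 42L)
    println(s"counted $n items, kept ${sample.length} samples")
  }
}
```

Because the count n comes back alongside the sample, the caller can compute the weight n / sample.length used above without a second pass over the partition.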