srotByKey為什麼會觸發job
阿新 • • 發佈:2018-11-29
大家都只知道srotByKey是一個transformation運算元,而transformation型別的運算元是不觸發Job的,但是有心的人會注意到,在我們呼叫sortByKey這個運算元時 ,可以從UI介面看到,sortByKey竟然會觸發Job
其實sortByKey 在執行時會對分割槽中的資料進行取樣,把取樣的資料再進行收集(collect),那麼此時就會觸發一個Job,具體的原因可以往下看原始碼
以下是原始碼分析
如果又哪裡寫錯了歡迎糾正
參考文章
https://blog.csdn.net/u014393917/article/details/50602047
可以先大概的跟一邊程式碼
(1)呼叫sortByKey時,會new一個RangePartitioner類
(2)點進RangePartitioner中,會初始化一個rangeBounds的陣列
再裡面呼叫了sketch方法(在類中,在方法外,相當於是構造方法中的)
(3)點進sketch方法中,這裡呼叫了一個collect方法,點進這個collect方法中,可以發現是rdd的方法,會呼叫runJob
上面是大概看了一遍,有興趣的可以再向下看原始碼的分析
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
// new 了一個RangePartitioner, 傳入分割槽數,
//呼叫方法的rdd, 排序規則,
//點入RangePartitiner方法中
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
這裡直接取了一部分程式碼,是在new RangePartitioner 初始化時會執行的
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
// We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
private var ordering = implicitly[Ordering[K]]
// An array of upper bounds for the first (partitions - 1) partitions
//初始化一個rangeBounds, 陣列型別
private var rangeBounds: Array[K] = {
if (partitions <= 1) { //如果分割槽的數小於等於1, 則建立一個空陣列
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
//這是我們需要大致平衡輸出分割槽的樣本量,上限為1M。
//一個大約的分割槽的樣本量,最多不超過1e6(1000000)個大約1M,
// 預設是分割槽個數的20倍.如果這個分割槽太多時,只取1e6的個數.
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
//假設輸入分割槽大體上是平衡的,並且有點過取樣。
//對樣本數* 3/ 分割槽數 ,去天花板,再取證,得到的是每個分割槽取樣的數
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
//在每個分割槽中去 sampleSizePerpartition 個樣, 注意sketch方法中使用了collect方法,會觸發一個job
//這裡可以直接看下一部分程式碼,看sketch方法,再下面
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
/*這裡返回的sketched方法是一個數組, 陣列的長度是rdd的partitions的個數,
陣列中每一個元素是一個Iterator(partitionid,這個partition中總的資料條數,Array[key](
長度是樣本個數,或者一個小於樣本個數的值(這種情況表示partition的資料不夠樣本個數))),
*/
if (numItems == 0L) { //如果獲取的樣本數==0
Array.empty
} else {
// If a partition contains much more than the average number of items, we re-sample from it
// to ensure that enough items are collected from that partition.
//如果一個分割槽包含的專案數量遠遠超過平均數量,那麼我們將重新對其進行取樣
//sampleSize是上面定義的樣本數量 通過取樣的個數除上總的資料的條數,得到一個分數值.
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
//這個candidates中儲存有用於計算排序的key的候選人資訊
val candidates = ArrayBuffer.empty[(K, Float)]
//儲存超過平均值太多的分割槽
val imbalancedPartitions = mutable.Set.empty[Int]
//對呼叫sktched方法後返回的樣本進行foreach
sketched.foreach { case (idx, n, sample) =>
//分值 * 每個分割槽樣本個數 如果 大於 設定的sampleSizePerPartition 值
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx //新增到imbalancedPartitions,需要重新取樣
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {//如果需要重新抽樣的集合不為空
// Re-sample imbalanced partitions with the desired sampling probability.
//據需要重新進行取樣的partition生成一個PartitionPruningRDD例項.
// 這個例項中只計算需要進行重新取樣的partition.傳入引數中的imbalancedPartitions.contains用於過濾partition
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
//重新抽樣,這裡的seed類似於種子取隨機數的感覺
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, partitions)
}
}
sketch方法:
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
//使用mapPartitionsWtithIndex方法,獲取分割槽號和每個元素
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) => // idx ,分割槽號, iter每個分割槽中的資料是一個迭代器
//
val seed = byteswap32(idx ^ (shift << 16))
//reservoirSampleAndCount 還返回輸入大小的儲存庫抽樣實現, 返回一個樣本(陣列型別),和輸入的長度
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed) //意思大概就是從每個分割槽中取一些樣本
Iterator((idx, n, sample)) //迭代器中是 分割槽號,每個分割槽的樣本數,樣本(陣列型別)
}.collect() //先收集每個分割槽的樣本的,collect方法會產生一個job ,這裡是把取到的樣本後得到的rdd進行收集,點進collect方法會發現有一個runJob
val numItems = sketched.map(_._2).sum // 這個numItems應該是總共的樣本數
(numItems, sketched) //最後把樣本數,一個收集好的樣本Array[key]型別返回
}