1. 程式人生 > >Spark的WordCount到底產生了多少個RDD

Spark的WordCount到底產生了多少個RDD

在Spark的wordcount中,一共會產生幾個RDD?
很多人在面試的過程會被問到Spark的WordCount中一共會產生多少個RDD呢?
答案是六個
一個HadoopRDD
四個MapPartitionsRDD
一個ShuffleRDD
下面根據原始碼進行分析,

val  lines : RDD[String] = sc.textFile("hdfs://hadoop01/wc/input")
val words : RDD[String] = lines.flatMap(_.split(" "))
val wordAndOne : RDD[(String,Int)] = words.map((_,1))
val reduced = wordAndOne.reduceByKey(_+_)
reduced.saveAsTextFile("hdfs://hadoop/wc/output")

1、首先sparkContext呼叫 textFile()方法

val  lines : RDD[String] = sc.textFile("hdfs://hadoop01/wc/input")

通過下面的原始碼,可以看到 在這個方法中 先呼叫了一個hadoopFile方法再呼叫map方法
在這裡插入圖片描述
點進hadoopFile中,可以看到這個方法的返回值是RDD,可以看到這裡產生了一個HadoopRDD
在這裡插入圖片描述
hadoopFile方法返回的是個RDD(HadoopRDD),在對這個RDD呼叫map方法,點到map方法中可以看到 ,map方法中產生了一個MapPartitionsRDD
在這裡插入圖片描述

2、接下來呼叫flatMap()方法

val words : RDD[String] = lines.flatMap(_.split(" "))

點到這個方法裡,會發現這個方法也產生了一個MapPartitionsRDD

在這裡插入圖片描述
3、然後往下執行程式碼,執行到了map方法

val wordAndOne : RDD[(String,Int)] = words.map((_,1))

點進map的方法,可以看到產生了一個MapPartitionsRDD

在這裡插入圖片描述

4、在向下執行,就會呼叫reduceByKey方法

val reduced = wordAndOne.reduceByKey(_+_)

這裡要注意啦,reduceByKey雖然是一個rdd呼叫的,但reduceByKey這個方法不是RDD中的方法

,我們可以在RDD中找到如下的一個隱式轉換,當我們去呼叫reduceByKey方法時,會發生隱式轉換,隱式的RDD轉化成了PairRDDFunctions這個類,reduceByKey是PairRDDFunctions的方法
在這裡插入圖片描述
接下來點進reduceByKey方法,再點reduceByKey(defaultPartitioner(self), func)進去
在這裡插入圖片描述
點到combineByKeyWithClassTag裡面
在這裡插入圖片描述
點到combineByKeyWithClassTag中會看見,這裡面會生成一個ShuffleRDD
在這裡插入圖片描述

5、最後呼叫saveAsTextFile,在這裡面有呼叫了一個mapPartitions方法
在這裡插入圖片描述
在mapPartitions方法中會產生一個MapPartitionsRDD
在這裡插入圖片描述
所以綜合上述分析,我們可以看見在spark的一個標準的wordcount中一共會產生6個RDD,textFile() 會產生一個HadoopRDD和一個MapPerPartitionRDD,flatMap()方法會產生一個MapPartitionsRDD,map() 方法會產生一個MapPartitionsRDD ,reduceByKey()方法會產生一個ShuffleRD,saveAsTextFile會產生一個MapPartitionsRDD,所以一共會產生6個RDD。