Spark的WordCount到底產生了多少個RDD
在Spark的wordcount中,一共會產生幾個RDD?
很多人在面試的過程會被問到Spark的WordCount中一共會產生多少個RDD呢?
答案是六個
一個HadoopRDD
四個MapPartitionsRDD
一個ShuffleRDD
下面根據原始碼進行分析,
val lines : RDD[String] = sc.textFile("hdfs://hadoop01/wc/input") val words : RDD[String] = lines.flatMap(_.split(" ")) val wordAndOne : RDD[(String,Int)] = words.map((_,1)) val reduced = wordAndOne.reduceByKey(_+_) reduced.saveAsTextFile("hdfs://hadoop/wc/output")
1、首先sparkContext呼叫 textFile()方法
val lines : RDD[String] = sc.textFile("hdfs://hadoop01/wc/input")
通過下面的原始碼,可以看到 在這個方法中 先呼叫了一個hadoopFile方法再呼叫map方法
點進hadoopFile中,可以看到這個方法的返回值是RDD,可以看到這裡產生了一個HadoopRDD
hadoopFile方法返回的是個RDD(HadoopRDD),在對這個RDD呼叫map方法,點到map方法中可以看到 ,map方法中產生了一個MapPartitionsRDD
2、接下來呼叫flatMap()方法
val words : RDD[String] = lines.flatMap(_.split(" "))
點到這個方法裡,會發現這個方法也產生了一個MapPartitionsRDD
3、然後往下執行程式碼,執行到了map方法
val wordAndOne : RDD[(String,Int)] = words.map((_,1))
點進map的方法,可以看到產生了一個MapPartitionsRDD
4、在向下執行,就會呼叫reduceByKey方法
val reduced = wordAndOne.reduceByKey(_+_)
這裡要注意啦,reduceByKey雖然是一個rdd呼叫的,但reduceByKey這個方法不是RDD中的方法
接下來點進reduceByKey方法,再點reduceByKey(defaultPartitioner(self), func)進去
點到combineByKeyWithClassTag裡面
點到combineByKeyWithClassTag中會看見,這裡面會生成一個ShuffleRDD
5、最後呼叫saveAsTextFile,在這裡面有呼叫了一個mapPartitions方法
在mapPartitions方法中會產生一個MapPartitionsRDD
所以綜合上述分析,我們可以看見在spark的一個標準的wordcount中一共會產生6個RDD,textFile() 會產生一個HadoopRDD和一個MapPerPartitionRDD,flatMap()方法會產生一個MapPartitionsRDD,map() 方法會產生一個MapPartitionsRDD ,reduceByKey()方法會產生一個ShuffleRD,saveAsTextFile會產生一個MapPartitionsRDD,所以一共會產生6個RDD。