
Spark RDD Word Count: Source Code Analysis

1 Hand-written word count

// Set the application name; "local" runs Spark in local mode
val conf = new SparkConf().setAppName("WC").setMaster("local")
// SparkContext is the entry point to the Spark cluster
val sc = new SparkContext(conf)
// The full word-count pipeline (input path in args(0), output path in args(1)):
// sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).sortBy(_._2, false).saveAsTextFile(args(1))
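As a sketch for reference, the same pipeline can be written out step by step so that each transformation lines up with the RDDs in the debug output below. The val names and the output path are illustrative only; the input path matches the one shown in the lineage:

// Each transformation named, with the RDDs it produces noted on the right
val lines  = sc.textFile("file:///c:/tools/test/data/a.txt")  // HadoopRDD[0] -> MapPartitionsRDD[1]
val words  = lines.flatMap(_.split(" "))                      // MapPartitionsRDD[2]
val pairs  = words.map((_, 1))                                // MapPartitionsRDD[3]
val counts = pairs.reduceByKey(_ + _)                         // ShuffledRDD[4]
val sorted = counts.sortBy(_._2, false)                       // MapPartitionsRDD[5], ShuffledRDD[6], MapPartitionsRDD[7]
sorted.saveAsTextFile("file:///c:/tools/test/out")            // action: triggers the whole lineage (output path is a placeholder)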

2 Local debug information

3 Local debug output (RDD lineage)

(1) MapPartitionsRDD[7] at sortBy at SparkWordCount.scala:21 []
 |  ShuffledRDD[6] at sortBy at SparkWordCount.scala:21 []
 +-(1) MapPartitionsRDD[5] at sortBy at SparkWordCount.scala:21 []
    |  ShuffledRDD[4] at reduceByKey at SparkWordCount.scala:21 []
    +-(1) MapPartitionsRDD[3] at map at SparkWordCount.scala:21 []
       |  MapPartitionsRDD[2] at flatMap at SparkWordCount.scala:21 []
       |  MapPartitionsRDD[1] at textFile at SparkWordCount.scala:20 []
       |  file:///c:/tools/test/data/a.txt HadoopRDD[0] at textFile at SparkWordCount.scala:20 []
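The lineage above is what toDebugString prints for the final RDD. A minimal way to reproduce it, reusing the sorted RDD from the sketch in section 1:

// Print the RDD lineage before running an action.
// Lines joined by "|" belong to the same stage (narrow dependencies);
// "+-" marks a shuffle boundary introduced by a ShuffledRDD.
println(sorted.toDebugString)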


Source code analysis

1. sc.textFile(...) produces two RDDs: HadoopRDD[0] and MapPartitionsRDD[1]

// textFile produces two RDDs

/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}
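In other words, textFile is just hadoopFile with TextInputFormat plus a map that keeps only the line text. A roughly equivalent call (a sketch, dropping the byte-offset key) would be:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Read (byte offset, line) pairs and keep only the line text, like textFile does
val linesByHand = sc
  .hadoopFile("file:///c:/tools/test/data/a.txt",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .map(pair => pair._2.toString)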

The first RDD, HadoopRDD, mainly wraps the input path, the broadcast Hadoop configuration, and the file input type (InputFormat):

/**
 * Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
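The note about the RecordReader reusing the same Writable object matters if you cache or aggregate the raw pairs directly. A sketch of the safe pattern, which copies the value out first (exactly what textFile's map(pair => pair._2.toString) does):

// Do NOT cache the raw (LongWritable, Text) pairs: the Text instance is reused per record.
val safeLines = sc
  .hadoopFile("file:///c:/tools/test/data/a.txt",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  .map { case (_, text) => text.toString } // materialize an immutable String copy
  .cache()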

The second RDD comes from calling map on the (offset, line) pairs to extract just the values, i.e. the line text:

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

2. flatMap(_.split(" ")) produces MapPartitionsRDD[2]

flatMap cleans (checks) the user function and then delegates to the Scala iterator's flatMap:

/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
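A quick sketch of what flatMap does to the word-count input, using inline sample data:

// flatMap splits each line and flattens the per-line arrays into one RDD of words
val sampleWords = sc.parallelize(Seq("hello spark", "hello world"))
  .flatMap(_.split(" "))
// sampleWords.collect() => Array(hello, spark, hello, world)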

3. map((_, 1)) produces MapPartitionsRDD[3]

map pairs each element with 1, returning (word, 1) tuples:

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
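Continuing the small sketch above, map((_, 1)) pairs every word with an initial count of 1:

// Each word becomes a (word, 1) tuple, ready for key-based aggregation
val samplePairs = sampleWords.map((_, 1))
// samplePairs.collect() => Array((hello,1), (spark,1), (hello,1), (world,1))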

4. reduceByKey(_ + _) produces ShuffledRDD[4]

reduceByKey creates a ShuffledRDD that aggregates in two steps: (1) local, map-side combining within each partition, then (2) fetching the map outputs and merging them into the global result.

/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}

/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce.
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

/**
 * :: Experimental ::
 * Generic function to combine the elements for each key using a custom set of aggregation
 * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C.
 * Note that V and C can be different -- for example, one might group an RDD of type
 * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
 *
 *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 *  - `mergeCombiners`, to combine two C's into a single one.
 *
 * In addition, users can control the partitioning of the output RDD, and whether to perform
 * map-side aggregation (if a mapper can produce multiple items with the same key).
 */
@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
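The three functions in the doc comment map directly onto word count: reduceByKey(_ + _) passes the identity as createCombiner and _ + _ as both mergeValue and mergeCombiners. A sketch using the public combineByKey API on the pairs RDD from section 1 makes the three roles explicit:

// Word count written with combineByKey; equivalent to pairs.reduceByKey(_ + _)
val countsByCombine = pairs.combineByKey(
  (v: Int) => v,                 // createCombiner: the first value for a key becomes its combiner
  (c: Int, v: Int) => c + v,     // mergeValue: fold another value into the partition-local combiner
  (c1: Int, c2: Int) => c1 + c2  // mergeCombiners: merge combiners from different partitions after the shuffle
)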

5. sortBy(_._2, false) produces three RDDs: MapPartitionsRDD[5], ShuffledRDD[6], and MapPartitionsRDD[7]

/**
 * Return this RDD sorted by the given key function.
 */
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
    .sortByKey(ascending, numPartitions)
    .values
}
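So sortBy(_._2, false) is just keyBy + sortByKey + values. Written out by hand on the counts RDD from section 1, it yields the same three RDDs (a sketch):

// What sortBy(_._2, false) expands to
val sortedByHand = counts
  .keyBy(_._2)       // MapPartitionsRDD[5]: (count, (word, count))
  .sortByKey(false)  // ShuffledRDD[6]: range-partitioned, sorted by count descending
  .values            // MapPartitionsRDD[7]: back to (word, count)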

keyBy wraps each element with its sort key by calling map, producing MapPartitionsRDD[5]:

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

sortByKey builds a RangePartitioner and returns a new ShuffledRDD (ShuffledRDD[6]):

/**
 * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
 * `collect` or `save` on the resulting RDD will return or output an ordered list of records
 * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
 * order of the keys).
 */
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

Finally, .values calls map once more, producing MapPartitionsRDD[7]:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}