
Spark RDD Usage in Detail (5) -- Action Operators

Essentially, an Action operator triggers execution of the RDD DAG by submitting a job through SparkContext's runJob.
Action operators can be grouped by where their output goes: no output, HDFS, and Scala collections and data types.

No output

foreach

Applies the function f to every element of the RDD. It does not return an RDD or an Array; its return type is Unit.

In the figure, the foreach operator applies a user-defined function to each data item. In this example the user-defined function is println, which prints every item to the console.

Source:

```scala
/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit) {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
```
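A minimal usage sketch (assuming `sc` is an existing SparkContext, as in spark-shell):

```scala
val nums = sc.parallelize(1 to 5)

// foreach runs on the executors; with a local master the println output
// appears in the driver console, otherwise in the executors' stdout.
nums.foreach(x => println(x))

// A common alternative when the side effect must happen on the driver:
nums.collect().foreach(println)
```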

HDFS

(1) saveAsTextFile

This function writes the RDD to the specified directory on HDFS. Each element of the RDD is mapped to the pair (Null, x.toString), which is then written to HDFS.

In the figure, the boxes on the left represent RDD partitions and the boxes on the right represent HDFS blocks. Each partition of the RDD is stored as one block on HDFS.

Source:

```scala
/**
 * Save this RDD as a text file, using string representations of elements.
 */
def saveAsTextFile(path: String) {
  // https://issues.apache.org/jira/browse/SPARK-2075
  //
  // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
  // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
  // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
  // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
  // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
  //
  // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
  // same bytecodes for `saveAsTextFile`.
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

/**
 * Save this RDD as a compressed text file, using string representations of elements.
 */
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
  // https://issues.apache.org/jira/browse/SPARK-2075
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}
```
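A usage sketch; `sc` is assumed to be an existing SparkContext and the HDFS paths below are placeholders that must not already exist:

```scala
import org.apache.hadoop.io.compress.GzipCodec

val nums = sc.parallelize(1 to 100, 4)

// Plain text output: one part-xxxxx file per partition under the directory.
nums.saveAsTextFile("hdfs:///tmp/nums-text")

// Compressed text output using a Hadoop compression codec.
nums.saveAsTextFile("hdfs:///tmp/nums-text-gz", classOf[GzipCodec])
```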

(2) saveAsObjectFile

saveAsObjectFile groups every 10 elements of a partition into an Array, serializes that Array, maps it to a (Null, BytesWritable(Y)) element, and writes the result to HDFS in SequenceFile format.

In the figure, the boxes on the left represent RDD partitions and the boxes on the right represent HDFS blocks. Each partition of the RDD is stored as one block on HDFS.

Source:

```scala
/**
 * Save this RDD as a SequenceFile of serialized objects.
 */
def saveAsObjectFile(path: String) {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}
```
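A usage sketch under the same assumptions (the path is a placeholder); the data can be read back with sc.objectFile:

```scala
case class Point(x: Int, y: Int)

val points = sc.parallelize(Seq(Point(1, 2), Point(3, 4), Point(5, 6)))

// Writes a SequenceFile whose values are serialized arrays of up to 10 elements.
points.saveAsObjectFile("hdfs:///tmp/points-obj")

// Read the serialized objects back as an RDD[Point].
val restored = sc.objectFile[Point]("hdfs:///tmp/points-obj")
```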

Scala collections and data types

(1) collect

collect is the equivalent of toArray (which is deprecated and no longer recommended): it returns the distributed RDD as a single-machine Scala Array, on which ordinary Scala functional operations can then be applied.

In the figure, the boxes on the left represent RDD partitions and the box on the right represents an array in single-machine memory. The result is returned to the node running the Driver program and stored as an array.

Source:

```scala
/**
 * Return an array that contains all of the elements in this RDD.
 */
def collect(): Array[T] = {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
```
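A usage sketch (assuming `sc` is an existing SparkContext). Since collect pulls every element to the driver, it should only be used on RDDs small enough to fit in driver memory:

```scala
val squares = sc.parallelize(1 to 5).map(x => x * x)

val arr: Array[Int] = squares.collect()   // Array(1, 4, 9, 16, 25)
val total = arr.sum                       // ordinary Scala collection operations on the driver
```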

(2) collectAsMap

collectAsMap returns a single-machine HashMap for an RDD of (K, V) pairs. For duplicate keys, later elements overwrite earlier ones.

In the figure, the boxes on the left represent RDD partitions and the box on the right represents the single-machine result. The data is returned to the Driver program via collectAsMap and stored as a HashMap.

Source:

```scala
/**
 * Return the key-value pairs in this RDD to the master as a Map.
 *
 * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
 * one value per key is preserved in the map returned)
 */
def collectAsMap(): Map[K, V] = {
  val data = self.collect()
  val map = new mutable.HashMap[K, V]
  map.sizeHint(data.length)
  data.foreach { pair => map.put(pair._1, pair._2) }
  map
}
```
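A usage sketch (assuming `sc` is an existing SparkContext):

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// Only one value per key survives; the pair processed last overwrites the earlier one.
val m = pairs.collectAsMap()
// m("b") == 2; m("a") is typically 3 here, since ("a", 3) is collected after ("a", 1)
```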

(3) reduceByKeyLocally

This implements a reduce followed by collectAsMap: it first reduces the RDD as a whole and then collects all results into a single HashMap that is returned to the driver.

Source:

```scala
/**
 * Merge the values for each key using an associative reduce function, but return the results
 * immediately to the master as a Map. This will also perform the merging locally on each mapper
 * before sending results to a reducer, similarly to a "combiner" in MapReduce.
 */
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {

  if (keyClass.isArray) {
    throw new SparkException("reduceByKeyLocally() does not support array keys")
  }

  val reducePartition = (iter: Iterator[(K, V)]) => {
    val map = new JHashMap[K, V]
    iter.foreach { pair =>
      val old = map.get(pair._1)
      map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
    }
    Iterator(map)
  } : Iterator[JHashMap[K, V]]

  val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
    m2.foreach { pair =>
      val old = m1.get(pair._1)
      m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
    }
    m1
  } : JHashMap[K, V]

  self.mapPartitions(reducePartition).reduce(mergeMaps)
}
```
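A usage sketch (assuming `sc` is an existing SparkContext). The reduce function should be associative, because per-partition maps are merged in an arbitrary order:

```scala
val words = sc.parallelize(Seq("spark", "rdd", "spark", "action"))

// Word count whose result is returned directly to the driver as a Map.
val counts = words.map(w => (w, 1)).reduceByKeyLocally(_ + _)
// counts: Map(spark -> 2, rdd -> 1, action -> 1)
```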

(4) lookup

lookup operates on an RDD of (Key, Value) pairs and returns the Seq of values associated with the given key. The optimization is that if the RDD has a partitioner, only the partition containing the key is processed and the matching (K, V) values are returned as a Seq; if the RDD has no partitioner, the entire RDD must be scanned to find the elements with the given key.

In the figure, the boxes on the left represent RDD partitions and the box on the right represents the resulting Seq, which is returned to the application on the Driver node.

Source:

```scala
def lookup(key: K): Seq[V] = {
  self.partitioner match {
    case Some(p) =>
      val index = p.getPartition(key)
      val process = (it: Iterator[(K, V)]) => {
        val buf = new ArrayBuffer[V]
        for (pair <- it if pair._1 == key) {
          buf += pair._2
        }
        buf
      } : Seq[V]
      val res = self.context.runJob(self, process, Array(index), false)
      res(0)
    case None =>
      self.filter(_._1 == key).map(_._2).collect()
  }
}
```
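A usage sketch (assuming `sc` is an existing SparkContext). With a known partitioner, lookup only runs a job on the single partition the key maps to:

```scala
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))
              .partitionBy(new HashPartitioner(4))

// Returns a Seq containing "a" and "c"; only partition getPartition(1) is scanned.
val values: Seq[String] = pairs.lookup(1)
```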

(5) count

count returns the number of elements in the whole RDD.

In the figure, the number of elements returned is 5; each box represents one RDD partition.

Source:

```scala
/**
 * Return the number of elements in the RDD.
 */
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
```
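A usage sketch (assuming `sc` is an existing SparkContext):

```scala
val nums = sc.parallelize(1 to 1000, 4)

// Each partition's size is computed on the executors and the per-partition counts are summed.
val n: Long = nums.count()   // 1000
```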

(6) top

top returns the largest k elements, as determined by an implicit Ordering[T], and returns them as an array. A comparison sketch of the related functions follows the list below.
Related functions:

  • top returns the largest k elements.
  • take returns the first k elements of the RDD.
  • takeOrdered returns the smallest k elements, keeping them in sorted order in the returned array.
  • first is equivalent to take(1) and returns the first element of the RDD.
Source:

```scala
/**
 * Returns the top k (largest) elements from this RDD as defined by the specified
 * implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:
 * {{{
 *   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
 *   // returns Array(12)
 *
 *   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
 *   // returns Array(6, 5)
 * }}}
 *
 * @param num k, the number of top elements to return
 * @param ord the implicit ordering for T
 * @return an array of top elements
 */
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
```
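A usage sketch comparing the related functions (assuming `sc` is an existing SparkContext):

```scala
val nums = sc.parallelize(Seq(10, 4, 2, 12, 3))

nums.top(2)          // Array(12, 10): the two largest, in descending order
nums.takeOrdered(2)  // Array(2, 3): the two smallest, in ascending order
nums.take(2)         // Array(10, 4): the first two elements in partition order
nums.first()         // 10: equivalent to taking a single element with take(1)
```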

(7) reduce

reduce is equivalent to applying a reduceLeft operation to the elements of the RDD.
reduceLeft first applies the function to the first two elements, then applies it to that result and the next element drawn from the iterator, and so on until all elements have been processed and the final result is obtained.

Source:

```scala
/**
 * Reduces the elements of this RDD using the specified commutative and
 * associative binary operator.
 */
def reduce(f: (T, T) => T): T = {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
```
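A usage sketch (assuming `sc` is an existing SparkContext). The function must be commutative and associative, since partitions are reduced independently and the partial results are merged in arbitrary order:

```scala
val nums = sc.parallelize(1 to 100, 4)

val sum = nums.reduce(_ + _)      // 5050
val max = nums.reduce(math.max)   // 100
```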

(8) fold

fold works on the same principle as reduce, except that in each reduce the first element taken by the iterator is zeroValue.

In the figure, the fold is performed with a user-defined function; each box represents one RDD partition.

Source:

```scala
/**
 * Aggregate the elements of each partition, and then the results for all the partitions, using a
 * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
 * modify t1 and return it as its result value to avoid object allocation; however, it should not
 * modify t2.
 */
def fold(zeroValue: T)(op: (T, T) => T): T = {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanOp = sc.clean(op)
  val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
  val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}
```
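A usage sketch (assuming `sc` is an existing SparkContext). Because zeroValue is applied once per partition and once more when the per-partition results are merged, it should be a neutral element for op (for example, 0 for addition):

```scala
val nums = sc.parallelize(1 to 10, 2)

nums.fold(0)(_ + _)   // 55
nums.fold(1)(_ + _)   // 58 with 2 partitions: 1 is added once per partition plus once when merging
```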

(9) aggregate

aggregate first aggregates all elements within each partition, then folds the per-partition results together.
aggregate differs from fold and reduce in that it gathers the data in a merge-like, parallelized way: each partition is processed serially to produce a partial result, and those partial results are then combined into the final result. In addition, as the source comment below notes, aggregate can produce a result of a different type U than the RDD's element type T, using seqOp to merge an element into a partial result and combOp to merge two partial results; fold and reduce require the result to have the same type as the elements.

In the figure, the RDD is aggregated with user-defined functions; each box represents one RDD partition.

Source:

```scala
/**
 * Aggregate the elements of each partition, and then the results for all the partitions, using
 * given combine functions and a neutral "zero value". This function can return a different result
 * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
 * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
 * allowed to modify and return their first argument instead of creating a new U to avoid memory
 * allocation.
 */
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
```
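A usage sketch (assuming `sc` is an existing SparkContext) computing sum and count in a single pass; the result type (Int, Int) differs from the element type Int, which reduce and fold cannot express:

```scala
val nums = sc.parallelize(1 to 100, 4)

val (sum, count) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),      // seqOp: merge an element into a partial result
  (a, b)   => (a._1 + b._1, a._2 + b._2))    // combOp: merge two partial results

// sum == 5050, count == 100
val avg = sum.toDouble / count               // 50.5
```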