【Spark】RDD操作具體解釋4——Action算子
本質上在Actions算子中通過SparkContext運行提交作業的runJob操作,觸發了RDD DAG的運行。
依據Action算子的輸出空間將Action算子進行分類:無輸出、 HDFS、 Scala集合和數據類型。
無輸出
foreach
對RDD中的每一個元素都應用f函數操作,不返回RDD和Array,而是返回Uint。
圖中。foreach算子通過用戶自己定義函數對每一個數據項進行操作。 本例中自己定義函數為println,控制臺打印全部數據項。
源代碼:
/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit) {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
HDFS
(1)saveAsTextFile
函數將數據輸出。存儲到HDFS的指定文件夾。
將RDD中的每一個元素映射轉變為(Null,x.toString),然後再將其寫入HDFS。
圖中,左側的方框代表RDD分區,右側方框代表HDFS的Block。
通過函數將RDD的每一個分區存儲為HDFS中的一個Block。
源代碼:
/**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
// https://issues.apache.org/jira/browse/SPARK-2075
//
// NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
// Ordering for it and will use the default `null`. However, it‘s a `Comparable[NullWritable]`
// in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
// Ordering for `NullWritable`. That‘s why the compiler will generate different anonymous
// classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
//
// Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
// same bytecodes for `saveAsTextFile`.
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
val r = this.mapPartitions { iter =>
val text = new Text()
iter.map { x =>
text.set(x.toString)
(NullWritable.get(), text)
}
}
RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
/**
* Save this RDD as a compressed text file, using string representations of elements.
*/
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
// https://issues.apache.org/jira/browse/SPARK-2075
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
val r = this.mapPartitions { iter =>
val text = new Text()
iter.map { x =>
text.set(x.toString)
(NullWritable.get(), text)
}
}
RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}
(2)saveAsObjectFile
saveAsObjectFile將分區中的每10個元素組成一個Array,然後將這個Array序列化。映射為(Null,BytesWritable(Y))的元素,寫入HDFS為SequenceFile的格式。
圖中,左側方框代表RDD分區,右側方框代表HDFS的Block。 通過函數將RDD的每一個分區存儲為HDFS上的一個Block。
源代碼:
/**
* Save this RDD as a SequenceFile of serialized objects.
*/
def saveAsObjectFile(path: String) {
this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
.saveAsSequenceFile(path)
}
Scala集合和數據類型
(1)collect
collect相當於toArray。toArray已經過時不推薦使用,collect將分布式的RDD返回為一個單機的scala Array數組。
在這個數組上運用scala的函數式操作。
圖中,左側方框代表RDD分區。右側方框代表單機內存中的數組。
通過函數操作,將結果返回到Driver程序所在的節點,以數組形式存儲。
源代碼:
/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
(2)collectAsMap
collectAsMap對(K,V)型的RDD數據返回一個單機HashMap。
對於反復K的RDD元素,後面的元素覆蓋前面的元素。
圖中,左側方框代表RDD分區。右側方框代表單機數組。數據通過collectAsMap函數返回給Driver程序計算結果,結果以HashMap形式存儲。
源代碼:
/**
* Return the key-value pairs in this RDD to the master as a Map.
*
* Warning: this doesn‘t return a multimap (so if you have multiple values to the same key, only
* one value per key is preserved in the map returned)
*/
def collectAsMap(): Map[K, V] = {
val data = self.collect()
val map = new mutable.HashMap[K, V]
map.sizeHint(data.length)
data.foreach { pair => map.put(pair._1, pair._2) }
map
}
(3)reduceByKeyLocally
實現的是先reduce再collectAsMap的功能,先對RDD的總體進行reduce操作,然後再收集全部結果返回為一個HashMap。
源代碼:
/**
* Merge the values for each key using an associative reduce function, but return the results
* immediately to the master as a Map. This will also perform the merging locally on each mapper
* before sending results to a reducer, similarly to a "combiner" in MapReduce.
*/
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
if (keyClass.isArray) {
throw new SparkException("reduceByKeyLocally() does not support array keys")
}
val reducePartition = (iter: Iterator[(K, V)]) => {
val map = new JHashMap[K, V]
iter.foreach { pair =>
val old = map.get(pair._1)
map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
}
Iterator(map)
} : Iterator[JHashMap[K, V]]
val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
m2.foreach { pair =>
val old = m1.get(pair._1)
m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
}
m1
} : JHashMap[K, V]
self.mapPartitions(reducePartition).reduce(mergeMaps)
}
(4)lookup
Lookup函數對(Key,Value)型的RDD操作。返回指定Key相應的元素形成的Seq。這個函數處理優化的部分在於,假設這個RDD包括分區器,則僅僅會相應處理K所在的分區。然後返回由(K,V)形成的Seq。假設RDD不包括分區器。則須要對全RDD元素進行暴力掃描處理,搜索指定K相應的元素。
圖中。左側方框代表RDD分區。右側方框代表Seq。最後結果返回到Driver所在節點的應用中。
源代碼:
/**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
*/
def lookup(key: K): Seq[V] = {
self.partitioner match {
case Some(p) =>
val index = p.getPartition(key)
val process = (it: Iterator[(K, V)]) => {
val buf = new ArrayBuffer[V]
for (pair <- it if pair._1 == key) {
buf += pair._2
}
buf
} : Seq[V]
val res = self.context.runJob(self, process, Array(index), false)
res(0)
case None =>
self.filter(_._1 == key).map(_._2).collect()
}
}
(5)count
count返回整個RDD的元素個數。
圖中,返回數據的個數為5。一個方塊代表一個RDD分區。
源代碼:
/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
(6)top
top可返回最大的k個元素。
相近函數說明:
- top返回最大的k個元素。
- take返回最小的k個元素。
- takeOrdered返回最小的k個元素, 而且在返回的數組中保持元素的順序。
- first相當於top( 1) 返回整個RDD中的前k個元素, 能夠定義排序的方式Ordering[T]。返回的是一個含前k個元素的數組。
源代碼:
/**
* Returns the top k (largest) elements from this RDD as defined by the specified
* implicit Ordering[T]. This does the opposite of [[takeOrdered]]. For example:
* {{{
* sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
* // returns Array(12)
*
* sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
* // returns Array(6, 5)
* }}}
*
* @param num k, the number of top elements to return
* @param ord the implicit ordering for T
* @return an array of top elements
*/
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
(7)reduce
reduce函數相當於對RDD中的元素進行reduceLeft函數的操作。
reduceLeft先對兩個元素
/**
* Reduces the elements of this RDD using the specified commutative and
* associative binary operator.
*/
def reduce(f: (T, T) => T): T = {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
(8)fold
fold和reduce的原理同樣。可是與reduce不同,相當於每一個reduce時。叠代器取的第一個元素是zeroValue。
圖中,通過用戶自己定義函數進行fold運算,圖中的一個方框代表一個RDD分區。
源代碼:
/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
* given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
* modify t1 and return it as its result value to avoid object allocation; however, it should not
* modify t2.
*/
def fold(zeroValue: T)(op: (T, T) => T): T = {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
(9)aggregate
aggregate先對每一個分區的全部元素進行aggregate操作,再對分區的結果進行fold操作。
aggreagate與fold和reduce的不同之處在於。aggregate相當於採用歸並的方式進行數據聚集。這樣的聚集是並行化的。
而在fold和reduce函數的運算過程中,每一個分區中須要進行串行處理,每一個分區串行計算完結果,結果再按之前的方式進行聚集,並返回終於聚集結果。
圖中。通過用戶自己定義函數對RDD 進行aggregate的聚集操作。圖中的每一個方框代表一個RDD分區。
源代碼:
/**
* Aggregate the elements of each partition, and then the results for all the partitions, using
* given combine functions and a neutral "zero value". This function can return a different result
* type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
* and one operation for merging two U‘s, as in scala.TraversableOnce. Both of these functions are
* allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation.
*/
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}
轉載請註明作者Jason Ding及其出處
GitCafe博客主頁(http://jasonding1354.gitcafe.io/)
Github博客主頁(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
簡書主頁(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354進入我的博客主頁
【Spark】RDD操作具體解釋4——Action算子