Demos of the Spark operators Transformation and Action
- Spark has two kinds of operators: Transformations and Actions.
Transformation: a transformation is lazily evaluated. When you call a Transformation operator, Spark does not compute anything right away; it only records the metadata describing the computation. For example, if you run:
sc.textFile("hdfs://cdhnode1:8030/wordcount")
Spark does not load the data into an RDD; it merely records where the data should be read from. The work is actually performed only when an Action operator is encountered.
Action: as the name suggests, an Action behaves differently from a Transformation: calling an Action operator triggers execution immediately.
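The laziness can be seen in a short word-count pipeline (a minimal sketch, assuming a running spark-shell and the same HDFS path as above):
// only the lineage is recorded here -- nothing is read from HDFS yet
val lines = sc.textFile("hdfs://cdhnode1:8030/wordcount")   // Transformation
val pairs = lines.flatMap(_.split(" ")).map((_, 1))         // Transformations
// collect is an Action: only now does Spark launch a job and actually read the file
val counts = pairs.reduceByKey(_ + _).collect()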
- What is an RDD?
RDD stands for Resilient Distributed Dataset. In the Spark source code, the RDD class carries the following comment:
/**
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
* [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
* pairs, such as `groupByKey` and `join`;
* [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
* Doubles; and
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
* can be saved as SequenceFiles.
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
* through implicit.
*
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
*
* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
* to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
* reading data from a new storage system) by overriding these functions. Please refer to the
* [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
* on RDD internals.
*/
- The middle of that comment states that an RDD has five main properties (a short sketch showing how to inspect them follows this list):
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* // an RDD is made up of a list of partitions
* - A function for computing each split
* // a function computes each split (partition)
* - A list of dependencies on other RDDs
* // it records its dependencies on other RDDs
*
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* // a key-value RDD may carry a Partitioner, e.g. a hash partitioner
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
* // preferred locations enable data-local computation
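A rough illustration of these properties (a sketch only, assuming a running spark-shell; the names nums and sums are chosen just for this example):
val nums = sc.parallelize(1 to 8, 4)               // property 1: a list of partitions
nums.partitions.length                             // 4
val sums = nums.map(x => (x % 2, x)).reduceByKey(_ + _)
sums.dependencies                                  // property 3: a ShuffleDependency on the parent RDD
sums.partitioner                                   // property 4: Some(HashPartitioner(...)) for this key-value RDD
// property 2 (the compute function for each split) and property 5 (preferred locations,
// e.g. HDFS block locations) are used by the scheduler rather than called from user code.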
- Ways to create an RDD:
1. From a data source supported by Spark, for example from HDFS:
sc.textFile("hdfs://cdhnode1:8030/wordcount")
2. By parallelizing a collection or array:
sc.parallelize(Array(1,2,3,4,5,6,7,8))
- Below are demos of the Transformation and Action operators.
1. Start Spark.
2. Open the spark shell:
/home/hadoop/app/spark-1.3.1-bin-hadoop2.6/bin/spark-shell --master spark://cdhnode1:7077 --executor-memory 512m --total-executor-cores 2
I. Transformation demos
map
// create an RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:21
//map
scala> rdd1.map(_*10)
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at map at <console>:24
// view the result (collect is an Action operator)
scala> res0.collect
res1: Array[Int] = Array(10, 20, 30, 40, 50)
sortBy
// create an RDD
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:21
//sortBy
scala> rdd1.sortBy(x=>x,true)
res2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at sortBy at <console>:24
// view the result (collect is an Action operator)
scala> res2.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5)
flatMap
// create an RDD
scala> val rdd1 = sc.parallelize(Array("hello world", "hello word count", "hello lijie"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:21
//flatMap
scala> rdd1.flatMap(_.split(" ")).collect
// result
res4: Array[String] = Array(hello, world, hello, word, count, hello, lijie)
// for a nested Array, apply flatMap twice
// create an RDD
scala> val rdd1 = sc.parallelize(Array(Array("hello world", "hello word count", "hello lijie")))
rdd1: org.apache.spark.rdd.RDD[Array[String]] = ParallelCollectionRDD[13] at parallelize at <console>:21
//flatMap
scala> rdd1.flatMap(_.flatMap(_.split(" "))).collect
// result
res5: Array[String] = Array(hello, world, hello, word, count, hello, lijie)
union
scala> val rdd1 = sc.parallelize(List(5,6,4,7))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:21
scala> val rdd1 = sc.parallelize(List(1,2,3,0))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:21
scala> val rdd2 = sc.parallelize(List(5,3,2,1))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:21
scala> val ddr3 = rdd1.union(rdd2)
ddr3: org.apache.spark.rdd.RDD[Int] = UnionRDD[18] at union at <console>:25
scala> ddr3.collect
res6: Array[Int] = Array(1, 2, 3, 0, 5, 3, 2, 1)
intersection
scala> val ddr3 = rdd1.intersection(rdd2)
ddr3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[24] at intersection at <console>:25
scala> ddr3.collect
res7: Array[Int] = Array(2, 1, 3)
join
scala> val rdd1 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lisi", 39)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:21
scala> val rdd2 = sc.parallelize(List(("lijie", 99), ("wangwu", 88), ("zhangsan", 100)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:21
scala> rdd1.join(rdd2).collect
res8: Array[(String, (Int, Int))] = Array((zhangsan,(28,100)), (lijie,(24,99)))
leftOuterJoin
scala> rdd1.leftOuterJoin(rdd2).collect
res9: Array[(String, (Int, Option[Int]))] = Array((zhangsan,(28,Some(100))), (lijie,(24,Some(99))), (lisi,(39,None)))
rightOuterJoin
scala> rdd1.rightOuterJoin(rdd2).collect
res10: Array[(String, (Option[Int], Int))] = Array((zhangsan,(Some(28),100)), (wangwu,(None,88)), (lijie,(Some(24),99)))
groupByKey
scala> val rdd1 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lisi", 39)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[36] at parallelize at <console>:21
scala> val rdd2 = sc.parallelize(List(("lijie", 99), ("wangwu", 88), ("zhangsan", 100)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[37] at parallelize at <console>:21
scala> rdd1.union(rdd2).groupByKey.collect
res11: Array[(String, Iterable[Int])] = Array((zhangsan,CompactBuffer(28, 100)), (wangwu,CompactBuffer(88)), (lijie,CompactBuffer(24, 99)), (lisi,CompactBuffer(39)))
// use it to compute a word count
scala> rdd1.union(rdd2).groupByKey.map(x=>(x._1,x._2.sum)).collect
res12: Array[(String, Int)] = Array((zhangsan,128), (wangwu,88), (lijie,123), (lisi,39))
reduceByKey
scala> rdd1.union(rdd2).reduceByKey(_+_).collect
res13: Array[(String, Int)] = Array((zhangsan,128), (wangwu,88), (lijie,123), (lisi,39))
cogroup
scala> val rdd1 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lijie", 999),("haha", 123)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[45] at parallelize at <console>:21
scala> val rdd2 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lisi", 39)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:21
scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[48] at cogroup at <console>:25
scala> rdd3.collect
res14: ArrayBuffer((zhangsan,(CompactBuffer(28),CompactBuffer(28))), (lijie,(CompactBuffer(24, 999),CompactBuffer(24))), (haha,(CompactBuffer(123),CompactBuffer())), (lisi,(CompactBuffer(),CompactBuffer(39))))
// use it to compute a word count
scala> rdd1.cogroup(rdd2).groupByKey.mapValues(_.map(x => {x._1.sum + x._2.sum})).collect()
// or
scala> rdd1.cogroup(rdd2).map(x =>{(x._1,x._2._1.sum+x._2._2.sum)}).collect
res16: Array[(String, Int)] = ArrayBuffer((zhangsan,List(56)), (lijie,List(1047)), (haha,List(123)), (lisi,List(39)))
// note: the values shown above (List(56), ...) come from the first variant; the second variant returns plain sums such as (zhangsan,56)
cartesian
scala> val rdd2 = sc.parallelize(List(("lijie", 99), ("wangwu", 88), ("zhangsan", 100)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:21
scala> val rdd1 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lisi", 39)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[53] at parallelize at <console>:21
scala> rdd1.cartesian(rdd2).collect
res17: Array[((String, Int), (String, Int))] = Array(((lijie,24),(lijie,99)), ((lijie,24),(wangwu,88)), ((lijie,24),(zhangsan,100)), ((zhangsan,28),(lijie,99)), ((lisi,39),(lijie,99)), ((zhangsan,28),(wangwu,88)), ((zhangsan,28),(zhangsan,100)), ((lisi,39),(wangwu,88)), ((lisi,39),(zhangsan,100)))
mapPartitionsWithIndex
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21
// it takes a function whose return value is an Iterator
scala> val func = (index: Int, iter: Iterator[(Int)]) => {
| iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
| }
func: (Int, Iterator[Int]) => Iterator[String] = <function2>
scala> rdd1.mapPartitionsWithIndex(func)
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:26
scala> res1.collect
res2: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])
# compute the sum of the values handled by each partition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val func = (a: Int, b: Iterator[(Int)]) => {
b.toList.map(x => {
(a,x)
}).toIterator
}
println(rdd1.mapPartitionsWithIndex(func).reduceByKey(_+_).collect().toBuffer)
# result
ArrayBuffer((0,6), (1,15), (2,24))
aggregateByKey
// 1. compute the total for each animal
scala> val rdd1 = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:21
scala> rdd1.aggregateByKey(0)(_ + _, _ + _).collect
res11: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
// 2. take the per-partition maximum for each animal, then add the partition results together
scala> rdd1.aggregateByKey(0)(math.max(_, _), _ + _).collect
res12: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
// 3. another test: with an initial value of 100, each partition's maximum is 100, so cat and mouse (present in both partitions) sum to 200, while dog (present in only one) stays at 100
scala> rdd1.aggregateByKey(100)(math.max(_, _), _ + _).collect
res13: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
combineByKey
//1.
scala> val rdd1 = sc.parallelize(List("hello world","hello lijie","lijie test"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:21
// x => x builds the initial value from the first value of a key, (a: Int, b: Int) => a + b adds values within a partition, and (m: Int, n: Int) => m + n merges the per-partition sums
scala> rdd1.flatMap(_.split(" ")).map((_,1)).combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n).collect
res15: Array[(String, Int)] = Array((hello,2), (world,1), (test,1), (lijie,2))
// 2. another example: collect the strings that share the same number into one list
scala> val rdd1 = sc.parallelize(List("a","b","c","d"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:21
scala> val rdd2 = sc.parallelize(List(1,2,4,2),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:21
scala> val rdd3 = rdd2.zip(rdd1)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[23] at zip at <console>:25
scala> rdd3.collect
res19: Array[(Int, String)] = Array((1,a), (2,b), (4,c), (2,d))
scala> rdd3.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ::: n)
res20: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[24] at combineByKey at <console>:28
scala> res20.collect
res21: Array[(Int, List[String])] = Array((4,List(c)), (2,List(b, d)), (1,List(a)))
repartition(num) is equivalent to coalesce(num, true)
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21
scala> val rdd2 = rdd1.repartition(5)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at repartition at <console>:23
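A quick way to verify the claim (a sketch, assuming the same session): both calls yield the requested number of partitions, while coalesce without the shuffle flag can only reduce the partition count:
rdd1.repartition(5).partitions.length      // 5 -- repartition always shuffles
rdd1.coalesce(5, true).partitions.length   // 5 -- same effect as repartition(5)
rdd1.coalesce(5).partitions.length         // 2 -- without a shuffle, coalesce cannot increase partitions
rdd1.coalesce(1).partitions.length         // 1 -- shrinking the partition count needs no shuffle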
flatMapValues
scala> val rdd1 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[17] at parallelize at <console>:21
scala> val rdd2 = rdd1.flatMapValues(_.split(" "))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[18] at flatMapValues at <console>:23
scala> rdd2.collect
res6: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))
foldByKey
scala> val rdd1 = sc.parallelize(List("lijie", "hello", "lisi", "hehe"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at parallelize at <console>:21
scala> val rdd2 = rdd1.map(x => (x.length, x)).foldByKey("")(_+_)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[21] at foldByKey at <console>:23
scala> rdd2.collect
res7: Array[(Int, String)] = Array((4,lisihehe), (5,lijiehello))
keyBy
scala> val rdd1 = sc.parallelize(List("lijie", "hello", "lisi", "hehe"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at parallelize at <console>:21
// use the length of each string as its key
scala> rdd1.keyBy(_.length)
res10: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[24] at keyBy at <console>:24
scala> res10.collect
res11: Array[(Int, String)] = Array((5,lijie), (5,hello), (4,lisi), (4,hehe))
keys
scala> val rdd1 = sc.parallelize(List("lijie", "hello", "lisi", "hehe"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[26] at parallelize at <console>:21
scala> rdd1.map(x => (x.length,x))
res13: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[27] at map at <console>:24
scala> res13.keys.collect
res14: Array[Int] = Array(5, 5, 4, 4)
values
scala> val rdd1 = sc.parallelize(List("lijie", "hello", "lisi", "hehe"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[26] at parallelize at <console>:21
scala> rdd1.map(x => (x.length,x))
res13: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[27] at map at <console>:24
scala> res13.values.collect
res15: Array[String] = Array(lijie, hello, lisi, hehe)
II. Action demos
collect
sc.textFile("hdfs://cdhnode2:8030/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res26: Array[(String, Int)] = Array(("",18), (the,8), (and,6), (of,5), (The,4), (cryptographic,3), (encryption,3), (for,3), (this,3), (Software,2), (on,2), (which,2), (software,2), (at:,2), (includes,2), (import,,2), (use,,2), (or,2), (software.,2), (software,,2), (more,2), (to,2), (distribution,2), (using,2), (re-export,2), (information,2), (possession,,2), (our,2), (please,2), (Export,2), (under,1), (country,1), (is,1), (Technology,1), (Jetty,1), (currently,1), (check,1), (permitted.,1), (have,1), (Security,1), (U.S.,1), (with,1), (BIS,1), (This,1), (mortbay.org.,1), ((ECCN),1), (security,1), (Department,1), (export,1), (reside,1), (any,1), (algorithms.,1), (from,1), (details,1), (has,1), (SSL,1), (Industry,1), (Administration,1), (provides,1), (http://hadoop.apache.org/core/,1), (cou...
collectAsMap
scala> val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:21
scala> rdd.collectAsMap
res0: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
saveAsTextFile
sc.textFile("hdfs://cdhnode2:8030/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://cdhnode2:8030/wordcount/myout")
reduce
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[97] at parallelize at <console>:21
scala> rdd1.reduce(_+_)
res28: Int = 15
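count
A minimal sketch for count, assuming the same five-element rdd1 as above:
// count is an Action: it runs a job and returns the number of elements in the RDD
rdd1.count   // 5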
top
scala> rdd1.top(3)
res29: Array[Int] = Array(5, 4, 3)
take
scala> rdd1.take(3)
res30: Array[Int] = Array(1, 2, 3)
first
scala> rdd1.first
res31: Int = 1
takeOrdered
scala> rdd1.takeOrdered(3)
res33: Array[Int] = Array(1, 2, 3)
aggregate
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21
// 1. use it to compute a sum
// the first argument is the initial (zero) value; the second argument list takes two functions, each with two parameters: the first merges values within each partition, the second merges the per-partition results into one value
scala> rdd1.aggregate(0)(_+_, _+_)
// result
res3: Int = 45
// 2. use it to sum the maximum value of each partition above
scala> rdd1.aggregate(0)(math.max(_, _), _ + _)
// result
res4: Int = 13
// 3. more examples with strings (1)
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21
scala> rdd2.aggregate("")(_+_,_+_)
res5: String = abcdef
scala> rdd2.aggregate("*")(_ + _, _ + _)
res7: String = **abc*def
// 4. more examples with strings (2)
scala> val rdd3 = sc.parallelize(List("a","bb","ccc","dddd"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:21
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res8: String = 24 // the result can also be "42": the two per-partition results ("2" and "4") are combined in a nondeterministic order