
Spark RDD Operators (Part 4)

A summary of commonly used operators, for my own understanding and easy reference.

List of Spark RDD operators

1. collectAsMap
2. count, countByKey, countByValue
3. filter, filterByRange
4. flatMapValues
5. foldByKey
6. foreachPartition, foreach
7. keyBy, keys, values
8. aggregate, aggregateByKey
9. mapPartitionsWithIndex
10. Difference between reduceByKey and groupByKey

1. collectAsMap

scala> val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:27

scala> rdd.collectAsMap
res5: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)

scala> 
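Note that if the same key appears more than once, collectAsMap keeps only one value per key (which one survives is not guaranteed). A minimal sketch, runnable in spark-shell:

val dup = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3)))
dup.collectAsMap   // e.g. Map(b -> 3, a -> 2): only one value is kept for the duplicated key "a"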

2. count, countByKey, countByValue

scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> rdd1.count    // number of elements
res0: Long = 5

scala> rdd1.countByKey
res1: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)   

scala> rdd1.countByValue    // a bit special: each whole (key, value) pair counts as one value
res2: scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1)

3. filter, filterByRange

filter keeps the elements that satisfy the predicate. filterByRange(key1, key2) keeps the pairs whose key lies between key1 and key2, inclusive of both endpoints.
scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:27

scala> val rdd3=rdd1.filter(_._2>3).collect
rdd3: Array[(String, Int)] = Array((e,5), (d,4))

scala> val rdd2 = rdd1.filterByRange("b", "d")   // keeps keys in the closed range [b, d]; most efficient when the RDD is range-partitioned
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at filterByRange at <console>:29

scala> rdd2.collect
res6: Array[(String, Int)] = Array((c,3), (d,4), (c,2))

4. flatMapValues

flatMapValues: expands key = "a", value = "1 2" into two pairs, (a, "1") and (a, "2").
scala> val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
rdd3: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[21] at parallelize at <console>:27

scala> rdd3.flatMapValues(_.split(" ")).collect
res10: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))

5. foldByKey

foldByKey: values with the same key are grouped together, and the values of each key can then be combined, e.g. concatenating strings.
scala> val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[25] at parallelize at <console>:27

scala> val rdd2=rdd1.map(x=>(x.length,x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[26] at map at <console>:29

scala> rdd2.collect
res12: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))

scala> val rdd3=rdd2.foldByKey("")(_+_).collect
rdd3: Array[(Int, String)] = Array((4,wolfbear), (3,dogcat))

scala> 
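With a numeric zero value, foldByKey simply folds each key's values, here into a per-key sum; a small sketch runnable in spark-shell:

val scores = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
scores.foldByKey(0)(_ + _).collect   // Array((a,4), (b,2)): each key's values are folded starting from 0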

6. foreachPartition, foreach

These are similar to map: map operates on every single element of the RDD, while mapPartitions (and likewise foreachPartition) operates on the iterator of each partition. If the per-element processing needs to create expensive objects repeatedly (for example writing the RDD to a database over JDBC: map would open one connection per element, whereas mapPartitions opens one connection per partition), then mapPartitions/foreachPartition is much more efficient than map/foreach.
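A minimal sketch of that pattern with foreachPartition, assuming a reachable JDBC database; the URL, credentials and table kv(k, v) are hypothetical:

val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3)))
pairs.foreachPartition { iter =>
  // one connection per partition instead of one per element
  val conn = java.sql.DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass")  // hypothetical URL/credentials
  val stmt = conn.prepareStatement("INSERT INTO kv(k, v) VALUES (?, ?)")                        // hypothetical table
  iter.foreach { case (k, v) =>
    stmt.setString(1, k)
    stmt.setInt(2, v)
    stmt.executeUpdate()
  }
  stmt.close()
  conn.close()
}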

7. keyBy, keys, values

keyBy: uses the result of the supplied function as the key and the original element as the value.
scala> val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at parallelize at <console>:27

scala> val rdd2 = rdd1.keyBy(_.length)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[33] at keyBy at <console>:29

scala> rdd2.collect
res14: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
Another keyBy example: use the first character as the key.
scala> val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[38] at parallelize at <console>:27

scala> val rdd2 = rdd1.keyBy(_(0))
rdd2: org.apache.spark.rdd.RDD[(Char, String)] = MapPartitionsRDD[39] at keyBy at <console>:29

scala> rdd2.collect
res17: Array[(Char, String)] = Array((d,dog), (s,salmon), (s,salmon), (r,rat), (e,elephant))

keys, values

keys collects all the keys of the pair RDD.

values collects all the values of the pair RDD.

scala> val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val rdd2 = rdd1.map(x => (x.length, x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[1] at map at <console>:29

scala> rdd2.keys.collect
res0: Array[Int] = Array(3, 5, 4, 3, 7, 5)                                      

scala> rdd2.values.collect
res1: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)

8. aggregate, aggregateByKey

These two are particularly important:

8.1 aggregate

aggregate:
1. It gives you a chance to compute within each partition first, which makes it more flexible than reduce.
2. In aggregate(0)(_+_, _+_), 0 is the initial (zero) value; the first function is applied within each partition, and the second merges the results of the different partitions.
3. About the initial value: with n partitions it is applied n + 1 times (once per partition and once more in the merge stage).

Understanding the initial value:
scala> val rdd1=sc.parallelize(List(-1,-2,-3,4),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27

scala> rdd1.aggregate(0)(math.max(_,_),_+_)
res8: Int = 4
Partition 0 sees: 0, -1, -2 → max is 0
Partition 1 sees: 0, -3, 4 → max is 4
Merge stage: 0 + 0 + 4 = 4

Understanding task parallelism:
scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:27

scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res10: String = 24

The result can be 24 or 42: the two tasks run in parallel, and either partition's result may arrive first.

A beginner aggregate example:
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27


scala> rdd1.aggregate(0)(_+_, _+_)
res3: Int = 45
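Because the zero value (and therefore the result type) of aggregate may differ from the element type, it can do things reduce cannot, for example computing sum and count in a single pass; a sketch on the same rdd1:

val (sum, cnt) = rdd1.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),   // within a partition: add the element and increment the count
  (a, b) => (a._1 + b._1, a._2 + b._2)    // across partitions: merge the (sum, count) pairs
)
val avg = sum.toDouble / cnt              // 45 / 9 = 5.0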
Understanding the aggregate initial value in depth:
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:27

scala> rdd2.aggregate("|")(_ + _, _ + _)
res5: String = ||abc|def
Analysis:
Partition 0 can be thought of as starting from the initial value: "|", "a", "b", "c" → partition result "|abc"
Partition 1: "|", "d", "e", "f" → partition result "|def"
The merge stage also starts from the initial value: "|", "|abc", "|def" → "|" + "|abc" + "|def" = "||abc|def"

Classic aggregate exercises

scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27

scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res7: String = 24

scala> 

scala> val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:27

scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res8: String = 10

scala> 

scala> val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:27

scala> rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res9: String = 11
Here we only explain the second exercise (rdd4).

One concept to keep in mind: the seqOp is folded element by element, not applied to all of a partition's values at once.

The result is 10 or 01.

Analysis: the first partition holds "", "12", "23" (the zero value plus two elements). The minimum length is not taken across all three values and then converted with toString; instead min("".length, "12".length) = 0 gives "0", and that intermediate "0" is then compared with "23": min(1, 2) = 1 gives "1". The second partition ("345", "") yields "0" in the same way. The merge stage concatenates the two partition results in whichever order they finish, hence "10" or "01".

8.2 aggregateByKey

Values with the same key are combined within each partition first, then merged across partitions.
scala> val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:27

scala> def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
     |   iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
     | }
func2: (index: Int, iter: Iterator[(String, Int)])Iterator[String]

scala> pairRDD.mapPartitionsWithIndex(func2).collect
res13: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])

scala> pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res14: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))              
Understanding the aggregateByKey initial value in depth:
scala> val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:27

scala> pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
Analysis of the result:
Partition 0 holds (cat,2), (cat,5), (mouse,4). Taking the initial value into account, each key effectively sees: cat → (2, 5, 100), mouse → (4, 100). Partition 0 result: (cat,100), (mouse,100).
Partition 1 holds (cat,12), (dog,12), (mouse,2). With the initial value: cat → (12, 100), mouse → (2, 100), dog → (12, 100). Partition 1 result: (cat,100), (mouse,100), (dog,100).
Note that the merge stage does not use the initial value again. Before the merge: (cat,100), (mouse,100) and (cat,100), (mouse,100), (dog,100). After the merge: (cat,200), (mouse,200), (dog,100).

Summary of the difference between aggregate and aggregateByKey: aggregate applies the initial value again in the merge stage, while aggregateByKey does not.

9. mapPartitionsWithIndex

mapPartitionsWithIndex lets you inspect the contents of each partition:
1. First define a function func (a sketch of the one used below follows).
2. Call mapPartitionsWithIndex(func).
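The transcript below calls func1, whose definition is not shown in the original; it is assumed to be analogous to func2 from section 8.2, for example:

def func1(index: Int, iter: Iterator[Int]): Iterator[String] = {
  // tag every element with the id of the partition it belongs to
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}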
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> rdd1.mapPartitionsWithIndex(func1).collect
res0: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])

10. Difference between reduceByKey and groupByKey


groupByKey does not combine values within each partition; all the raw values are shuffled and only aggregated afterwards.
reduceByKey combines values within each partition first (a map-side combine) and then merges the partial results across partitions.



Therefore, for complex computation over large data sets, reduceByKey is preferable to groupByKey.
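A small spark-shell sketch contrasting the two on the same data: the results are identical, but reduceByKey computes partial sums per partition before the shuffle, while groupByKey shuffles every individual value.

val kv = sc.parallelize(List(("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1)), 2)
kv.reduceByKey(_ + _).collect              // e.g. Array((a,3), (b,2)), with map-side combining
kv.groupByKey().mapValues(_.sum).collect   // e.g. Array((a,3), (b,2)), but all five values are shuffled first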

Additionally, if you only need grouped aggregation, the following functions should be preferred over groupByKey:
  (1) combineByKey: combines the values of each key, and the combined result type can differ from the input value type.
  (2) foldByKey: merges all the values of each key, using an associative function together with a "zero value".
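As an illustration of point (1), a sketch of combineByKey where the combined type (List[Int]) differs from the input value type (Int):

val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
pairs.combineByKey(
  (v: Int) => List(v),                      // createCombiner: start a list from the first value of a key in a partition
  (acc: List[Int], v: Int) => v :: acc,     // mergeValue: add another value within the same partition
  (a: List[Int], b: List[Int]) => a ::: b   // mergeCombiners: merge the lists coming from different partitions
).collect                                   // e.g. Array((a,List(3, 1)), (b,List(2)))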

References:
http://blog.csdn.net/zongzhiyuan/article/details/49965021
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html