Spark RDD Operators (Part 4)
阿新 • Published: 2019-02-15
A summary of some commonly used operators, written to make them easier to understand and look up.
Spark RDD operator list
1. collectAsMap
2. count, countByKey, countByValue
3. filter, filterByRange
4. flatMapValues
5. foldByKey
6. foreachPartition, foreach
7. keyBy, keys, values
8. aggregate, aggregateByKey
9. mapPartitionsWithIndex
10. Difference between reduceByKey and groupByKey
1.collectAsMap
scala> val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:27

scala> rdd.collectAsMap
res5: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
2.count,countByKey,countByValue
scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> rdd1.count   // number of elements
res0: Long = 5

scala> rdd1.countByKey
res1: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)

scala> rdd1.countByValue   // note: "value" here means the whole (key, value) element
res2: scala.collection.Map[(String, Int),Long] = Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1)
3. filter,filterByRange
filter keeps the elements that satisfy the predicate. filterByRange(key1, key2) keeps the pairs whose key lies between key1 and key2, inclusive on both ends.

scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:27
scala> val rdd3=rdd1.filter(_._2>3).collect
rdd3: Array[(String, Int)] = Array((e,5), (d,4))
scala> val rdd2 = rdd1.filterByRange("b", "d")   // keeps keys in the closed range ["b", "d"]; most efficient when the RDD is range-partitioned (e.g. after sortByKey)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at filterByRange at <console>:29
scala> rdd2.collect
res6: Array[(String, Int)] = Array((c,3), (d,4), (c,2))
4.flatMapValues
flatMapValues applies a function to each value and flattens the result, so key = "a", value = "1 2" becomes ("a", "1") and ("a", "2").

scala> val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
rdd3: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[21] at parallelize at <console>:27
scala> rdd3.flatMapValues(_.split(" ")).collect
res10: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))
5.foldByKey
foldByKey groups the values with the same key and folds them with a zero value; here the strings belonging to each key are concatenated.

scala> val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[25] at parallelize at <console>:27
scala> val rdd2=rdd1.map(x=>(x.length,x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[26] at map at <console>:29
scala> rdd2.collect
res12: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))
scala> val rdd3=rdd2.foldByKey("")(_+_).collect
rdd3: Array[(Int, String)] = Array((4,wolfbear), (3,dogcat))
6.foreachPartition, foreach
foreach is similar to map: it operates on each individual element of the RDD, while foreachPartition (like mapPartitions) operates on the iterator of each partition. If the per-element work has to create an expensive extra object every time (for example, writing the RDD to a database over JDBC: foreach would open one connection per element, whereas foreachPartition opens one connection per partition), then the partition-based variant is far more efficient. A sketch of this pattern follows.
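A minimal sketch of the JDBC pattern described above, assuming an existing SparkContext sc; the JDBC URL, table, and credentials are hypothetical placeholders, not from the original post:

import java.sql.DriverManager

val data = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3)), 2)

data.foreachPartition { iter =>
  // One connection per partition instead of one per element.
  val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO kv (k, v) VALUES (?, ?)")
  try {
    iter.foreach { case (k, v) =>
      stmt.setString(1, k)
      stmt.setInt(2, v)
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}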
7. keyBy, keys, values
keyBy uses the result of the supplied function as the key and the original element as the value.

scala> val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at parallelize at <console>:27
scala> val rdd2 = rdd1.keyBy(_.length)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[33] at keyBy at <console>:29
scala> rdd2.collect
res14: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
Another keyBy example, using the first character as the key:
scala> val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[38] at parallelize at <console>:27
scala> val rdd2 = rdd1.keyBy(_(0))
rdd2: org.apache.spark.rdd.RDD[(Char, String)] = MapPartitionsRDD[39] at keyBy at <console>:29
scala> rdd2.collect
res17: Array[(Char, String)] = Array((d,dog), (s,salmon), (s,salmon), (r,rat), (e,elephant))
keys, values
keys returns an RDD containing only the keys.
values returns an RDD containing only the values.
scala> val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val rdd2 = rdd1.map(x => (x.length, x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[1] at map at <console>:29
scala> rdd2.keys.collect
res0: Array[Int] = Array(3, 5, 4, 3, 7, 5)
scala> rdd2.values.collect
res1: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)
8. aggregate, aggregateByKey
These two are important.
8.1 aggregate
aggregate: 1. it lets you compute within each partition before merging, which makes it more flexible than reduce; 2. in aggregate(0)(_+_, _+_), 0 is the zero (initial) value, the first function is applied within each partition, and the second function merges the per-partition results; 3. the zero value: with n partitions it participates n + 1 times (once per partition plus once in the final merge).
Understanding the zero value
scala> val rdd1=sc.parallelize(List(-1,-2,-3,4),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27
scala> rdd1.aggregate(0)(math.max(_,_),_+_)
res8: Int = 4
Values seen by the first partition: 0, -1, -2, so its max is 0.
Values seen by the second partition: 0, -3, 4, so its max is 4.
The merge step sees: 0, 0, 4.
Final value: 0 + 0 + 4 = 4.
Understanding that the tasks run in parallel
scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:27
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res10: String = 24
Result: it can be either 24 or 42, because the two tasks run in parallel and either one may finish first.
A basic aggregate exercise
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27
scala> rdd1.aggregate(0)(_+_, _+_)
res3: Int = 45
A closer look at the aggregate zero value
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:27
scala> rdd2.aggregate("|")(_ + _, _ + _)
res5: String = ||abc|def
Analysis: think of the first partition as seeing "|", "a", "b", "c"; its result is |abc.
The second partition sees "|", "d", "e", "f"; its result is |def.
The merge step sees "|", "|abc", "|def"; the final result is "|" + "|abc" + "|def" = ||abc|def.
A classic aggregate exercise
scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res7: String = 24
scala> val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:27
scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res8: String = 10
scala> val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:27
scala> rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res9: String = 11
Here only the second example (rdd4) is explained.
One concept to keep in mind: the seqOp is folded through the partition, so its string result becomes the accumulator for the next comparison.
The result is 10 or 01.
Analysis: the first partition sees "", "12", "23".
It does not take the minimum length of all three and then call toString; it compares two at a time, converts to a string, and compares that string with the next element: min(0, 2) = 0 -> "0", then min(1, 2) = 1 -> "1". The second partition gives min(0, 3) = 0 -> "0", then min(1, 0) = 0 -> "0". Merging "1" and "0" in either order yields 10 or 01.
8.2 aggregateByKey
aggregateByKey combines the values of each key within each partition first, then merges the per-key results across partitions.

scala> val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:27
scala> def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
| iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
| }
func2: (index: Int, iter: Iterator[(String, Int)])Iterator[String]
scala> pairRDD.mapPartitionsWithIndex(func2).collect
res13: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])
scala> pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res14: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
A closer look at the aggregateByKey zero value
scala> val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:27
scala> pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
Result analysis:
Actual values in the first partition: (cat, (2, 5)), (mouse, 4)
With the zero value included: (cat, (2, 5, 100)), (mouse, (4, 100))
First partition result after max: (cat, 100), (mouse, 100)
Actual values in the second partition: (cat, 12), (mouse, 2), (dog, 12)
With the zero value included: (cat, (12, 100)), (mouse, (2, 100)), (dog, (12, 100))
Second partition result after max: (cat, 100), (mouse, 100), (dog, 100)
Note that the merge stage does not use the zero value again. Before the merge: (cat, 100), (mouse, 100), (cat, 100), (mouse, 100), (dog, 100). After the merge (sum): (cat, 200), (mouse, 200), (dog, 100).
Summary of the difference between aggregate and aggregateByKey: aggregate applies the zero value again in the final merge stage, while aggregateByKey does not.
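A small sketch of that difference, assuming an existing SparkContext sc; with these list sizes, parallelize splits the data into the partitions noted in the comments:

val nums = sc.parallelize(List(1, 2, 3, 4), 2)
// aggregate uses the zero value once per partition AND once more in the final merge:
// partition sums: 10 + 1 + 2 = 13 and 10 + 3 + 4 = 17; final: 10 + 13 + 17 = 40
nums.aggregate(10)(_ + _, _ + _)                 // expected: 40

val pairs = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3), ("b", 4)), 2)
// aggregateByKey uses the zero value only inside each partition, never in the merge:
// partition 0: a -> 10 + 1 + 2 = 13; partition 1: b -> 10 + 3 + 4 = 17
pairs.aggregateByKey(10)(_ + _, _ + _).collect   // expected: Array((a,13), (b,17)), order may vary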
9.mapPartitionsWithIndex
mapPartitionsWithIndex lets you inspect the contents of each partition: 1. first define a function (func1 below); 2. then call mapPartitionsWithIndex(func1).
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> def func1(index: Int, iter: Iterator[Int]): Iterator[String] = {
     |   iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
     | }
func1: (index: Int, iter: Iterator[Int])Iterator[String]

scala> rdd1.mapPartitionsWithIndex(func1).collect
res0: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])
10. Difference between reduceByKey and groupByKey
groupByKey does no combining within each partition; all the data is shuffled and only then aggregated.
reduceByKey combines values within each partition first and then merges the partial results.
Therefore, for heavy aggregations over large datasets, reduceByKey is preferable to groupByKey. A small comparison sketch follows.
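A minimal word-count sketch of the two approaches, assuming an existing SparkContext sc; the sample data is made up for illustration:

val words = sc.parallelize(List("dog", "cat", "dog", "bear", "cat", "dog"), 2).map(w => (w, 1))

// reduceByKey: partial sums are computed inside each partition before the shuffle,
// so much less data crosses the network.
val counts1 = words.reduceByKey(_ + _).collect

// groupByKey: every (word, 1) pair is shuffled, and the summing happens only afterwards.
val counts2 = words.groupByKey().mapValues(_.sum).collect
// both yield the same counts, e.g. (dog,3), (cat,2), (bear,1)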
In addition, if you only need to group and then process the groups, the following functions should be preferred over groupByKey:
(1) combineByKey: combines the data, and the type of the combined result may differ from the type of the input values (see the sketch after this list).
(2) foldByKey: merges all the values of each key, using an associative function and a "zero value".
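A hedged combineByKey sketch in which the output value type (List[Int]) differs from the input value type (Int); the data and variable names are made up for illustration, assuming an existing SparkContext sc:

val scores = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3), ("b", 4)), 2)

val grouped = scores.combineByKey(
  (v: Int) => List(v),                       // createCombiner: start a new List for a key
  (acc: List[Int], v: Int) => v :: acc,      // mergeValue: add a value within a partition
  (a: List[Int], b: List[Int]) => a ::: b    // mergeCombiners: merge lists across partitions
).collect
// e.g. Array((a, List(1, 3)), (b, List(2, 4))); element order inside each list depends on partitioning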
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html