spark RDD常用運算元(二)
阿新 • • 發佈:2018-11-07
- reduceByKey
- 演算法解釋
reduceByKey 是比 combineByKey 更簡單的一種情況,只是兩個值合併成一個值,( Int, Int V)to (Int, Int C),比如疊加。所以 createCombiner reduceBykey 很簡單,就是直接返回 v,而 mergeValue和 mergeCombiners 邏輯是相同的,沒有區別。- 原始檔
val rdd = sc.parallelize(Array(("aa", 1), ("aa", 1), ("cc", 1), ("dd", 1), ("ee", 1)))
- scala程式碼
val rdd = sc.parallelize(Array(("aa", 1), ("aa", 1), ("cc", 1), ("dd", 1), ("ee", 1))) // rdd.reduceByKey((x,y) => x + y).foreach(println(_)) println(rdd.reduceByKey((x,y) => x + y).collect().mkString(","))
- 過濾結果
(ee,1),(aa,2),(dd,1),(cc,1)
- foldByKey
- 演算法解釋
該函式用於RDD[K,V]根據K將V做摺疊、合併處理,其中的引數zeroValue表示先根據對映函式將zeroValue應用於V,進行初始化V,再將對映函式應用於初始化後的V. foldByKey可以參考我之前的scala的fold的介紹與reduce不同的是 foldByKey開始摺疊的第一個元素不是集合中的第一個元素,而是傳入的一個元素- 原始檔
val rdd = sc.parallelize(Array(("aa", 1), ("aa", 1), ("cc", 1), ("dd", 1), ("ee", 1)))
- scala程式碼
val rdd = sc.parallelize(Array(("aa", 1), ("aa", 1), ("cc", 1), ("dd", 1), ("ee", 1))) // rdd.reduceByKey((x,y) => x + y).foreach(println(_)) println(rdd.reduceByKey((x,y) => x + y).collect().mkString(","))
- 過濾結果
(ee,1),(aa,2),(dd,1),(cc,1)
- sortByKey
- 演算法解釋
SortByKey用於對pairRDD按照key進行排序,第一個引數可以設定true或者false,預設是true
SortBy和sortByKey功能相同- 原始檔
val rdd = sc.makeRDD(Array(("1", 1), ("5", 1), ("3", 1), ("1", 1), ("2", 1)))
- scala程式碼
println(rdd.reduceByKey((x, y) => x + y).sortByKey().collect().mkString(",")) println(rdd.reduceByKey((x, y) => x + y).sortBy(_._2).collect().mkString(","))
- 過濾結果
sortByKey : (1,2),(2,1),(3,1),(5,1) sortBy : (2,1),(5,1),(3,1),(1,2)
- groupByKey
- 演算法解釋
groupByKey會將RDD[key,value] 按照相同的key進行分組,形成RDD[key,Iterable[value]]的形式, 有點類似於sql中的groupby,例如類似於mysql中的group_concat- 原始檔
val rdd = sc.makeRDD(Array(("1", 1), ("5", 1), ("3", 1), ("1", 1), ("2", 1)))
- scala程式碼
val scoreDetail = sc.parallelize(List(("name", "張三"), ("name", "李四"), ("age", 11), ("age", 20))) println(scoreDetail.groupByKey().collect().mkString(","))
- 過濾結果
(name,CompactBuffer(張三, 李四)),(age,CompactBuffer(11, 20))
- cogroup
- 演算法解釋
groupByKey是對單個 RDD 的資料進行分組,還可以使用一個叫作 cogroup() 的函式對多個共享同一個鍵的 RDD 進行分組- 原始檔
val scoreDetail = sc.parallelize(List(("xiaoming",95),("xiaoming",90),("lihua",95),("lihua",98),("xiaofeng",97))) val scoreDetai2 = sc.parallelize(List(("xiaoming",65),("lihua",63),("lihua",62),("xiaofeng",67))) val scoreDetai3 = sc.parallelize(List(("xiaoming",25),("xiaoming",15),("lihua",35),("lihua",28),("xiaofeng",36)))
- scala程式碼
val scoreDetail = sc.parallelize(List(("xiaoming",95),("xiaoming",90),("lihua",95),("lihua",98),("xiaofeng",97))) val scoreDetai2 = sc.parallelize(List(("xiaoming",65),("lihua",63),("lihua",62),("xiaofeng",67))) val scoreDetai3 = sc.parallelize(List(("xiaoming",25),("xiaoming",15),("lihua",35),("lihua",28),("xiaofeng",36))) println(scoreDetail.cogroup(scoreDetai2,scoreDetai3).collect().mkString(","))
- 過濾結果
(xiaoming,(CompactBuffer(95, 90),CompactBuffer(65),CompactBuffer(25, 15))),(lihua,(CompactBuffer(95, 98),CompactBuffer(63, 62),CompactBuffer(35, 28))),(xiaofeng,(CompactBuffer(97),CompactBuffer(67),CompactBuffer(36)))