1. 程式人生 > >spark RDD常用運算元(二)

spark RDD常用運算元(二)

- reduceByKey

  • 演算法解釋
    reduceByKey 是比 combineByKey 更簡單的一種情況,只是兩個值合併成一個值,( Int, Int V)to (Int, Int C),比如疊加。所以 createCombiner reduceBykey 很簡單,就是直接返回 v,而 mergeValue和 mergeCombiners 邏輯是相同的,沒有區別。
  • 原始檔
    val rdd = sc.parallelize(Array(("aa", 1), ("aa", 1), ("cc", 1), ("dd", 1), ("ee", 1)))
    
  • scala程式碼
    val rdd = sc.parallelize(Array(("aa", 1), ("aa", 1), ("cc", 1), ("dd", 1), ("ee", 1)))
    // rdd.reduceByKey((x,y) => x + y).foreach(println(_))
    println(rdd.reduceByKey((x,y) => x + y).collect().mkString(","))
    
  • 過濾結果
    (ee,1),(aa,2),(dd,1),(cc,1)
    

- foldByKey

  • 演算法解釋

    該函式用於RDD[K,V]根據K將V做摺疊、合併處理,其中的引數zeroValue表示先根據對映函式將zeroValue應用於V,進行初始化V,再將對映函式應用於初始化後的V. foldByKey可以參考我之前的scala的fold的介紹與reduce不同的是 foldByKey開始摺疊的第一個元素不是集合中的第一個元素,而是傳入的一個元素
  • 原始檔
    val rdd = sc.parallelize(Array(("aa", 1), ("aa", 1), ("cc", 1), ("dd", 1), ("ee", 1)))
    
  • scala程式碼
    val rdd = sc.parallelize(Array(("aa", 1), ("aa", 1), ("cc", 1), ("dd", 1), ("ee", 1)))
    // rdd.reduceByKey((x,y) => x + y).foreach(println(_))
    println(rdd.reduceByKey((x,y) => x + y).collect().mkString(","))
    
  • 過濾結果
    (ee,1),(aa,2),(dd,1),(cc,1)
    

- sortByKey

  • 演算法解釋

    SortByKey用於對pairRDD按照key進行排序,第一個引數可以設定true或者false,預設是true
    SortBy和sortByKey功能相同
  • 原始檔
    val rdd = sc.makeRDD(Array(("1", 1), ("5", 1), ("3", 1), ("1", 1), ("2", 1)))
    
  • scala程式碼
    println(rdd.reduceByKey((x, y) => x + y).sortByKey().collect().mkString(","))
    println(rdd.reduceByKey((x, y) => x + y).sortBy(_._2).collect().mkString(","))
    
  • 過濾結果
    sortByKey : (1,2),(2,1),(3,1),(5,1)
    sortBy : (2,1),(5,1),(3,1),(1,2)
    

- groupByKey

  • 演算法解釋
    groupByKey會將RDD[key,value] 按照相同的key進行分組,形成RDD[key,Iterable[value]]的形式, 有點類似於sql中的groupby,例如類似於mysql中的group_concat
  • 原始檔
    val rdd = sc.makeRDD(Array(("1", 1), ("5", 1), ("3", 1), ("1", 1), ("2", 1)))
    
  • scala程式碼
    val scoreDetail = sc.parallelize(List(("name", "張三"), ("name", "李四"), ("age", 11), ("age", 20)))
    println(scoreDetail.groupByKey().collect().mkString(","))
    
  • 過濾結果
    (name,CompactBuffer(張三, 李四)),(age,CompactBuffer(11, 20))
    

- cogroup

  • 演算法解釋
    groupByKey是對單個 RDD 的資料進行分組,還可以使用一個叫作 cogroup() 的函式對多個共享同一個鍵的 RDD 進行分組
  • 原始檔
    val scoreDetail = sc.parallelize(List(("xiaoming",95),("xiaoming",90),("lihua",95),("lihua",98),("xiaofeng",97)))
    val scoreDetai2 = sc.parallelize(List(("xiaoming",65),("lihua",63),("lihua",62),("xiaofeng",67)))
    val scoreDetai3 = sc.parallelize(List(("xiaoming",25),("xiaoming",15),("lihua",35),("lihua",28),("xiaofeng",36)))
    
  • scala程式碼
    val scoreDetail = sc.parallelize(List(("xiaoming",95),("xiaoming",90),("lihua",95),("lihua",98),("xiaofeng",97)))
    val scoreDetai2 = sc.parallelize(List(("xiaoming",65),("lihua",63),("lihua",62),("xiaofeng",67)))
    val scoreDetai3 = sc.parallelize(List(("xiaoming",25),("xiaoming",15),("lihua",35),("lihua",28),("xiaofeng",36)))
    println(scoreDetail.cogroup(scoreDetai2,scoreDetai3).collect().mkString(","))
    
  • 過濾結果
    (xiaoming,(CompactBuffer(95, 90),CompactBuffer(65),CompactBuffer(25, 15))),(lihua,(CompactBuffer(95, 98),CompactBuffer(63, 62),CompactBuffer(35, 28))),(xiaofeng,(CompactBuffer(97),CompactBuffer(67),CompactBuffer(36)))