1. 程式人生 > >spark rdd 轉換和動作

spark rdd 轉換和動作

2017-07-22

概述

本文對spark rdd的轉換和動作進行總結和實際操作演示. RDD(Resilient Distributed Datasets),彈性分散式資料集, 是spark分散式記憶體的一個抽象概念,RDD提供了一種高度受限的共享記憶體模型.即RDD是隻讀的記錄分割槽的集合,只能通過在其他RDD執行確定的轉換操作(如map、join和group by)而建立,然而這些限制使得實現容錯的開銷很低。 rdd 的分散式,因為rdd支援分割槽, 自動把一個rdd根據partition分發到n臺spark裝置進行處理. 這些對使用者完全透明. 使用者感覺和操作本地資料一樣.

rdd通過parallelize和textFile或流來建立. 再通過轉換得到新的rdd. 轉換的過程不是立即執行, 而是在需要動作action時才開始. 這樣方便系統進行自動優化.

rdd操作示例

parallelize一般用於測示建立rdd.

scala> val square = sc.parallelize(List(1,2,3,4))
scala> val sq = square.map(x=>x*x).collect()
sq: Array[Int] = Array(1, 4, 9, 16)


scala> val drink1=sc.parallelize(List("coffee","tea","coffee","panda","monkey"))
scala> val rdd2 = sc.parallelize(List("coffee","money","kitty","貓"))

scala> val r11= rdd1.union(rdd2).collect()
r11: Array[String] = Array(coffee, tea, coffee, panda, monkey, coffee, money, kitty, 貓)

scala> val r12 = rdd1.intersection(rdd2).collect()
r12: Array[String] = Array(coffee)

scala> val r13 = rdd1.subtract(rdd2).collect()
r13: Array[String] = Array(tea, panda, monkey)


scala> val users=sc.parallelize(List("user1","user2"))
scala> val tags = sc.parallelize(List("經濟","政治","文化"))

scala> users.cartesian(tags).collect()
res1: Array[(String, String)] = Array((user1,經濟), (user1,政治), (user1,文化), (user2,經濟), (user2,政治), (user2,文化))


普通RDD操作

轉換 Transformation

轉換有如下一些種類

  • map(func)
  • filter(func)
  • repartition(numPartitions)
  • flatMap(func)
  • repartitionAndSortWithinPartitions(partitioner)
  • mapPartitions(func)
  • join(otherDataset, [numTasks])
  • mapPartitionsWithIndex(func)
  • cogroup(otherDataset, [numTasks])
  • sample(withReplacement, fraction, seed)
  • cartesian(otherDataset)
  • coalesce(numPartitions)

基礎轉換

測試用RDD 包含 {1, 2, 3, 3}

函式 目的 示例 結果
map 對RDD每個元素應用函式, 並返回一個RDD結果 rdd.map(x => x + 1) {2, 3, 4, 4}
flatMap 對RDD每個元素應用函式, 並返回一個RDD結果,包含迭代器返回的內容.常用於抽取單詞. rdd.flatMap(x => x.to(3)) {1, 2, 3, 2, 3, 3, 3}
filter 返回一個RDD結果, 由通過了filter的元素組成. rdd.filter(x => x != 1) {2, 3, 3}
distinct 移除重複元素 rdd.distinct() {1, 2, 3}
sample(withReplacement, fraction, [seed]) 對 RDD 抽樣,withReplacement是指是否有放回的抽樣為true為放回,為false為不放回,fraction為抽樣佔總資料量的比值 rdd.sample(false, 0.5) non-deterministic

兩個RDD轉換

RDD分別包含 {1, 2, 3} 和 {3, 4, 5}

函式 目的 示例 結果
union() 兩個RDD並集. rdd.union(other) {1, 2, 3, 3, 4, 5}
intersection() RDD 交集 rdd.intersection(other) {3}
subtract() 從一個RDD移除全部存在於另一個RDD的元素.如移除訓練資料. rdd.subtract(other) {1, 2}
cartesian() 兩個RDD的笛卡爾積, 一個RDD中每個元素和另一個RDD的每個元素兩兩組合 rdd.cartesian(other) {(1, 3), (1,4), … (3,5)}

普通RDD 動作(Action)

基礎動作

RDD 包含 {1, 2, 3, 3}

函式 目的 示例 結果
collect() 返回 RDD 全部元素 rdd.collect() {1, 2, 3, 3}
count() RDD 元素總計 rdd.count() 4
countByValue() RDD 每個元素出現次數總計 rdd.countByValue() {(1, 1), (2, 1), (3, 2)}
take(num) 返回 RDD 中num個元素 rdd.take(2) {1, 2}
top(num) 返回 RDD 中top num個元素 rdd.top(2) {3, 3}
takeOrdered(num)(ordering) 返回 RDD 中num個元素, 基於給定排序 rdd.takeOrdered(2)(myOrdering) {3, 3}
takeSample(withReplacement, num, [seed]) 返回 RDD 中隨機num個元素, withReplacement表示抽樣是否放回. 放回會有重複 rdd.takeSample(false, 1) non-deterministic
reduce(func) 併發將RDD中的元素聯合起來.如加和. rdd.fold((x, y) => x + y) 9
fold(zero)(func) 類reduce函式, 但提供初始值設定. rdd.fold(0)((x, y) => x + y) 9
aggregate(zeroValue)(seqOp, combOp) 類reduce函式, 但提供初始值設定. 可以返回不同型別 rdd.aggregate(0, 0) ({case (x, y) => (y._1() + x, y._2() + 1)}, {case (x, y) => (y._1() + x._1(), y._2() + x._2()) }) (9, 4)
foreach(func) 對RDD中每個元素應用函式. rdd.foreach(func)

reduce

對兩個元素應用reduce中的函式, 得到一個新的元素, 再用新元素和集合中的元素進行reduce運算. 如+, 求和,計數和其他聚合運算.

scala> val data = sc.parallelize(List(1,2,3,4))
scala> data.collect()
res33: Array[Int] = Array(1, 2, 3, 4)
scala> data.reduce(_ + _)
res35: Int = 10

fold

fold()和reduce類似. 但會對每個分割槽置初始zero值(+為0,乘為1, 連線列表為空列表).

fold 定義:

def fold(zeroValue: T)(op: (T, T) => T): T
scala> val data = sc.parallelize(List(1,2,3,4))
scala> data.collect()
res33: Array[Int] = Array(1, 2, 3, 4)
scala> data.fold(0)((x,y)=>x+y)
res40: Int = 10
scala> data.fold(1)((x,y)=>x+y)
res41: Int = 15

aggregate

聚合函式可以避免使用map. 和fold差不多, 需要傳入一個初始值函式, 再傳入一個每個分割槽的累積函式. 最後傳入一個分割槽之間的累積函式.

下面的程式碼是求均值. 引數(0,0) 表示總和0,計數0. 引數(acc, value) ,acc表示累計值, value是當前值.其中acc是元組. _1為第一個元素, 為累積值. _2為第二個元素,表示累計計數. 第三個函式(acc1, acc2) 則是各分割槽的歸總combine.

val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
scala> val result = data.aggregate((0,0))( (acc,value)=>(acc._1 + value, acc._2+1),(acc1,acc2)=>(acc1._1+acc2._1, acc1._2 + acc2._2))
result: (Int, Int) = (10,4)

scala> val avg = result._1/result._2
avg: Int = 2

# pairRDD 操作

pairRDD是特殊的RDD, 包含key->value元組鍵值對的RDD, 有特殊操作.

很多變換會返回pairRDD, 也可以將普通RDD轉為pairRDD. 如通過map()函式, 將行的第一個單詞作為key,行作為value.

 val pairs = lines.map(x => (x.split(" ")(0), x))

## 單個pairRDD上的變換 rdd={(1, 2), (3, 4), (3, 6)}

函式 目的 示例 結果
reduceByKey(func) 用同一個key將值聯合. rdd.reduceByKey((x,y)=>x+y) {(1,2),(3,10)}
groupByKey() 用同一個key將值分組 rdd.groupByKey() {(1,[2]),(3,[4,6])}
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) 用同一個key將值聯合, 但返回不同型別
mapValues(func) 將函式應用於每個值, 不改變key. rdd.mapValues(x=>x+1) {(1,3),(3,5),(3,7)}
flatMapValues(func) 將函式應用於一個返回pair RDD每個值的迭代器, 對每個返回的的值, 用原來的key生成鍵值對.常用於 tokenization. rdd.flatMapValues(x=>(x to 5) {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}
keys() 返回 RDD 的鍵集合 rdd.keys() {1,3,3}
values() 返回 RDD 的值集合 rdd.values() {2,4,6}
sortByKey() 返回 RDD, 用鍵排序. rdd.sortByKey() {(1,2),(3,4),(3,6)}

combineByKey

對每個key求均值 input ={(1, 2), (3, 4), (3, 6)} combineByKey 可以省去map的過程.


 scala> val input=sc.parallelize(List((1,2),(3,4),(3,6))

scala> val result = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }

scala> result.collectAsMap().map(println(_))
(1,2.0)
(3,5.0)

說明

  • 第一個引數相當於map. (v) => (v, 1), 表示將key都轉為計數為1的元組

  • 第二個引數 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1) 表示對每一個分割槽的acc元組的第一個值求和, 第二個計數

  • 第三個引數 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ) 表示對每個分割槽的累積元組進行總的加和和計數.
  • map操作進行求均值

兩個 pair RDD的變換

(rdd = {(1,2),(3,4),(3,6)}, other = {(3,9)})

函式 目的 示例 結果
subtractByKey 將另一個RDD存在的key從本RDD移除. rdd.subtractByKey(other) {(1,2)}
join 兩個 RDD 進行 inner join . rdd.join(other) {(3,(4,9)),(3,(6,9))}
rightOuterJoin 兩個 RDD 進行右連線, key必須在右邊的RDD存在 rdd.rightOuterJoin(other) {(3,(Some(4),9)),(3,(Some(6),9))}
leftOuterJoin 兩個 RDD 進行左連線, key必須在左邊的RDD存在 rdd.leftOuterJoin(other) {(1,(2,None)),(3,(4,Some(9))),(3,(6,Some(9)))}
cogroup 兩個RDD具有同一個鍵, 則組合成一個組. rdd.cogroup(other) {(1,([2],[])),(3,([4,6],[9]))}

參考

《Learning spark》

如非註明轉載, 均為原創. 本站遵循知識共享CC協議,轉載請註明來源