spark rdd 轉換和動作
概述
本文對spark rdd的轉換和動作進行總結和實際操作演示. RDD(Resilient Distributed Datasets),彈性分散式資料集, 是spark分散式記憶體的一個抽象概念,RDD提供了一種高度受限的共享記憶體模型.即RDD是隻讀的記錄分割槽的集合,只能通過在其他RDD執行確定的轉換操作(如map、join和group by)而建立,然而這些限制使得實現容錯的開銷很低。 rdd 的分散式,因為rdd支援分割槽, 自動把一個rdd根據partition分發到n臺spark裝置進行處理. 這些對使用者完全透明. 使用者感覺和操作本地資料一樣.
rdd通過parallelize和textFile或流來建立. 再通過轉換得到新的rdd. 轉換的過程不是立即執行, 而是在需要動作action時才開始. 這樣方便系統進行自動優化.
rdd操作示例
parallelize一般用於測示建立rdd.
scala> val square = sc.parallelize(List(1,2,3,4)) scala> val sq = square.map(x=>x*x).collect() sq: Array[Int] = Array(1, 4, 9, 16) scala> val drink1=sc.parallelize(List("coffee","tea","coffee","panda","monkey")) scala> val rdd2 = sc.parallelize(List("coffee","money","kitty","貓")) scala> val r11= rdd1.union(rdd2).collect() r11: Array[String] = Array(coffee, tea, coffee, panda, monkey, coffee, money, kitty, 貓) scala> val r12 = rdd1.intersection(rdd2).collect() r12: Array[String] = Array(coffee) scala> val r13 = rdd1.subtract(rdd2).collect() r13: Array[String] = Array(tea, panda, monkey) scala> val users=sc.parallelize(List("user1","user2")) scala> val tags = sc.parallelize(List("經濟","政治","文化")) scala> users.cartesian(tags).collect() res1: Array[(String, String)] = Array((user1,經濟), (user1,政治), (user1,文化), (user2,經濟), (user2,政治), (user2,文化))
普通RDD操作
轉換 Transformation
轉換有如下一些種類
- map(func)
- filter(func)
- repartition(numPartitions)
- flatMap(func)
- repartitionAndSortWithinPartitions(partitioner)
- mapPartitions(func)
- join(otherDataset, [numTasks])
- mapPartitionsWithIndex(func)
- cogroup(otherDataset, [numTasks])
- sample(withReplacement, fraction, seed)
- cartesian(otherDataset)
- coalesce(numPartitions)
基礎轉換
測試用RDD 包含 {1, 2, 3, 3}
函式 | 目的 | 示例 | 結果 |
---|---|---|---|
map | 對RDD每個元素應用函式, 並返回一個RDD結果 | rdd.map(x => x + 1) | {2, 3, 4, 4} |
flatMap | 對RDD每個元素應用函式, 並返回一個RDD結果,包含迭代器返回的內容.常用於抽取單詞. | rdd.flatMap(x => x.to(3)) | {1, 2, 3, 2, 3, 3, 3} |
filter | 返回一個RDD結果, 由通過了filter的元素組成. | rdd.filter(x => x != 1) | {2, 3, 3} |
distinct | 移除重複元素 | rdd.distinct() | {1, 2, 3} |
sample(withReplacement, fraction, [seed]) | 對 RDD 抽樣,withReplacement是指是否有放回的抽樣為true為放回,為false為不放回,fraction為抽樣佔總資料量的比值 | rdd.sample(false, 0.5) | non-deterministic |
兩個RDD轉換
RDD分別包含 {1, 2, 3} 和 {3, 4, 5}
函式 | 目的 | 示例 | 結果 |
---|---|---|---|
union() | 兩個RDD並集. | rdd.union(other) | {1, 2, 3, 3, 4, 5} |
intersection() | RDD 交集 | rdd.intersection(other) {3} | |
subtract() | 從一個RDD移除全部存在於另一個RDD的元素.如移除訓練資料. | rdd.subtract(other) {1, 2} | |
cartesian() | 兩個RDD的笛卡爾積, 一個RDD中每個元素和另一個RDD的每個元素兩兩組合 | rdd.cartesian(other) | {(1, 3), (1,4), … (3,5)} |
普通RDD 動作(Action)
基礎動作
RDD 包含 {1, 2, 3, 3}
函式 | 目的 | 示例 | 結果 |
---|---|---|---|
collect() | 返回 RDD 全部元素 | rdd.collect() | {1, 2, 3, 3} |
count() | RDD 元素總計 | rdd.count() | 4 |
countByValue() | RDD 每個元素出現次數總計 | rdd.countByValue() | {(1, 1), (2, 1), (3, 2)} |
take(num) | 返回 RDD 中num個元素 | rdd.take(2) | {1, 2} |
top(num) | 返回 RDD 中top num個元素 | rdd.top(2) | {3, 3} |
takeOrdered(num)(ordering) | 返回 RDD 中num個元素, 基於給定排序 | rdd.takeOrdered(2)(myOrdering) | {3, 3} |
takeSample(withReplacement, num, [seed]) | 返回 RDD 中隨機num個元素, withReplacement表示抽樣是否放回. 放回會有重複 | rdd.takeSample(false, 1) | non-deterministic |
reduce(func) | 併發將RDD中的元素聯合起來.如加和. | rdd.fold((x, y) => x + y) | 9 |
fold(zero)(func) | 類reduce函式, 但提供初始值設定. | rdd.fold(0)((x, y) => x + y) | 9 |
aggregate(zeroValue)(seqOp, combOp) | 類reduce函式, 但提供初始值設定. 可以返回不同型別 | rdd.aggregate(0, 0) ({case (x, y) => (y._1() + x, y._2() + 1)}, {case (x, y) => (y._1() + x._1(), y._2() + x._2()) }) | (9, 4) |
foreach(func) | 對RDD中每個元素應用函式. | rdd.foreach(func) | 無 |
reduce
對兩個元素應用reduce中的函式, 得到一個新的元素, 再用新元素和集合中的元素進行reduce運算. 如+, 求和,計數和其他聚合運算.
scala> val data = sc.parallelize(List(1,2,3,4))
scala> data.collect()
res33: Array[Int] = Array(1, 2, 3, 4)
scala> data.reduce(_ + _)
res35: Int = 10
fold
fold()和reduce類似. 但會對每個分割槽置初始zero值(+為0,乘為1, 連線列表為空列表).
fold 定義:
def fold(zeroValue: T)(op: (T, T) => T): T
scala> val data = sc.parallelize(List(1,2,3,4))
scala> data.collect()
res33: Array[Int] = Array(1, 2, 3, 4)
scala> data.fold(0)((x,y)=>x+y)
res40: Int = 10
scala> data.fold(1)((x,y)=>x+y)
res41: Int = 15
aggregate
聚合函式可以避免使用map. 和fold差不多, 需要傳入一個初始值函式, 再傳入一個每個分割槽的累積函式. 最後傳入一個分割槽之間的累積函式.
下面的程式碼是求均值. 引數(0,0) 表示總和0,計數0. 引數(acc, value) ,acc表示累計值, value是當前值.其中acc是元組. _1為第一個元素, 為累積值. _2為第二個元素,表示累計計數. 第三個函式(acc1, acc2) 則是各分割槽的歸總combine.
val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
scala> val result = data.aggregate((0,0))( (acc,value)=>(acc._1 + value, acc._2+1),(acc1,acc2)=>(acc1._1+acc2._1, acc1._2 + acc2._2))
result: (Int, Int) = (10,4)
scala> val avg = result._1/result._2
avg: Int = 2
# pairRDD 操作
pairRDD是特殊的RDD, 包含key->value元組鍵值對的RDD, 有特殊操作.
很多變換會返回pairRDD, 也可以將普通RDD轉為pairRDD. 如通過map()函式, 將行的第一個單詞作為key,行作為value.
val pairs = lines.map(x => (x.split(" ")(0), x))
## 單個pairRDD上的變換 rdd={(1, 2), (3, 4), (3, 6)}
函式 | 目的 | 示例 | 結果 |
---|---|---|---|
reduceByKey(func) | 用同一個key將值聯合. | rdd.reduceByKey((x,y)=>x+y) | {(1,2),(3,10)} |
groupByKey() | 用同一個key將值分組 | rdd.groupByKey() | {(1,[2]),(3,[4,6])} |
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) | 用同一個key將值聯合, 但返回不同型別 | ||
mapValues(func) | 將函式應用於每個值, 不改變key. | rdd.mapValues(x=>x+1) | {(1,3),(3,5),(3,7)} |
flatMapValues(func) | 將函式應用於一個返回pair RDD每個值的迭代器, 對每個返回的的值, 用原來的key生成鍵值對.常用於 tokenization. | rdd.flatMapValues(x=>(x to 5) | {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)} |
keys() | 返回 RDD 的鍵集合 | rdd.keys() | {1,3,3} |
values() | 返回 RDD 的值集合 | rdd.values() | {2,4,6} |
sortByKey() | 返回 RDD, 用鍵排序. | rdd.sortByKey() | {(1,2),(3,4),(3,6)} |
combineByKey
對每個key求均值 input ={(1, 2), (3, 4), (3, 6)} combineByKey 可以省去map的過程.
scala> val input=sc.parallelize(List((1,2),(3,4),(3,6))
scala> val result = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
scala> result.collectAsMap().map(println(_))
(1,2.0)
(3,5.0)
說明
-
第一個引數相當於map. (v) => (v, 1), 表示將key都轉為計數為1的元組
-
第二個引數 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1) 表示對每一個分割槽的acc元組的第一個值求和, 第二個計數
- 第三個引數 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ) 表示對每個分割槽的累積元組進行總的加和和計數.
- map操作進行求均值
兩個 pair RDD的變換
(rdd = {(1,2),(3,4),(3,6)}, other = {(3,9)})
函式 | 目的 | 示例 | 結果 |
---|---|---|---|
subtractByKey | 將另一個RDD存在的key從本RDD移除. | rdd.subtractByKey(other) | {(1,2)} |
join | 兩個 RDD 進行 inner join . | rdd.join(other) | {(3,(4,9)),(3,(6,9))} |
rightOuterJoin | 兩個 RDD 進行右連線, key必須在右邊的RDD存在 | rdd.rightOuterJoin(other) | {(3,(Some(4),9)),(3,(Some(6),9))} |
leftOuterJoin | 兩個 RDD 進行左連線, key必須在左邊的RDD存在 | rdd.leftOuterJoin(other) | {(1,(2,None)),(3,(4,Some(9))),(3,(6,Some(9)))} |
cogroup | 兩個RDD具有同一個鍵, 則組合成一個組. | rdd.cogroup(other) | {(1,([2],[])),(3,([4,6],[9]))} |
參考
《Learning spark》
如非註明轉載, 均為原創. 本站遵循知識共享CC協議,轉載請註明來源