Spark RDD Operators
I. Introduction to Spark RDD Operators
An RDD (Resilient Distributed Dataset) is a distributed dataset and Spark's most fundamental data abstraction, analogous to a collection in Scala. An RDD behaves like an immutable, partitionable collection whose elements can be computed in parallel.
RDD characteristics: it follows a data-flow model, provides automatic fault tolerance, supports location-aware scheduling, and is scalable. RDDs let users cache a working set in memory across multiple queries; reusing the working set greatly speeds up subsequent queries.
RDD operators fall into two types: 1) Transformations 2) Actions
II. Creating RDDs
RDD operators fall into two types: 1) Transformations (lazy, deferred execution) 2) Actions (trigger a job)
Example:
scala> sc.textFile("/root/words.txt")
res8: org.apache.spark.rdd.RDD[String] = /root/words.txt MapPartitionsRDD[28] at textFile at <console>:25

scala> sc.textFile("/root/words.txt").flatMap(_.split(" "))
res9: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[31] at flatMap at <console>:25

scala> res8
res10: org.apache.spark.rdd.RDD[String] = /root/words.txt MapPartitionsRDD[28] at textFile at <console>:25

scala> res8.flatMap(_.split(" "))
res11: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[32] at flatMap at <console>:27

scala> res8.flatMap(_.split(" ")).map((_,1))
res12: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[34] at map at <console>:27

scala> res8.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
res13: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[37] at reduceByKey at <console>:27

scala> res13.collect
res14: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))

scala> val rdd1 = sc.textFile("/root/words.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /root/words.txt MapPartitionsRDD[39] at textFile at <console>:24

scala> rdd1.count
res15: Long = 3

scala> val list = List(1,3,5,7)
list: List[Int] = List(1, 3, 5, 7)

scala> list.map(_ * 100)
res16: List[Int] = List(100, 300, 500, 700)

scala> val rdd1 = sc.parallelize(list)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at parallelize at <console>:26

scala> rdd1.map(_ * 100)
res17: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at map at <console>:29

scala> res17.collect
res18: Array[Int] = Array(100, 300, 500, 700)

scala> val rdd2 = sc.makeRDD(list)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at makeRDD at <console>:26

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at <console>:24

scala> val rdd2 = rdd1.map(_ * 1000)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[44] at map at <console>:26

scala> rdd2.collect
res19: Array[Int] = Array(1000, 2000, 3000, 4000, 5000)
III. Common Transformations
Characteristics of a transformation: 1) it produces a new RDD; 2) it is lazy, deferring execution; 3) it does not store the actual data, only the lineage of transformations.
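A minimal sketch of this lazy behaviour (assuming the spark-shell's SparkContext sc; toDebugString prints the recorded lineage, and nothing is computed until an action runs):

val nums = sc.parallelize(1 to 5)
val doubled = nums.map(_ * 2)       // transformation: only the lineage is recorded, no job runs yet
println(doubled.toDebugString)      // shows MapPartitionsRDD depending on ParallelCollectionRDD
doubled.collect()                   // action: this is what actually triggers the computation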
1. map(func): applies func to every element and returns a new RDD
2. flatMap(func): applies func to every element and flattens the results
3. sortBy: sorts the RDD by the given key function
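sortBy is not covered by the shared transcript below, so here is a minimal sketch (my own example; sortBy takes the key function and an optional ascending flag):

val nums = sc.parallelize(List(5, 1, 4, 2))
nums.sortBy(x => x).collect()          // Array(1, 2, 4, 5) -- ascending by default
nums.sortBy(x => x, false).collect()   // Array(5, 4, 2, 1) -- descending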
4. reduceByKey: aggregates the values of each key with the given function
scala> sc.textFile("/root/words.txt")
res8: org.apache.spark.rdd.RDD[String] = /root/words.txt MapPartitionsRDD[28] at textFile at <console>:25

scala> res8.flatMap(_.split(" "))
res11: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[32] at flatMap at <console>:27

scala> res8.flatMap(_.split(" ")).map((_,1))
res12: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[34] at map at <console>:27

scala> res8.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
res13: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[37] at reduceByKey at <console>:27
5. filter: keeps only the elements that satisfy the predicate
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at <console>:24

scala> rdd1.filter(_ % 2 == 0)
res24: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[49] at filter at <console>:27

scala> res24.collect
res25: Array[Int] = Array(2, 4)
6. union: combines two RDDs (duplicates are kept)
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5,6,7))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[50] at parallelize at <console>:24

scala> rdd1.union(rdd2)
res26: org.apache.spark.rdd.RDD[Int] = UnionRDD[51] at union at <console>:29

scala> res26.collect
res27: Array[Int] = Array(1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 7)

scala> rdd1 union rdd2
res28: org.apache.spark.rdd.RDD[Int] = UnionRDD[52] at union at <console>:29

scala> res28.collect
res29: Array[Int] = Array(1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 7)
7. groupByKey: groups the values by key
scala> val rdd3 = sc.parallelize(List(("Tom",18),("John",16),("Tom",20),("Mary",17),("John",23)))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[53] at parallelize at <console>:24

scala> rdd3.groupByKey
res30: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[54] at groupByKey at <console>:27

scala> res30.collect
res31: Array[(String, Iterable[Int])] = Array((John,CompactBuffer(16, 23)), (Tom,CompactBuffer(18, 20)), (Mary,CompactBuffer(17)))
8. intersection: intersection of two RDDs
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(1,2,3,4,5,6,7))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24

scala> rdd1.intersection(rdd2)
res33: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[62] at intersection at <console>:29

scala> res33.collect
res34: Array[Int] = Array(3, 4, 1, 5, 2)
9. join: inner join on the keys
scala> val rdd1 = sc.parallelize(List(("Tom",18),("John",16),("Mary",21)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[65] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("Tom",28),("John",26),("Cat",17)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[66] at parallelize at <console>:24

scala> rdd1.join(rdd2)
res39: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[69] at join at <console>:29

scala> res39.collect
res40: Array[(String, (Int, Int))] = Array((John,(16,26)), (Tom,(18,28)))
10. leftOuterJoin: left outer join
Keeps every record from the left RDD; if a key also exists on the right the value is wrapped in Some, otherwise it is None.
scala> val rdd1 = sc.parallelize(List(("Tom",18),("John",16),("Mary",21)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[65] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("Tom",28),("John",26),("Cat",17)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[66] at parallelize at <console>:24

scala> rdd1.leftOuterJoin(rdd2)
res41: org.apache.spark.rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[72] at leftOuterJoin at <console>:29

scala> res41.collect
res42: Array[(String, (Int, Option[Int]))] = Array((John,(16,Some(26))), (Tom,(18,Some(28))), (Mary,(21,None)))
11. rightOuterJoin: right outer join
scala> val rdd1 = sc.parallelize(List(("Tom",18),("John",16),("Mary",21)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[65] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(("Tom",28),("John",26),("Cat",17)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[66] at parallelize at <console>:24

scala> rdd1.rightOuterJoin(rdd2).collect
res43: Array[(String, (Option[Int], Int))] = Array((John,(Some(16),26)), (Tom,(Some(18),28)), (Cat,(None,17)))
12. cartesian: Cartesian product
scala> val rdd1 = sc.parallelize(List("Tom","Mary"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[76] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List("John","Joe"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[77] at parallelize at <console>:24

scala> rdd1.cartesian(rdd2)
res45: org.apache.spark.rdd.RDD[(String, String)] = CartesianRDD[78] at cartesian at <console>:29

scala> res45.collect
res46: Array[(String, String)] = Array((Tom,John), (Tom,Joe), (Mary,John), (Mary,Joe))
IV. Common Actions
1. collect: gathers the results into an array on the driver
scala> val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.collect
res0: Array[Int] = Array(1, 2, 3, 4)
2. saveAsTextFile(path): saves the RDD as text files
Input-split example with three files of sizes 5 B, 5 B, and 600 B:
Ideal split size: 5 + 5 + 600 = 610; 610 / 3 ≈ 203
5 B -> one split
5 B -> one split
203 -> one split
203 -> one split
203 -> one split
1 -> one split
scala> val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.saveAsTextFile("/root/RDD1")

// check the number of partitions
scala> rdd1.partitions.length
res3: Int = 4
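Relatedly, a minimal sketch (reusing the sample file from earlier; the exact partition count depends on the computed input splits): textFile accepts a minimum number of partitions as its second argument.

val lines = sc.textFile("/root/words.txt", 3)  // request at least 3 partitions
lines.partitions.length                        // typically >= 3, depending on the input splits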
3. count: counts the elements
scala> val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.count
res2: Long = 4
4. reduce: aggregates all elements with the given function
scala> val rdd2 = sc.parallelize(List(1,2,3,4),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> rdd2.partitions.length
res4: Int = 2

scala> rdd2.reduce(_+_)
res5: Int = 10
5. countByKey(): counts the number of records for each key
scala> sc.parallelize(List(("Tom",18),("Tom",28),("John",14),("Mary",16)))
res9: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:25

scala> res9.count
res10: Long = 4

scala> res9.countByKey()
res11: scala.collection.Map[String,Long] = Map(Tom -> 2, Mary -> 1, John -> 1)

scala> res9.reduceByKey(_+_)
res12: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:27

scala> res9.reduceByKey(_+_).collect
res13: Array[(String, Int)] = Array((Tom,46), (Mary,16), (John,14))
6. take(n): returns the first n elements
scala> val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.take(2)
res15: Array[Int] = Array(1, 2)

scala> val rdd3 = sc.parallelize(List(3,2,8,1,7))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> rdd3.take(2)
res17: Array[Int] = Array(3, 2)
7. first: returns the first element of the RDD
scala> val rdd3 = sc.parallelize(List(3,2,8,1,7))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> rdd3.first
res18: Int = 3
8. takeOrdered(n): returns n elements, sorted in ascending order by default
scala> val rdd3 = sc.parallelize(List(3,2,8,1,7))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> rdd3.takeOrdered(2)
res19: Array[Int] = Array(1, 2)
9. top(n): returns the n largest elements, i.e. sorted in descending order
scala> val rdd3 = sc.parallelize(List(3,2,8,1,7))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> rdd3.top(2)
res20: Array[Int] = Array(8, 7)
V. Advanced Spark Operators
1. mapPartitionsWithIndex(func)
Sets the number of partitions and lets you inspect which elements each partition holds. A function taking the partition index and an iterator of elements must be passed as the argument, for example:
val func = (index:Int,iter:Iterator[(Int)]) => {iter.toList.map(x => "partID:" + index + "," + "datas:" + x + "]").iterator}
scala> val rdd3 = sc.parallelize(List(1,2,3,4,5,6,7),2)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> val func = (index:Int,iter:Iterator[(Int)]) => {iter.toList.map(x => "partID:" + index + "," + "datas:" + x + "]").iterator}
func: (Int, Iterator[Int]) => Iterator[String] = <function2>

scala> rdd3.mapPartitionsWithIndex(func)
res21: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at mapPartitionsWithIndex at <console>:29

scala> rdd3.mapPartitionsWithIndex(func).collect
res22: Array[String] = Array(partID:0,datas:1], partID:0,datas:2], partID:0,datas:3], partID:1,datas:4], partID:1,datas:5], partID:1,datas:6], partID:1,datas:7])
2. aggregate
Aggregates the elements, first within each partition and then across partitions. (The transcript also shows max, which returns the largest element, and min, which returns the smallest.)
scala> val rdd3 = sc.parallelize(List(1,2,3,4,5,6,7),2)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> rdd3.aggregate(0)(_+_,_+_)
res23: Int = 28

scala> rdd3.max
res24: Int = 7

scala> rdd3.min
res25: Int = 1

scala> val rdd3 = sc.parallelize(List(1,2,3,4,5,6,7),2)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> rdd3.aggregate(0)(math.max(_,_),_+_)
res29: Int = 10

scala> rdd3.aggregate(10)(math.max(_,_),_+_)
res31: Int = 30

// 1+2+3+20 + 4+5+6+7+20 + 20 = 88
scala> rdd3.aggregate(20)(_+_,_+_)
res32: Int = 88

scala> val rdd4 = sc.parallelize(List("a","b","c","d","e"),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> rdd4.aggregate("|")(_+_,_+_)
res33: String = ||ab|cde

scala> rdd4.aggregate("|")(_+_,_+_)
res34: String = ||cde|ab

scala> val rdd5 = sc.parallelize(List("12","23","234","3456"),2)
rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> rdd5.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x+y)
res35: String = 24

scala> rdd5.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x+y)
res36: String = 42

scala> rdd5.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x+y)
res37: String = 24

scala> rdd5.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x+y)
res38: String = 42

scala> val rdd6 = sc.parallelize(List("12","23","345",""),2)
rdd6: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at parallelize at <console>:24

scala> rdd6.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
res41: String = 01

scala> rdd6.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
res42: String = 10

scala> rdd6.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
res43: String = 01

scala> rdd6.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
res48: String = 10

scala> val rdd7 = sc.parallelize(List("12","23","","456"),2)
rdd7: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd7.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
res1: String = 11

scala> ("").length
res2: Int = 0

scala> 0.length
<console>:24: error: value length is not a member of Int
       0.length
         ^

scala> 0.toString.length
res5: Int = 1

scala> rdd7.aggregate("0")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
res6: String = 011

scala> rdd7.aggregate("0")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
res7: String = 011
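A short worked note on the results above (my reading, as a sketch): the zero value is applied once in every partition and once more in the final combine, and with two partitions rdd3 is split into (1,2,3) and (4,5,6,7).

rdd3.aggregate(0)(math.max(_, _), _ + _)
// per-partition max: max(0,1,2,3) = 3 and max(0,4,5,6,7) = 7; combine: 0 + 3 + 7 = 10
rdd3.aggregate(10)(math.max(_, _), _ + _)
// per-partition max: max(10,1,2,3) = 10 and max(10,4,5,6,7) = 10; combine: 10 + 10 + 10 = 30
rdd3.aggregate(20)(_ + _, _ + _)
// per-partition sum: 20+1+2+3 = 26 and 20+4+5+6+7 = 42; combine: 20 + 26 + 42 = 88

The partition results arrive at the combine step in a non-deterministic order, which is why the string examples alternate between results such as ||ab|cde and ||cde|ab.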
3. aggregateByKey
Aggregates the values of each key, first within each partition and then across partitions (see the sketch after the transcript below).
scala> val rdd8 = sc.parallelize(List(("cat",3),("cat",8),("mouse",6),("dog",8)))
rdd8: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> def func(index:Int,iter:Iterator[(String,Int)]):Iterator[String] = {iter.toList.map(x => "partID:" + index + "," + "values:" + x + "]").iterator}
func: (index: Int, iter: Iterator[(String, Int)])Iterator[String]

scala> rdd8.mapPartitionsWithIndex(func).collect
res34: Array[String] = Array(partID:0,values:(cat,3)], partID:1,values:(cat,8)], partID:2,values:(mouse,6)], partID:3,values:(dog,8)])

scala> rdd8.aggregateByKey(0)(_+_,_+_).collect
res35: Array[(String, Int)] = Array((dog,8), (mouse,6), (cat,11))
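A minimal sketch of the semantics (my own example, using an explicit 2-way partitioning so the intermediate values are predictable): the zero value is folded in once per key within each partition, and only the partition-level results are merged globally.

val pairs = sc.parallelize(List(("cat",3),("cat",8),("mouse",6),("dog",8)), 2)
// partition 0: ("cat",3), ("cat",8)    partition 1: ("mouse",6), ("dog",8)
pairs.aggregateByKey(0)(math.max(_, _), _ + _).collect()
// within partitions: cat -> 8, mouse -> 6, dog -> 8; nothing left to add across partitions,
// so the result is Array((dog,8), (cat,8), (mouse,6)) (order may vary)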
4. combineByKey
Both aggregateByKey and reduceByKey are implemented on top of combineByKey. It is the lowest-level of these methods: it accumulates within each partition first and then merges the partial results globally.
scala> val rdd1 = sc.textFile("hdfs://192.168.146.111:9000/words.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).collect
rdd1: Array[(String, Int)] = Array((haha,1), (heihei,1), (hello,3), (Beijing,1), (world,1), (China,1))

scala> val rdd2 = sc.textFile("hdfs://192.168.146.111:9000/words.txt").flatMap(_.split("\t")).map((_,1)).combineByKey(x => x,(m:Int,n:Int) => (m+n),(a:Int,b:Int) => (a+b)).collect
rdd2: Array[(String, Int)] = Array((haha,1), (heihei,1), (hello,3), (Beijing,1), (world,1), (China,1))
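The three arguments are createCombiner, mergeValue, and mergeCombiners. As an illustrative sketch of why the extra flexibility matters (my own example, not from the original), combineByKey can build an accumulator of a different type than the values, e.g. a per-key average:

val scores = sc.parallelize(List(("a", 1), ("a", 3), ("b", 5)))
val sumCount = scores.combineByKey(
  (v: Int) => (v, 1),                                          // createCombiner: first value seen in a partition
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),       // mergeValue: fold in another value
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: merge partition accumulators
)
val avg = sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }
avg.collect()  // Array((a,2.0), (b,5.0)) -- order may vary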
5. coalesce
coalesce(4, true) requests 4 partitions, with the second argument controlling whether a shuffle is performed. repartition is implemented as coalesce with the shuffle flag enabled by default.
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

scala> rdd2.partitions.length
res42: Int = 2

scala> val rdd3 = rdd2.coalesce(4,true)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at coalesce at <console>:26

scala> rdd3.partitions.length
res43: Int = 4

scala> val rdd4 = rdd3.repartition(5)
rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[23] at repartition at <console>:28

scala> rdd4.partitions.length
res44: Int = 5
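A small sketch of the practical difference (my own example): without a shuffle, coalesce can only reduce the number of partitions; increasing it requires shuffle = true, which is exactly what repartition does.

val rdd = sc.parallelize(1 to 9, 4)
rdd.coalesce(2).partitions.length        // 2 -- partitions merged, no shuffle
rdd.coalesce(8).partitions.length        // still 4 -- cannot grow without a shuffle
rdd.coalesce(8, true).partitions.length  // 8 -- the shuffle redistributes the data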
6. filterByRange
Keeps only the key-value pairs whose keys fall within the given inclusive range (available on RDDs whose keys have an ordering).
scala> val rdd6 = sc.parallelize(List(("a",3),("b",2),("d",5),("e",8)))
rdd6: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> rdd6.filterByRange("b","d").collect
res45: Array[(String, Int)] = Array((b,2), (d,5))

scala> rdd6.filterByRange("b","e").collect
res46: Array[(String, Int)] = Array((b,2), (d,5), (e,8))
7. flatMapValues
Splits each value into multiple elements while keeping its key.
scala> val rdd7 = sc.parallelize(List(("a","3 6"),("b","2 5"),("d","5 8")))
rdd7: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> rdd7.flatMapValues(_.split(" ")).collect
res47: Array[(String, String)] = Array((a,3), (a,6), (b,2), (b,5), (d,5), (d,8))
8. foldByKey
Like reduceByKey, but with an explicit zero value. Example requirement: concatenate the strings that share the same key.
scala> val rdd8 = sc.parallelize(List("Tom","John","Mary","Joe"),2)
rdd8: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[31] at parallelize at <console>:24

scala> val rdd9 = rdd8.map(x => (x.length,x))
rdd9: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[32] at map at <console>:26

scala> rdd9.collect
res48: Array[(Int, String)] = Array((3,Tom), (4,John), (4,Mary), (3,Joe))

scala> rdd9.foldByKey("")(_+_).collect
res49: Array[(Int, String)] = Array((4,JohnMary), (3,JoeTom))
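For comparison, a minimal numeric sketch (my own example) showing the zero value acting as the starting point of the fold:

sc.parallelize(List(("a", 1), ("a", 2), ("b", 3))).foldByKey(0)(_ + _).collect()
// Array((a,3), (b,3)) -- order may vary; foldByKey(0)(_ + _) behaves like reduceByKey(_ + _)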
9. foreach
Iterates over every element; note that the supplied function runs on the executors rather than on the driver.
import org.apache.spark.{SparkConf, SparkContext}

object ForeachDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ForeachDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // create the RDD with 3 partitions
    val rdd1 = sc.parallelize(List(1,2,3,4,5),3)
    rdd1.foreach(println(_))
    sc.stop()
  }
}
Result: the five elements are printed to the console, in an order that depends on the partitioning.
10. keyBy
keyBy(f) turns each element into a key-value pair, using the result of f as the key. The keys and values methods then return just the keys or just the values.
scala> val rdd2 = sc.parallelize(List("Tom","John","Jack"),3)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[37] at parallelize at <console>:24

scala> val rdd3 = rdd2.keyBy(_.length)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[38] at keyBy at <console>:26

scala> rdd3.collect
res60: Array[(Int, String)] = Array((3,Tom), (4,John), (4,Jack))

scala> rdd3.keys.collect
res61: Array[Int] = Array(3, 4, 4)

scala> rdd3.values.collect
res62: Array[String] = Array(Tom, John, Jack)
VI. RDD Parallelization Flow