Spark Tutorial, Part 3 (Basic RDD Operations)
阿新 · Published: 2018-12-15
1. Basic RDD operations
scala> val rdd1 = sc.parallelize(List(1,2,3,4,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
//The result type is org.apache.spark.rdd.RDD[Int]; RDD[Int] means the RDD holds elements of type Int
//Convert the RDD to an Array with collect
scala> rdd1.collect
res0: Array[Int] = Array(1, 2, 3, 4, 4)
//Create a String RDD
scala> val rdd2 = sc.parallelize(List("apple","orange","banana","Grape"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24
//Convert the String RDD to an Array
scala> rdd2.collect
res1: Array[String] = Array(apple, orange, banana, Grape)
//map: passes each element through the given function and produces a new RDD
scala> rdd1.map(_ + 1).collect
res2: Array[Int] = Array(2, 3, 4, 5, 5)
scala> rdd2.map(x => "fruit:"+x).collect
res5: Array[String] = Array(fruit:apple, fruit:orange, fruit:banana, fruit:Grape)
//filter: keeps the elements of the RDD that satisfy the predicate and produces a new RDD
scala> rdd1.filter(_ < 3).collect
res6: Array[Int] = Array(1, 2)
scala> rdd1.filter(x => x < 3).collect //equivalent form with an explicit parameter
//filter on a String RDD
scala> rdd2.filter(x => x.contains("ra")).collect
res8: Array[String] = Array(orange, Grape)
//distinct: removes duplicate elements
scala> rdd1.distinct.collect
res9: Array[Int] = Array(4, 2, 1, 3)
//randomSplit: randomly splits the elements into multiple RDDs according to the given weight ratios
scala> val sRDD = rdd1.randomSplit(Array(0.4,0.6))
sRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[12] at randomSplit at <console>:25, MapPartitionsRDD[13] at randomSplit at <console>:25)
scala> sRDD(0).collect
res3: Array[Int] = Array(1, 2, 4, 4)
scala> sRDD(1).collect
res4: Array[Int] = Array(3)
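//Because the split is random, the contents of sRDD(0) and sRDD(1) will differ between runs.
//A seed can be passed as the second argument to make the split reproducible; the sketch below
//is only an illustration (the name sRDD2 and the seed value 42L are arbitrary):
scala> val sRDD2 = rdd1.randomSplit(Array(0.4,0.6), seed = 42L)
scala> sRDD2(0).collect //repeating this with the same seed should yield the same split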
//groupBy: groups the elements according to the supplied function, producing (key, values) groups
scala> val gRDD = rdd1.groupBy(x => if(x%2==0) "even" else "odd").collect
gRDD: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 4)), (odd,CompactBuffer(3, 1)))
//Inspect the results
scala> gRDD(0)
res5: (String, Iterable[Int]) = (even,CompactBuffer(2, 4, 4))
scala> gRDD(1)
res6: (String, Iterable[Int]) = (odd,CompactBuffer(3, 1))
2. Transformations across multiple RDDs; an RDD supports set-style operations with other RDDs
scala> val rdd1 = sc.parallelize(List(3,1,2,5,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(5,6))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> val rdd3 = sc.parallelize(List(2,7))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
//Use union to compute the union of the RDDs (duplicates are kept)
scala> rdd1.union(rdd2).union(rdd3).collect
res7: Array[Int] = Array(3, 1, 2, 5, 5, 5, 6, 2, 7)
//Use ++ as a shorthand for union
scala> (rdd1 ++ rdd2 ++ rdd3).collect
res8: Array[Int] = Array(3, 1, 2, 5, 5, 5, 6, 2, 7)
//Use intersection to compute the intersection
scala> rdd1.intersection(rdd2).collect
res9: Array[Int] = Array(5)
//Use subtract to compute the difference (elements of rdd1 not in rdd2)
scala> rdd1.subtract(rdd2).collect
res10: Array[Int] = Array(2, 1, 3)
//Use cartesian to compute the Cartesian product
scala> rdd1.cartesian(rdd2).collect
res11: Array[(Int, Int)] = Array((3,5), (1,5), (3,6), (1,6), (2,5), (5,5), (5,5), (2,6), (5,6), (5,6))
3. Basic actions (these are actions, so they execute immediately and return a result)
//Read the first element
scala> rdd1.first
res12: Int = 3
//Read the first n elements
scala> rdd1.take(2)
res13: Array[Int] = Array(3, 1)
//Read the first n elements sorted from largest to smallest
scala> rdd1.takeOrdered(4)(Ordering[Int].reverse)
res15: Array[Int] = Array(5, 5, 3, 2)
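//For comparison, takeOrdered with the default Ordering returns the smallest elements
//(ascending order); for rdd1 = List(3,1,2,5,5) this should give Array(1, 2, 3, 5):
scala> rdd1.takeOrdered(4)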
//Summary statistics
scala> rdd1.stats
res16: org.apache.spark.util.StatCounter = (count: 5, mean: 3.200000, stdev: 1.600000, max: 5.000000, min: 1.000000)
scala> rdd1.min
res17: Int = 1
scala> rdd1.max
res18: Int = 5
scala> rdd1.stdev
res19: Double = 1.6
scala> rdd1.count
res20: Long = 5
scala> rdd1.sum
res21: Double = 16.0
scala> rdd1.mean
res22: Double = 3.2
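//As the heading of this section notes, the operations above are actions and run immediately,
//whereas transformations such as map and filter are lazy: no job runs until an action is called.
//A minimal sketch (the name mapped is only illustrative):
scala> val mapped = rdd1.map(_ * 2) //only records the transformation, no job is launched yet
scala> mapped.collect               //the map is actually executed here, when the action runs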
4. Basic key-value RDD "transformation" operations; these are the foundation of map-reduce
//Create a key-value RDD
scala> val kv = sc.parallelize(List((3,4),(3,6),(5,6),(1,2)))
kv: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> kv.collect
res23: Array[(Int, Int)] = Array((3,4), (3,6), (5,6), (1,2))
//List all keys
scala> kv.keys.collect
res24: Array[Int] = Array(3, 3, 5, 1)
//List all values
scala> kv.values.collect
res25: Array[Int] = Array(4, 6, 6, 2)
//Use filter to keep the key-value pairs whose key is less than 5
scala> kv.filter{case (key,value) => key<5}.collect
res28: Array[(Int, Int)] = Array((3,4), (3,6), (1,2))
//mapValues: applies a function to the value of every (K, V) pair and produces a new RDD
scala> kv.mapValues(x => x*x).collect
res29: Array[(Int, Int)] = Array((3,16), (3,36), (5,36), (1,4))
//sortByKey: sorts by key in ascending order
scala> kv.sortByKey(true).collect //true is the default and can be omitted
res30: Array[(Int, Int)] = Array((1,2), (3,4), (3,6), (5,6))
scala> kv.sortByKey(false).collect
res32: Array[(Int, Int)] = Array((5,6), (3,4), (3,6), (1,2))
//reduceByKey: pairs with a unique key stay unchanged; values of pairs sharing the same key are combined (here, added)
1). For example, in Array((1,2), (3,4), (3,6), (5,6)) the first element of each pair is the key and the second is the value
2). reduceByKey merges pairs that share the same key; here key 3 appears in (3,4) and (3,6)
3). These are merged into (3, 4+6)
4). (1,2) and (5,6) have no matching key, so they stay unchanged (see the sketch after the call below)
scala> kv.reduceByKey((x,y) => x + y).collect
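//The call above should produce a result equivalent to Array((1,2), (3,10), (5,6)),
//although the order of the pairs in the collected array may vary between runs.
//As a sketch of why reduceByKey is the foundation of map-reduce, the classic word count
//combines map and reduceByKey; the RDD "words" below is only an illustration:
scala> val words = sc.parallelize(List("a","b","a","c","b","a"))
scala> words.map(w => (w, 1)).reduceByKey(_ + _).collect
//expected result, in some order: Array((a,3), (b,2), (c,1))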
5. Key-value "transformation" operations across multiple RDDs
scala> val rdd1 = sc.parallelize(List((3,4),(3,6),(5,6),(1,2)))
scala> val rdd2 = sc.parallelize(List((3,8),(6,8)))
//Key-value RDD join: joins pairs that share the same key
scala> rdd1.join(rdd2)
res4: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[8] at join at <console>:28
scala> rdd1.join(rdd2).collect
res5: Array[(Int, (Int, Int))] = Array((3,(4,8)), (3,(6,8)))
scala> rdd1.join(rdd2).collect.foreach(println)
(3,(4,8))
(3,(6,8))
//Key-value RDD leftOuterJoin; the rules are:
1). leftOuterJoin maps from the left RDD (rdd1) to the right RDD (rdd2) and keeps every element of the left RDD (rdd1)
2). If a key in rdd1 also exists in rdd2, the matching pairs are produced: (3,(4,Some(8))), (3,(6,Some(8)))
3). If a key in rdd1 has no match in rdd2, the right side is None: (5,(6,None)), (1,(2,None))
scala> rdd1.leftOuterJoin(rdd2).collect
res7: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(8))), (3,(6,Some(8))), (5,(6,None)))
scala> rdd1.leftOuterJoin(rdd2).collect.foreach(println)
(1,(2,None))
(3,(4,Some(8)))
(3,(6,Some(8)))
(5,(6,None))
//Key-value RDD rightOuterJoin; the rules are:
1). rightOuterJoin maps from the right RDD (rdd2) back to the left RDD (rdd1) and keeps every element of the right RDD (rdd2)
2). If a key in rdd2 also exists in rdd1, the matching pairs are produced: (3,(Some(4),8)), (3,(Some(6),8))
3). If a key in rdd2 has no match in rdd1, the left side is None: (6,(None,8))
scala> rdd1.rightOuterJoin(rdd2).collect
res9: Array[(Int, (Option[Int], Int))] = Array((6,(None,8)), (3,(Some(4),8)), (3,(Some(6),8)))
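//As with leftOuterJoin, the pairs can be printed one per line; based on the array above,
//the output should contain the following lines (order may vary):
scala> rdd1.rightOuterJoin(rdd2).collect.foreach(println)
(6,(None,8))
(3,(Some(4),8))
(3,(Some(6),8))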