
Getting Started with Spark, Part 3 (Basic RDD Operations)

1. Basic RDD operations

scala> val rdd1 = sc.parallelize(List(1,2,3,4,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
// In org.apache.spark.rdd.RDD[Int], the RDD[Int] part means the RDD holds elements of type Int

// Collect the RDD into an Array
scala> rdd1.collect
res0: Array[Int] = Array(1, 2, 3, 4, 4)

// Create a String RDD
scala> val rdd2 = sc.parallelize(List("apple","orange","banana","Grape"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24

// Collect the String RDD into an Array
scala> rdd2.collect
res1: Array[String] = Array(apple, orange, banana, Grape)

// map: applies the given function to every element and produces a new RDD
scala> rdd1.map(_ + 1).collect
res2: Array[Int] = Array(2, 3, 4, 5, 5)

scala> rdd2.map(x => "fruit:"+x).collect
res5: Array[String] = Array(fruit:apple, fruit:orange, fruit:banana, fruit:Grape)

// filter: keeps only the elements that satisfy the predicate and produces a new RDD
scala> rdd1.filter(_ < 3).collect
res6: Array[Int] = Array(1, 2)

// the same filter written with an explicit parameter
scala> rdd1.filter(x => x < 3).collect

// filter on strings
scala> rdd2.filter(x => x.contains("ra")).collect
res8: Array[String] = Array(orange, Grape)

// distinct: removes duplicate elements
scala> rdd1.distinct.collect
res9: Array[Int] = Array(4, 2, 1, 3)

// randomSplit: randomly splits the elements into multiple RDDs according to the given weights
scala> val sRDD = rdd1.randomSplit(Array(0.4,0.6))
sRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[12] at randomSplit at <console>:25, MapPartitionsRDD[13] at randomSplit at <console>:25)

scala> sRDD(0).collect
res3: Array[Int] = Array(1, 2, 4, 4)

scala> sRDD(1).collect
res4: Array[Int] = Array(3)

// groupBy: groups the elements into (key, Iterable) pairs according to the given function
scala> val gRDD = rdd1.groupBy(x => if(x%2==0) "even" else "odd").collect
gRDD: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 4)), (odd,CompactBuffer(3, 1)))

// Print the results
scala> gRDD(0)
res5: (String, Iterable[Int]) = (even,CompactBuffer(2, 4, 4))

scala> gRDD(1)
res6: (String, Iterable[Int]) = (odd,CompactBuffer(3, 1))
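Note that randomSplit draws a fresh random seed on each run, so the contents of sRDD(0) and sRDD(1) above will vary between executions. Assuming the standard randomSplit(weights, seed) overload, passing an explicit seed makes the split reproducible:

scala> val sRDD = rdd1.randomSplit(Array(0.4,0.6), seed = 42L)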

2. Transformations across multiple RDDs (RDDs support operations that combine several RDDs)

scala> val rdd1 = sc.parallelize(List(3,1,2,5,5)) 
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(5,6)) 
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val rdd3 = sc.parallelize(List(2,7)) 
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

// Use union to combine the RDDs (unlike a set union, duplicates are kept)
scala> rdd1.union(rdd2).union(rdd3).collect
res7: Array[Int] = Array(3, 1, 2, 5, 5, 5, 6, 2, 7)
// ++ is shorthand for union
scala> (rdd1 ++ rdd2 ++ rdd3).collect
res8: Array[Int] = Array(3, 1, 2, 5, 5, 5, 6, 2, 7)

// Use intersection to compute the set intersection
scala> rdd1.intersection(rdd2).collect
res9: Array[Int] = Array(5)

// Use subtract to compute the set difference
scala> rdd1.subtract(rdd2).collect
res10: Array[Int] = Array(2, 1, 3)

// Use cartesian to compute the Cartesian product (5 × 2 = 10 pairs here)
scala> rdd1.cartesian(rdd2).collect
res11: Array[(Int, Int)] = Array((3,5), (1,5), (3,6), (1,6), (2,5), (5,5), (5,5), (2,6), (5,6), (5,6))

3. Basic actions (these all execute immediately and return a result)

// Read the first element
scala> rdd1.first
res12: Int = 3
// Read the first n elements
scala> rdd1.take(2)
res13: Array[Int] = Array(3, 1)
// Read the first n elements in descending order
scala> rdd1.takeOrdered(4)(Ordering[Int].reverse)
res15: Array[Int] = Array(5, 5, 3, 2)
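For comparison, takeOrdered with its default ascending ordering returns the smallest elements:

scala> rdd1.takeOrdered(4)   // expected: Array(1, 2, 3, 5)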
// Summary statistics
scala> rdd1.stats
res16: org.apache.spark.util.StatCounter = (count: 5, mean: 3.200000, stdev: 1.600000, max: 5.000000, min: 1.000000)
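As a sanity check on those numbers: mean = (3+1+2+5+5)/5 = 3.2, and stdev is the population standard deviation, sqrt(((3-3.2)^2 + (1-3.2)^2 + (2-3.2)^2 + (5-3.2)^2 + (5-3.2)^2)/5) = sqrt(2.56) = 1.6. For the sample standard deviation (dividing by n-1 instead of n), Spark also provides rdd1.sampleStdev.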

scala> rdd1.min
res17: Int = 1

scala> rdd1.max
res18: Int = 5

scala> rdd1.stdev
res19: Double = 1.6

scala> rdd1.count
res20: Long = 5

scala> rdd1.sum
res21: Double = 16.0

scala> rdd1.mean
res22: Double = 3.2

4. Basic key-value RDD "transformation" operations - the foundation of map-reduce

// Create a key-value RDD
scala> val kv = sc.parallelize(List((3,4),(3,6),(5,6),(1,2)))
kv: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24

scala> kv.collect
res23: Array[(Int, Int)] = Array((3,4), (3,6), (5,6), (1,2))

// List all the keys
scala> kv.keys.collect
res24: Array[Int] = Array(3, 3, 5, 1)

// List all the values
scala> kv.values.collect
res25: Array[Int] = Array(4, 6, 6, 2)

// Use filter to select the key-value pairs with key < 5
scala> kv.filter{case (key,value) => key<5}.collect
res28: Array[(Int, Int)] = Array((3,4), (3,6), (1,2))
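An equivalent form uses the tuple accessor instead of pattern matching:

scala> kv.filter(_._1 < 5).collect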

// mapValues: applies a function to the value of every (K,V) pair, producing a new RDD
scala> kv.mapValues(x => x*x).collect
res29: Array[(Int, Int)] = Array((3,16), (3,36), (5,36), (1,4))
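A plain map can do the same thing, e.g. kv.map{ case (k, v) => (k, v*v) }, but mapValues has the advantage of preserving the RDD's partitioner, which matters once the data has been partitioned by key.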

// sortByKey: sorts by key in ascending order
scala> kv.sortByKey(true).collect // true (ascending) is the default and can be omitted
res30: Array[(Int, Int)] = Array((1,2), (3,4), (3,6), (5,6))
scala> kv.sortByKey(false).collect
res32: Array[(Int, Int)] = Array((5,6), (3,4), (3,6), (1,2))

// reduceByKey: pairs with a unique key stay unchanged; values sharing the same key are combined (added here)

1). For example, in Array((1,2), (3,4), (3,6), (5,6)), the first element of each pair is the key and the second is the value.
2). reduceByKey finds the pairs that share a key and merges them; here (3,4) and (3,6) share the key 3.
3). After merging, the result is (3,4+6), i.e. (3,10).
4). The remaining pairs (1,2) and (5,6) have no matching key and stay unchanged (see the sketch after this list).

scala> kv.reduceByKey((x,y) => x+y).collect
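Running the line above on kv should yield Array((1,2), (3,10), (5,6)) (element order may vary). map plus reduceByKey is exactly the map-reduce pattern the section heading refers to; here is a minimal word-count sketch, with illustrative names:

scala> val words = sc.parallelize(List("apple","banana","apple"))
scala> val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
scala> counts.collect   // expected: Array((apple,2), (banana,1)), order may vary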

5. Key-value "transformation" operations across multiple RDDs

val rdd1 = sc.parallelize(List((3,4),(3,6),(5,6),(1,2)))
val rdd2 = sc.parallelize(List((3,8),(6,8)))

// Key-value RDD join: joins the pairs that share the same key (an inner join; keys present in only one RDD are dropped)
scala> rdd1.join(rdd2)
res4: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[8] at join at <console>:28

scala> rdd1.join(rdd2).collect
res5: Array[(Int, (Int, Int))] = Array((3,(4,8)), (3,(6,8)))

scala> rdd1.join(rdd2).collect.foreach(println)
(3,(4,8))
(3,(6,8))

// Key-value RDD leftOuterJoin, with the following rules:
1). leftOuterJoin matches the left RDD (rdd1) against the right RDD (rdd2) and keeps every element of the left RDD (rdd1)
2). If a key in rdd1 also appears in rdd2, the matching right-side value is wrapped in Some: (3,(4,Some(8))), (3,(6,Some(8)))
3). If a key in rdd1 has no match in rdd2, the right side is None: (5,(6,None)), (1,(2,None))

scala> rdd1.leftOuterJoin(rdd2).collect
res7: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(8))), (3,(6,Some(8))), (5,(6,None)))

scala> rdd1.leftOuterJoin(rdd2).collect.foreach(println)
(1,(2,None))
(3,(4,Some(8)))
(3,(6,Some(8)))
(5,(6,None))

// Key-value RDD rightOuterJoin, with the following rules:
1). rightOuterJoin matches the right RDD (rdd2) against the left RDD (rdd1) and keeps every element of the right RDD (rdd2)
2). If a key in rdd2 also appears in rdd1, the matching left-side value is wrapped in Some: (3,(Some(4),8)), (3,(Some(6),8))
3). If a key in rdd2 has no match in rdd1, the left side is None: (6,(None,8))
scala> rdd1.rightOuterJoin(rdd2).collect
res9: Array[(Int, (Option[Int], Int))] = Array((6,(None,8)), (3,(Some(4),8)), (3,(Some(6),8)))
