Spark RDD Transformation Operations

1. There are two ways to create an RDD:
(1) read data from HDFS to produce an RDD;
(2) transform an existing RDD into a new RDD.

scala> val textFile = sc.textFile("hdfs://192.169.26.58:9000/home/datamining/zhaozhuohui/workspace/test01.txt")
scala> val tf2 = textFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
scala> tf2.collect

2. An RDD supports three main kinds of operations: Transformation, Action, and Persist.
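
As a rough illustration of the three kinds of operations, the sketch below chains a few Transformations, marks the result for persistence, and then runs two Actions. It assumes a local master (`local[2]`) and a small in-memory input instead of the HDFS file used earlier; the variable names are made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Assumption: local mode; in spark-shell getOrCreate returns the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("rdd-op-kinds"))

val lines = sc.parallelize(Seq("a b", "b c", "c a"), 2)

// Transformations: each call returns a new RDD and does no work yet.
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

// Persist: ask Spark to keep the computed partitions in memory for reuse.
counts.persist(StorageLevel.MEMORY_ONLY)

// Actions: these trigger the computation and return values to the driver.
val distinctWords = counts.count()        // 3
val asMap = counts.collect().toMap        // Map(a -> 2, b -> 2, c -> 2)
```

The second Action can reuse the persisted partitions instead of recomputing the whole chain.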

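3. Transformations are lazy: they are not executed when declared, which avoids materializing intermediate results, and the actual computation happens only when an Action is invoked. Because the whole chain of operations then runs together, Spark has room to optimize it.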
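
One way to observe the lazy behaviour is to count how often the function passed to map actually runs. The sketch below uses a `LongAccumulator` for that; the local master and the names `calls` and `doubled` are assumptions made for the example, and the counts hold for a run without task retries.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; in spark-shell getOrCreate returns the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("lazy-demo"))

// Accumulator that counts how many times the map function is invoked.
val calls = sc.longAccumulator("mapCalls")

val doubled = sc.parallelize(1 to 9, 3).map { x =>
  calls.add(1)
  x * 2
}

// Only the lineage has been recorded so far; no element has been processed.
val callsBeforeAction = calls.sum   // 0

// collect is an Action, so it triggers the job.
val result = doubled.collect()
val callsAfterAction = calls.sum    // 9
```

Accumulator updates made inside a Transformation can be applied more than once if a task is re-executed, so this is a demonstration technique rather than a reliable counter.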

4. map: applies the given function to every element of the RDD and produces a new RDD from the results.

scala> val rdd1 = sc.parallelize(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27
scala> val rdd2 = rdd1.map(_ * 2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at map at <console>:29
scala> rdd2.collect
res12: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18) 

5. flatMap: similar to map, but the function returns a collection for each element, and flatMap flattens all of those collections into a single RDD.

scala> val rdd1 = sc.parallelize(1 to 4, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:27
scala> val rdd2 = rdd1.flatMap(x => 1 to x)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at flatMap at <console>:29
scala> rdd2.collect
res14: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)   

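6. mapPartitions: a variant of map. The input function is applied once to each partition, treating the partition as a whole. It takes an iterator over the partition's elements and returns an iterator; the results from all partitions are combined into a new RDD automatically.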

scala> val rdd1 = sc.parallelize(1 to 9, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:27
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
     |   var res = List[(T, T)]()
     |   var pre = iter.next
     |   while (iter.hasNext)
     |   {
     |     val cur = iter.next;
     |     res .::= (pre, cur)
     |     pre = cur;
     |   }
     |   res.iterator
     | }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]

scala> rdd1.mapPartitions(myfunc).collect
res17: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 
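
Two details of myfunc are worth noting: it prepends to the list, so the pairs of each partition come out in reverse order, and `iter.next` throws on an empty partition. A possible alternative, sketched below with a hypothetical helper `adjacentPairs`, uses `Iterator.sliding` and keeps only full windows of two elements. The local master is an assumption.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; in spark-shell getOrCreate returns the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("mapPartitions-demo"))

// Hypothetical helper: pairs each element with its successor inside one partition.
// Windows shorter than two elements (empty or single-element partitions) are dropped.
def adjacentPairs[T](iter: Iterator[T]): Iterator[(T, T)] =
  iter.sliding(2).collect { case Seq(a, b) => (a, b) }

val pairs = sc.parallelize(1 to 9, 3)
  .mapPartitions(it => adjacentPairs(it))
  .collect()
// Array((1,2), (2,3), (4,5), (5,6), (7,8), (8,9))

// With one element spread over three partitions, two partitions are empty.
val sparse = sc.parallelize(Seq(1), 3)
  .mapPartitions(it => adjacentPairs(it))
  .collect()
// Array()
```

No pair crosses a partition boundary, which is why (3,4) and (6,7) are absent in both versions.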

7. glom: converts the elements of each partition into an Array, so that each partition holds a single array element, and returns the result as a new RDD.

scala> var rdd1 = sc.makeRDD(1 to 10, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at makeRDD at <console>:27
scala> rdd1.partitions.size
res18: Int = 3
scala> rdd1.glom().collect
res19: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

8. filter: keeps the elements for which the input function returns true and puts them into a new RDD.

def funOps2(): Unit = {
  val a = sc.parallelize(1 to 10, 3)
  val b = a.filter(_ % 2 == 0)
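  // Print the result; otherwise the collected array is discarded by the Unit return type.
  println(b.collect.mkString(", "))  // prints 2, 4, 6, 8, 10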
}

9. distinct: removes duplicate elements from the RDD. The optional argument sets the number of partitions of the result.

scala> val a = sc.parallelize(List("tom", "jim", "sherry", "dog", "tom"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> a.distinct.collect
res1: Array[String] = Array(jim, tom, dog, sherry)                              
scala> val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 1, 3, 2))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27
scala> b.distinct(2).partitions.length
res2: Int = 2

10. cartesian: computes the Cartesian product of two RDDs.

scala> val x = sc.parallelize(List(1, 2, 3, 4, 5))
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:27
scala> val y = sc.parallelize(List(6, 7, 8, 9, 10))
y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:27
scala> x.cartesian(y).collect
res3: Array[(Int, Int)] = Array((1,6), (1,7), (2,6), (2,7), (1,8), (1,9), (1,10), (2,8), (2,9), (2,10), (3,6), (3,7), (4,6), (4,7), (5,6), (5,7), (3,8), (3,9), (3,10), (4,8), (4,9), (4,10), (5,8), (5,9), (5,10))

11. union (also written ++): returns the union of two RDDs. Duplicates are kept.

scala> val a = sc.parallelize(3 to 6, 1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:27
scala> val b = sc.parallelize(5 to 7, 1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:27
scala> (a ++ b).collect
res4: Array[Int] = Array(3, 4, 5, 6, 5, 6, 7)
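
Since union does not remove duplicates, a set-style union needs an extra distinct step. A minimal sketch, assuming a local master and the same inputs as above:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; in spark-shell getOrCreate returns the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("union-demo"))

val a = sc.parallelize(3 to 6, 1)
val b = sc.parallelize(5 to 7, 1)

// union concatenates the two RDDs, so 5 and 6 appear twice.
val bagUnion = a.union(b).collect()
// Array(3, 4, 5, 6, 5, 6, 7)

// distinct removes the duplicates; the output order is not guaranteed, so sort it.
val setUnion = a.union(b).distinct().collect().sorted
// Array(3, 4, 5, 6, 7)
```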

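12. mapValues: operates on an RDD of two-element tuples (key-value pairs). The function passed to mapValues is applied to each value while the key is left unchanged, producing a new RDD of two-element tuples.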

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panda", "eagle"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at <console>:27
scala> val b = a.map(x => (x.length, x))
b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[15] at map at <console>:31
scala> b.mapValues("x" + _ + "x").collect
res6: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (5,xpandax), (5,xeaglex))

13. subtract: returns a new RDD containing the elements of the first RDD that are not in the second (the set difference).

scala> val a = sc.parallelize(1 to 9, 1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:27
scala> val b = sc.parallelize(1 to 5, 1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at parallelize at <console>:27
scala> val c = a.subtract(b)
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[22] at subtract at <console>:31
scala> c.collect
res7: Array[Int] = Array(6, 7, 8, 9)

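14. sample: draws a random sample from the RDD and returns it as a new RDD. The arguments are withReplacement, fraction, and seed; fraction is the expected proportion of elements to sample, so the size of the result is not exact.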

scala> val a = sc.parallelize(1 to 10, 1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:27
scala> a.sample(true, 0.5, 0).count
res9: Long = 4
scala> a.sample(true, 0.2, 12).count
res10: Long = 2
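
The role of the seed and of withReplacement can be checked with a small experiment. The sketch below assumes a local master and an arbitrary seed of 42; it makes no claim about the exact sample size, only that the same seed on the same RDD reproduces the same sample and that sampling without replacement never repeats an element.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; in spark-shell getOrCreate returns the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("sample-demo"))

val nums = sc.parallelize(1 to 100, 4)

// Same seed and same partitioning: both runs select the same elements.
val s1 = nums.sample(withReplacement = false, fraction = 0.1, seed = 42L).collect()
val s2 = nums.sample(withReplacement = false, fraction = 0.1, seed = 42L).collect()
val reproducible = s1.sameElements(s2)

// Without replacement each source element is picked at most once.
val noRepeats = s1.distinct.length == s1.length
```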

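15. takeSample: randomly takes a specified number of elements. It returns an array rather than an RDD, so it is an Action, not a Transformation.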

scala> val x = sc.parallelize(1 to 10, 1)
x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:27
scala> x.takeSample(true, 5, 1)
res12: Array[Int] = Array(3, 4, 7, 10, 3)

16. groupByKey: for an RDD[(K, V)], collects the V values of each key K into a single Iterable[V].

scala> var rdd1 = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at makeRDD at <console>:27
scala> rdd1.groupByKey().collect
res13: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(1)), (A,CompactBuffer(0, 2)), (c,CompactBuffer(1)))

17. partitionBy: repartitions a key-value RDD according to the partitioner passed in.
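
A minimal sketch of partitionBy with Spark's built-in `HashPartitioner`, assuming a local master and made-up sample data. With Int keys the hash code is the key itself, so two partitions split the keys into even and odd.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Assumption: local mode; in spark-shell getOrCreate returns the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("partitionBy-demo"))

// Six key-value pairs, initially spread over three partitions.
val pairs = sc.parallelize((1 to 6).map(k => (k, s"v$k")), 3)

// Redistribute by key hash into two partitions.
val repartitioned = pairs.partitionBy(new HashPartitioner(2))

// glom exposes the content of each partition; keep only the keys, sorted.
val layout = repartitioned.glom().collect().map(_.map(_._1).sorted)
// layout(0) holds 2, 4, 6 and layout(1) holds 1, 3, 5
```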

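18. cogroup: comparable to a full outer join in SQL. For every key that appears in either RDD it returns the values from the left and right RDDs, and a side with no matching records is empty.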

scala> var rdd1 = sc.makeRDD(Array(("A", "1"), ("B", "2"), ("C", "3")), 2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at <console>:27
scala> var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")), 2)
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[35] at makeRDD at <console>:27
scala> var rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[37] at cogroup at <console>:31
scala> rdd3.collect
res16: Array[(String, (Iterable[String], Iterable[String]))] = Array((B,(CompactBuffer(2),CompactBuffer())), (D,(CompactBuffer(),CompactBuffer(d))), (A,(CompactBuffer(1),CompactBuffer(a))), (C,(CompactBuffer(3),CompactBuffer(c))))

19. combineByKey: combines the values of each key using user-supplied aggregation functions. It can turn an input of type RDD[(K, V)] into RDD[(K, C)].
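
A common use of combineByKey is a per-key average, where the combiner type C is a (sum, count) pair rather than the value type V. The sketch below assumes a local master and made-up scores; the three functions are createCombiner, mergeValue, and mergeCombiners, in that order.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; in spark-shell getOrCreate returns the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("combineByKey-demo"))

val scores = sc.parallelize(
  Seq(("A", 1), ("A", 3), ("B", 4), ("B", 6), ("B", 8)), 2)

// RDD[(String, Int)] becomes RDD[(String, (Int, Int))] holding (sum, count).
val sumCount = scores.combineByKey(
  (v: Int) => (v, 1),                                             // first value of a key in a partition
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),          // further values in the same partition
  (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2))   // merge across partitions

val averages = sumCount
  .mapValues { case (sum, n) => sum.toDouble / n }
  .collect()
  .sortBy(_._1)
// Array((A,2.0), (B,6.0))
```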

20. reduceByKey: for an RDD[(K, V)], merges the V values of each key K pairwise using the given reduce function.

scala> var rdd1 = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2),("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at makeRDD at <console>:27
scala> rdd1.partitions.length
res14: Int = 2
scala> var rdd2 = rdd1.reduceByKey(_ + _)
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at <console>:29
scala> rdd2.collect
res15: Array[(String, Int)] = Array((B,3), (A,2), (c,1))   

21. join: def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]. Each element of both RDDs being joined must be a two-element tuple. Only keys present in both RDDs appear in the result.

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[38] at makeRDD at <console>:27
scala> var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[39] at makeRDD at <console>:27
scala> rdd1.join(rdd2).collect
res17: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))

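22. leftOuterJoin and rightOuterJoin: these keep every key of the left or the right RDD respectively, and the side that may be missing is wrapped in an Option.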
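
A short sketch of both outer joins, reusing the two sample RDDs from the join example and assuming a local master. The results are sorted by key because the order returned by collect is not guaranteed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; in spark-shell getOrCreate returns the existing context.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("outer-join-demo"))

val rdd1 = sc.makeRDD(Array(("A", "1"), ("B", "2"), ("C", "3")), 2)
val rdd2 = sc.makeRDD(Array(("A", "a"), ("C", "c"), ("D", "d")), 2)

// Every key of rdd1 is kept; the right value is None when rdd2 has no match.
val left = rdd1.leftOuterJoin(rdd2).collect().sortBy(_._1)
// Array((A,(1,Some(a))), (B,(2,None)), (C,(3,Some(c))))

// Every key of rdd2 is kept; the left value is None when rdd1 has no match.
val right = rdd1.rightOuterJoin(rdd2).collect().sortBy(_._1)
// Array((A,(Some(1),a)), (C,(Some(3),c)), (D,(None,d)))
```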