
Spark RDD Operators

I. Introduction to Spark RDD Operators

    An RDD (Resilient Distributed Dataset) is a distributed dataset and Spark's most fundamental data abstraction.
    Much like a Scala collection, an RDD behaves as an immutable, partitionable collection whose elements can be computed in parallel.
    
    RDD characteristics:
    follows a dataflow model
    automatic fault tolerance
    locality-aware scheduling
    scalability
    RDDs let users cache a working set in memory across multiple queries; reusing the cached working set greatly speeds up later queries (see the sketch below).
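
A minimal sketch of that working-set reuse, assuming the SparkContext sc available in the shell and the /root/words.txt file used later in this post:

    // cache the cleaned-up working set so both actions below can reuse it
    val words = sc.textFile("/root/words.txt").flatMap(_.split(" ")).cache()

    words.count()                        // first action: reads the file and populates the cache
    words.filter(_.nonEmpty).count()     // second action: served from the in-memory cache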
    
    RDD operations come in two types:
    1) Transformation
    2) Action

II. Creating RDDs

    RDD operations fall into two types:
    1) Transformation (lazy: evaluation is deferred)
    2) Action (triggers a job)

Example:

    scala> sc.textFile("/root/words.txt")
    res8: org.apache.spark.rdd.RDD[String] = /root/words.txt MapPartitionsRDD[28] at textFile at <console>:25

    scala> sc.textFile("/root/words.txt").flatMap(_.split(" "))
    res9: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[31] at flatMap at <console>:25

    scala> res8
    res10: org.apache.spark.rdd.RDD[String] = /root/words.txt MapPartitionsRDD[28] at textFile at <console>:25

    scala> res8.flatMap(_.split(" "))
    res11: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[32] at flatMap at <console>:27

    scala> res8.flatMap(_.split(" ")).map((_,1))
    res12: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[34] at map at <console>:27

    scala> res8.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    res13: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[37] at reduceByKey at <console>:27

    scala> res13.collect
    res14: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))

    scala> val rdd1 = sc.textFile("/root/words.txt")
    rdd1: org.apache.spark.rdd.RDD[String] = /root/words.txt MapPartitionsRDD[39] at textFile at <console>:24

    scala> rdd1.count
    res15: Long = 3

    scala> val list = List(1,3,5,7)
    list: List[Int] = List(1, 3, 5, 7)

    scala> list.map(_ * 100)
    res16: List[Int] = List(100, 300, 500, 700)

    scala> val rdd1 = sc.parallelize(list)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at parallelize at <console>:26

    scala> rdd1.map(_ * 100)
    res17: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at map at <console>:29

    scala> res17.collect
    res18: Array[Int] = Array(100, 300, 500, 700)

    scala> val rdd2 = sc.makeRDD(list)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at makeRDD at <console>:26

    scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),3)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[43] at parallelize at <console>:24

    scala> val rdd2 = rdd1.map(_ * 1000)
    rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[44] at map at <console>:26

    scala> rdd2.collect
    res19: Array[Int] = Array(1000, 2000, 3000, 4000, 5000)

III. Common Transformations

    Transformation characteristics:
    1) produces a new RDD
    2) lazy: evaluation waits until an action is called
    3) does not store the actual data; it only records the lineage of transformations (see the toDebugString sketch below)
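
A small sketch of that lineage bookkeeping, assuming the same sc and word file as above; toDebugString prints the chain of RDDs an action would execute:

    // building transformations returns immediately; no data is read yet
    val counts = sc.textFile("/root/words.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // the recorded lineage (ShuffledRDD <- MapPartitionsRDD <- ... <- textFile)
    println(counts.toDebugString)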

1. map(func)
2. flatMap(func)
3. sortBy (a sketch follows the transcript below)
4. reduceByKey

    scala> sc.textFile("/root/words.txt")
    res8: org.apache.spark.rdd.RDD[String] = /root/words.txt MapPartitionsRDD[28] at textFile at <console>:25

    scala> res8.flatMap(_.split(" "))
    res11: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[32] at flatMap at <console>:27

    scala> res8.flatMap(_.split(" ")).map((_,1))
    res12: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[34] at map at <console>:27

    scala> res8.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    res13: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[37] at reduceByKey at <console>:27
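
sortBy from the list above is not shown in the transcript; a minimal sketch of ordering the word counts by frequency, assuming the res13 RDD from above, could look like this:

    // order the (word, count) pairs by count, largest first
    val sorted = res13.sortBy(_._2, ascending = false)
    sorted.collect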

5. filter: keep only the elements that satisfy a predicate

    scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at <console>:24

    scala> rdd1.filter(_ % 2 == 0)
    res24: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[49] at filter at <console>:27

    scala> res24.collect
    res25: Array[Int] = Array(2, 4)

6. union: union of two RDDs (duplicates are kept)

    scala> val rdd2 = sc.parallelize(List(1,2,3,4,5,6,7))
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[50] at parallelize at <console>:24

    scala> rdd1.union(rdd2)
    res26: org.apache.spark.rdd.RDD[Int] = UnionRDD[51] at union at <console>:29

    scala> res26.collect
    res27: Array[Int] = Array(1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 7)

    scala> rdd1 union rdd2
    res28: org.apache.spark.rdd.RDD[Int] = UnionRDD[52] at union at <console>:29

    scala> res28.collect
    res29: Array[Int] = Array(1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 7)

7. groupByKey: group the values for each key

    scala> val rdd3 = sc.parallelize(List(("Tom",18),("John",16),("Tom",20),("Mary",17),("John",23)))
    rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[53] at parallelize at <console>:24

    scala> rdd3.groupByKey
    res30: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[54] at groupByKey at <console>:27

    scala> res30.collect
    res31: Array[(String, Iterable[Int])] = Array((John,CompactBuffer(16, 23)), (Tom,CompactBuffer(18, 20)), (Mary,CompactBuffer(17)))
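
For simple aggregations, reduceByKey is usually preferred over groupByKey because it combines values inside each partition before the shuffle. A hedged sketch, assuming the rdd3 and res30 values from the transcript above:

    // equivalent totals computed two ways
    res30.mapValues(_.sum).collect      // sum after groupByKey
    rdd3.reduceByKey(_ + _).collect     // preferred: map-side combining, e.g. Array((John,39), (Tom,38), (Mary,17))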

8. intersection: intersection of two RDDs

    scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at parallelize at <console>:24

    scala> val rdd2 = sc.parallelize(List(1,2,3,4,5,6,7))
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24

    scala> rdd1.intersection(rdd2)
    res33: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[62] at intersection at <console>:29

    scala> res33.collect
    res34: Array[Int] = Array(3, 4, 1, 5, 2)

9. join: inner join by key

    scala> val rdd1 = sc.parallelize(List(("Tom",18),("John",16),("Mary",21)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[65] at parallelize at <console>:24

    scala> val rdd2 = sc.parallelize(List(("Tom",28),("John",26),("Cat",17)))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[66] at parallelize at <console>:24

    scala> rdd1.join(rdd2)
    res39: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[69] at join at <console>:29

    scala> res39.collect
    res40: Array[(String, (Int, Int))] = Array((John,(16,26)), (Tom,(18,28)))

10. leftOuterJoin: left outer join

All keys from the left RDD are kept; if a key also exists on the right its value appears as Some(value), otherwise as None.

    scala> val rdd1 = sc.parallelize(List(("Tom",18),("John",16),("Mary",21)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[65] at parallelize at <console>:24

    scala> val rdd2 = sc.parallelize(List(("Tom",28),("John",26),("Cat",17)))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[66] at parallelize at <console>:24

    scala> rdd1.leftOuterJoin(rdd2)
    res41: org.apache.spark.rdd.RDD[(String, (Int, Option[Int]))] = MapPartitionsRDD[72] at leftOuterJoin at <console>:29

    scala> res41.collect
    res42: Array[(String, (Int, Option[Int]))] = Array((John,(16,Some(26))), (Tom,(18,Some(28))), (Mary,(21,None)))

11. rightOuterJoin: right outer join

    scala> val rdd1 = sc.parallelize(List(("Tom",18),("John",16),("Mary",21)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[65] at parallelize at <console>:24

    scala> val rdd2 = sc.parallelize(List(("Tom",28),("John",26),("Cat",17)))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[66] at parallelize at <console>:24

    scala> rdd1.rightOuterJoin(rdd2).collect
    res43: Array[(String, (Option[Int], Int))] = Array((John,(Some(16),26)), (Tom,(Some(18),28)), (Cat,(None,17)))

12. cartesian: Cartesian product

    scala> val rdd1 = sc.parallelize(List("Tom","Mary"))
    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[76] at parallelize at <console>:24

    scala> val rdd2 = sc.parallelize(List("John","Joe"))
    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[77] at parallelize at <console>:24

    scala> rdd1.cartesian(rdd2)
    res45: org.apache.spark.rdd.RDD[(String, String)] = CartesianRDD[78] at cartesian at <console>:29

    scala> res45.collect
    res46: Array[(String, String)] = Array((Tom,John), (Tom,Joe), (Mary,John), (Mary,Joe))

IV. Common Actions

1. collect: bring all elements back to the driver as an array

    scala> val rdd1 = sc.parallelize(List(1,2,3,4))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

    scala> rdd1.collect
    res0: Array[Int] = Array(1, 2, 3, 4)   

2. saveAsTextFile(path): save the RDD as text files
A rough illustration of how input data is carved into splits: three files of 5 B, 5 B and 600 B.
Ideal split size: 5 + 5 + 600 = 610 bytes, 610 / 3 ≈ 203 B per split,
giving splits of roughly 5 B, 5 B, 203 B, 203 B, 203 B and the remaining 1 B.

    scala> val rdd1 = sc.parallelize(List(1,2,3,4))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24                                         

    scala> rdd1.saveAsTextFile("/root/RDD1")
    
    // check the number of partitions
    scala> rdd1.partitions.length
    res3: Int = 4
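
saveAsTextFile writes one part-NNNNN file per partition, so the call above produces four output files under /root/RDD1. A hedged sketch of collapsing the result into a single file (the output path is hypothetical):

    // one partition => one output file
    rdd1.coalesce(1).saveAsTextFile("/root/RDD1_single")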

3. count: count the elements

    scala> val rdd1 = sc.parallelize(List(1,2,3,4))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
                                                                                    
    scala> rdd1.count
    res2: Long = 4

4. reduce: aggregate all elements with the given function

    scala> val rdd2 = sc.parallelize(List(1,2,3,4),2)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

    scala> rdd2.partitions.length
    res4: Int = 2

    scala> rdd2.reduce(_+_)
    res5: Int = 10

5. countByKey(): count the occurrences of each key

    scala> sc.parallelize(List(("Tom",18),("Tom",28),("John",14),("Mary",16)))
    res9: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at <console>:25

    scala> res9.count
    res10: Long = 4

    scala> res9.countByKey()
    res11: scala.collection.Map[String,Long] = Map(Tom -> 2, Mary -> 1, John -> 1)  

    scala> res9.reduceByKey(_+_)
    res12: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:27

    scala> res9.reduceByKey(_+_).collect
    res13: Array[(String, Int)] = Array((Tom,46), (Mary,16), (John,14))

6. take(n): return the first n elements

    scala> val rdd1 = sc.parallelize(List(1,2,3,4))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24                                         

    scala> rdd1.take(2)
    res15: Array[Int] = Array(1, 2)
    
    scala> val rdd3 = sc.parallelize(List(3,2,8,1,7))
    rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

    scala> rdd3.take(2)
    res17: Array[Int] = Array(3, 2)

7. first: return the first element of the RDD

    scala> val rdd3 = sc.parallelize(List(3,2,8,1,7))
    rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

    scala> rdd3.first
    res18: Int = 3

8. takeOrdered(n): return the n smallest elements (ascending order by default)

    scala> val rdd3 = sc.parallelize(List(3,2,8,1,7))
    rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

    scala> rdd3.takeOrdered(2)
    res19: Array[Int] = Array(1, 2)

9. top(n): return the n largest elements (descending order)

    scala> val rdd3 = sc.parallelize(List(3,2,8,1,7))
    rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at <console>:24

    scala> rdd3.top(2)
    res20: Array[Int] = Array(8, 7)

V. Advanced Spark Operators

1. mapPartitionsWithIndex(func)
Applies a function to each partition together with its partition index, which makes it easy to inspect which elements ended up in which partition.
It takes a function as its argument, for example:
val func = (index:Int,iter:Iterator[(Int)]) => {iter.toList.map(x => "partID:" + index + "," + "datas:" + x + "]").iterator}

    scala> val rdd3 = sc.parallelize(List(1,2,3,4,5,6,7),2)
    rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24

    scala> val func = (index:Int,iter:Iterator[(Int)]) => {iter.toList.map(x => "partID:" + index + "," + "datas:" + x + "]").iterator}
    func: (Int, Iterator[Int]) => Iterator[String] = <function2>

    scala> rdd3.mapPartitionsWithIndex(func)
    res21: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at mapPartitionsWithIndex at <console>:29

    scala> rdd3.mapPartitionsWithIndex(func).collect
    res22: Array[String] = Array(partID:0,datas:1], partID:0,datas:2], partID:0,datas:3], partID:1,datas:4], partID:1,datas:5], partID:1,datas:6], partID:1,datas:7])

2. aggregate
Aggregates the elements, first within each partition and then across partitions (local first, then global).
max: returns the maximum element
min: returns the minimum element

    scala> val rdd3 = sc.parallelize(List(1,2,3,4,5,6,7),2)
    rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24

    scala> rdd3.aggregate(0)(_+_,_+_)
    res23: Int = 28

    scala> rdd3.max
    res24: Int = 7

    scala> rdd3.min
    res25: Int = 1
    
    
    scala> val rdd3 = sc.parallelize(List(1,2,3,4,5,6,7),2)
    rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:24

    scala> rdd3.aggregate(0)(math.max(_,_),_+_)
    res29: Int = 10

    scala> rdd3.aggregate(10)(math.max(_,_),_+_)
    res31: Int = 30

    // the zero value 20 is applied once per partition and once more at the combine step: (20+1+2+3) + (20+4+5+6+7) + 20 = 88
    scala> rdd3.aggregate(20)(_+_,_+_)
    res32: Int = 88

    
    scala> val rdd4 = sc.parallelize(List("a","b","c","d","e"),2)
    rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at parallelize at <console>:24

    scala> rdd4.aggregate("|")(_+_,_+_)
    res33: String = ||ab|cde

    scala> rdd4.aggregate("|")(_+_,_+_)
    res34: String = ||cde|ab
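    // the two runs give "||ab|cde" and "||cde|ab" because the order in which the
    // partition results reach the combine function is not deterministic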
    

    scala> val rdd5 = sc.parallelize(List("12","23","234","3456"),2)
    rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:24

    scala> rdd5.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x+y)
    res35: String = 24

    scala> rdd5.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x+y)
    res36: String = 42

    scala> rdd5.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x+y)
    res37: String = 24

    scala> rdd5.aggregate("")((x,y) => math.max(x.length,y.length).toString,(x,y) => x+y)
    res38: String = 42
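    // the accumulator is a String, so each seqOp step compares the lengths of the
    // current accumulator and the next element and keeps the max as a String;
    // partition 0 ends with "2", partition 1 with "4", and the combine step
    // concatenates them in either order ("24" or "42")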
    
    
    scala> val rdd6 = sc.parallelize(List("12","23","345",""),2)
    rdd6: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at parallelize at <console>:24

    scala> rdd6.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
    res41: String = 01

    scala> rdd6.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
    res42: String = 10

    scala> rdd6.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
    res43: String = 01

    scala> rdd6.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
    res48: String = 10
    

    scala> val rdd7 = sc.parallelize(List("12","23","","456"),2)
    rdd7: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24                                                             

    scala> rdd7.aggregate("")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
    res1: String = 11

    scala> ("").length
    res2: Int = 0

    scala> 0.length
    <console>:24: error: value length is not a member of Int
           0.length
             ^

    scala> 0.toString.length
    res5: Int = 1

    scala> rdd7.aggregate("0")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
    res6: String = 011

    scala> rdd7.aggregate("0")((x,y) => math.min(x.length,y.length).toString,(x,y) => x+y)
    res7: String = 011

3. aggregateByKey
Aggregates the values of each key, first within each partition and then globally.

    scala> val rdd8 = sc.parallelize(List(("cat",3),("cat",8),("mouse",6),("dog",8)))
    rdd8: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24

    scala> def func(index:Int,iter:Iterator[(String,Int)]):Iterator[String] = {iter.toList.map(x => "partID:" + index + "," + "values:" + x + "]").iterator}
    func: (index: Int, iter: Iterator[(String, Int)])Iterator[String]

    scala> rdd8.mapPartitionsWithIndex(func).collect
    res34: Array[String] = Array(partID:0,values:(cat,3)], partID:1,values:(cat,8)], partID:2,values:(mouse,6)], partID:3,values:(dog,8)])

    scala> rdd8.aggregateByKey(0)(_+_,_+_).collect
    res35: Array[(String, Int)] = Array((dog,8), (mouse,6), (cat,11))
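
A sketch that shows the local-then-global behaviour more clearly, using a hypothetical pair RDD and math.max as the within-partition function:

    val pets = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)), 2)
    // per partition: keep the max value per key; across partitions: sum the partition maxima
    pets.aggregateByKey(0)(math.max(_,_), _+_).collect
    // expected: cat -> 5 + 12 = 17, mouse -> 4 + 2 = 6, dog -> 12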

4. combineByKey
Both aggregateByKey and reduceByKey are implemented on top of combineByKey.
It is the lowest-level of these methods: values are first accumulated within each partition and then merged globally.

    scala> val rdd1 = sc.textFile("hdfs://192.168.146.111:9000/words.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).collect
    rdd1: Array[(String, Int)] = Array((haha,1), (heihei,1), (hello,3), (Beijing,1), (world,1), (China,1))

    scala> val rdd2 = sc.textFile("hdfs://192.168.146.111:9000/words.txt").flatMap(_.split("\t")).map((_,1)).combineByKey(x => x,(m:Int,n:Int) => (m+n),(a:Int,b:Int) => (a+b)).collect
    rdd2: Array[(String, Int)] = Array((haha,1), (heihei,1), (hello,3), (Beijing,1), (world,1), (China,1))
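
A common use of the three-function form is a per-key average; the sketch below uses hypothetical data and names:

    val scores = sc.parallelize(List(("Tom",88), ("John",95), ("Tom",91), ("John",93), ("Mary",95)), 2)
    val sumCount = scores.combineByKey(
      (v: Int) => (v, 1),                                             // createCombiner: first value seen for a key in a partition
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),          // mergeValue: fold another local value into the accumulator
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))   // mergeCombiners: merge accumulators across partitions
    sumCount.mapValues { case (sum, count) => sum.toDouble / count }.collect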

5. coalesce
coalesce(4, true)
The first argument is the target number of partitions (4); the second is whether to shuffle.

repartition is implemented on top of coalesce with shuffle enabled by default.

    scala> val rdd2 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

    scala> rdd2.partitions.length
    res42: Int = 2

    scala> val rdd3 = rdd2.coalesce(4,true)
    rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at coalesce at <console>:26

    scala> rdd3.partitions.length
    res43: Int = 4

    scala> val rdd4 = rdd3.repartition(5)
    rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[23] at repartition at <console>:28

    scala> rdd4.partitions.length
    res44: Int = 5
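
Without the shuffle flag, coalesce can only reduce the number of partitions; asking for more than the current count leaves the partitioning unchanged. A small sketch against the same rdd2:

    rdd2.coalesce(1).partitions.length         // 1: narrowing needs no shuffle
    rdd2.coalesce(10).partitions.length        // still 2: cannot grow without a shuffle
    rdd2.coalesce(10, true).partitions.length  // 10: shuffling allows increasing the count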

6. filterByRange
Keeps only the elements whose keys fall within the given range (bounds inclusive).

    scala> val rdd6 = sc.parallelize(List(("a",3),("b",2),("d",5),("e",8)))
    rdd6: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:24

    scala> rdd6.filterByRange("b","d").collect
    res45: Array[(String, Int)] = Array((b,2), (d,5))

    scala> rdd6.filterByRange("b","e").collect
    res46: Array[(String, Int)] = Array((b,2), (d,5), (e,8))

7. flatMapValues
Splits each value and flattens the pieces, pairing every piece with the original key.

    scala> val rdd7 = sc.parallelize(List(("a","3 6"),("b","2 5"),("d","5 8")))
    rdd7: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[27] at parallelize at <console>:24

    scala> rdd7.flatMapValues(_.split(" ")).collect
    res47: Array[(String, String)] = Array((a,3), (a,6), (b,2), (b,5), (d,5), (d,8))

8. foldByKey
Example requirement: concatenate the strings that share the same key.

    scala> val rdd8 = sc.parallelize(List("Tom","John","Mary","Joe"),2)
    rdd8: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[31] at parallelize at <console>:24

    scala> val rdd9 = rdd8.map(x => (x.length,x))
    rdd9: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[32] at map at <console>:26

    scala> rdd9.collect
    res48: Array[(Int, String)] = Array((3,Tom), (4,John), (4,Mary), (3,Joe))

    scala> rdd9.foldByKey("")(_+_).collect
    res49: Array[(Int, String)] = Array((4,JohnMary), (3,JoeTom))
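    // with an empty string as the zero value this is equivalent to rdd9.reduceByKey(_ + _)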

9. foreach
Applies a function to every element; the function runs on the executors.

import org.apache.spark.{SparkConf, SparkContext}

object ForeachDemo {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ForeachDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // create an RDD with 3 partitions
    val rdd1 = sc.parallelize(List(1,2,3,4,5),3)
    rdd1.foreach(println(_))

    sc.stop()
  }
}

Output: each element (1 through 5) is printed, but the ordering can vary because the three partitions are processed in parallel.
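
Because foreach runs where the data lives, on a real cluster the println output lands in the executor logs rather than on the driver console. To inspect a small RDD on the driver, a common pattern is to collect first:

    // only safe for small RDDs: everything is pulled to the driver
    rdd1.collect().foreach(println)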

10. keyBy
Turns each element into a (key, element) pair using the supplied key function.
keys / values
Return just the keys or just the values of a pair RDD.

    scala> val rdd2 = sc.parallelize(List("Tom","John","Jack"),3)
    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[37] at parallelize at <console>:24

    scala> val rdd3 = rdd2.keyBy(_.length)
    rdd3: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[38] at keyBy at <console>:26

    scala> rdd3.collect
    res60: Array[(Int, String)] = Array((3,Tom), (4,John), (4,Jack))

    scala> rdd3.keys.collect
    res61: Array[Int] = Array(3, 4, 4)

    scala> rdd3.values.collect
    res62: Array[String] = Array(Tom, John, Jack)

VI. RDD Parallelization Workflow