Creating Key-Value RDDs and Basic Transformations (1)
阿新 • Published: 2019-01-30
1. Create a basic key-value RDD
scala> val kvPairRDD =
     |   sc.parallelize(Seq(("key1", "value1"), ("key2", "value2"), ("key3", "value3")))
kvPairRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[17] at parallelize at <console>:25

// Use collect to bring the RDD's elements back to the driver
scala> kvPairRDD.collect
res21: Array[(String, String)] = Array((key1,value1), (key2,value2), (key3,value3))
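The session above relies on the `sc` that spark-shell creates automatically. As a minimal sketch, the same steps could look like this in a standalone program. It assumes local mode (`local[2]`), and the names `KvPairExample` and `collectPairs` are hypothetical, chosen only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object KvPairExample {
  // Builds the same pair RDD as the shell session and returns its elements.
  def collectPairs(): Array[(String, String)] = {
    // Assumption: run locally with two threads; spark-shell would supply `sc` itself.
    val conf = new SparkConf().setAppName("kv-pair-example").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val kvPairRDD =
        sc.parallelize(Seq(("key1", "value1"), ("key2", "value2"), ("key3", "value3")))
      // collect() copies every element to the driver, so use it only on small RDDs.
      kvPairRDD.collect()
    } finally {
      sc.stop() // always release the context, even if the job fails
    }
  }

  def main(args: Array[String]): Unit =
    collectPairs().foreach { case (k, v) => println(s"$k -> $v") }
}
```

An RDD whose element type is a two-element tuple is all Spark needs to treat it as a key-value RDD; no special constructor is involved.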
2. Create an RDD from instances of a class
case class User(userId: String, amount: Int)
val personSeqRDD =
sc.parallelize(Seq(User("jeffy", 30), User("kkk", 20), User("jeffy", 30), User("kkk", 30)))
scala> personSeqRDD.collect
res22: Array[User] = Array(User(jeffy,30), User(kkk,20), User(jeffy,30), User(kkk,30))
scala> // Turn the RDD into an RDD of two-element tuples (key-value pairs)
scala> val keyByRDD = personSeqRDD.keyBy(x => x.userId)
keyByRDD: org.apache.spark.rdd.RDD[(String, User)] = MapPartitionsRDD[18] at keyBy at <console>:28
scala> keyByRDD.collect
res23: Array[(String, User)] = Array((jeffy,User(jeffy,30)), (kkk,User(kkk,20)), (jeffy,User(jeffy,30)), (kkk,User(kkk,30)))
val keyRDD2 = personSeqRDD.map(user=> (user.userId, user))
scala> keyRDD2.collect
res24: Array[(String, User)] = Array((jeffy,User(jeffy,30)), (kkk,User(kkk,20)), (jeffy,User(jeffy,30)), (kkk,User(kkk,30)))
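The two results above are identical because `keyBy(f)` pairs each element with the key `f` returns, which is what the explicit `map` does by hand. The sketch below checks that outside the shell. It assumes local mode, and `KeyByExample` and `keyedBothWays` are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

case class User(userId: String, amount: Int)

object KeyByExample {
  // Returns the same data keyed once with keyBy and once with an explicit map.
  def keyedBothWays(): (Seq[(String, User)], Seq[(String, User)]) = {
    // Assumption: local mode; in spark-shell the existing `sc` would be used instead.
    val sc = new SparkContext(
      new SparkConf().setAppName("key-by-example").setMaster("local[2]"))
    try {
      val users = sc.parallelize(
        Seq(User("jeffy", 30), User("kkk", 20), User("jeffy", 30), User("kkk", 30)))
      val viaKeyBy = users.keyBy(_.userId).collect().toSeq     // (userId, User)
      val viaMap   = users.map(u => (u.userId, u)).collect().toSeq // same pairs, written out
      (viaKeyBy, viaMap)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (viaKeyBy, viaMap) = keyedBothWays()
    println(viaKeyBy == viaMap)
  }
}
```

`keyBy` keeps the whole element as the value; use `map` directly when the value should be something else, such as a single field.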
val keyRDD3 = personSeqRDD.map(user=> (user.userId, user.amount))
scala> keyRDD3.collect
res25: Array[(String, Int)] = Array((jeffy,30), (kkk,20), (jeffy,30), (kkk,30))
scala> val groupByRDD = personSeqRDD.groupBy(user => user.userId)
groupByRDD: org.apache.spark.rdd.RDD[(String, Iterable[User])] = ShuffledRDD[22] at groupBy at <console>:28
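The transcript shows only the type of the grouped RDD, `RDD[(String, Iterable[User])]`. The sketch below shows one way to inspect its contents. It assumes local mode, and `GroupByExample` and `amountsByUser` are hypothetical names. The amounts are sorted because Spark does not guarantee the order of elements within a group.

```scala
import org.apache.spark.{SparkConf, SparkContext}

case class User(userId: String, amount: Int)

object GroupByExample {
  // Groups users by userId and returns each user's amounts as a sorted list.
  def amountsByUser(): Map[String, List[Int]] = {
    // Assumption: local mode; in spark-shell the existing `sc` would be used instead.
    val sc = new SparkContext(
      new SparkConf().setAppName("group-by-example").setMaster("local[2]"))
    try {
      val users = sc.parallelize(
        Seq(User("jeffy", 30), User("kkk", 20), User("jeffy", 30), User("kkk", 30)))
      val grouped = users.groupBy(_.userId) // RDD[(String, Iterable[User])]
      grouped
        .map { case (id, us) => (id, us.map(_.amount).toList.sorted) } // sort for a stable result
        .collect()
        .toMap
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    amountsByUser().foreach { case (id, amounts) => println(s"$id -> $amounts") }
}
```

Unlike `keyBy`, `groupBy` triggers a shuffle (hence the `ShuffledRDD` in the shell output), since all elements sharing a key must end up in the same partition.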
val rdd1 = sc.parallelize(Seq("test", "hell"))
scala> val a = rdd1.map(str => (str, 1))
a: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[25] at map at <console>:26
scala> a.collect
res27: Array[(String, Int)] = Array((test,1), (hell,1))