Spark計算模型之熟練使用RDD的運算元完成計算 ----tranfoemation
阿新 • • 發佈:2019-01-11
Spark計算模型之熟練使用RDD的運算元完成計算 ----tranfoemation
1.彈性分散式資料集RDD
1.1什麼是RDD RDD(Rsedilient Distributed Dataaset)叫做分散式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可分割槽、裡面的元素可平行計算的集合。RDD具有資料模型的特點:自動容錯、位置感知性排程和可伸縮性。RDD允許使用者在執行多個查詢時顯示德將工作集快取在記憶體中,後續的查詢能夠重用工作集,這極大的提升了查詢速度。
1.2RDD的屬性
1)、一組分片(Partition),即資料集的基本組成單位,對於RDD來說,每個分片都會一個計算任務處理,並決定平行計算的顆粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配的CPU Core的數目。
2)、一個計算每個分割槽的函式。Spark中的RDD的計算是以分片為單位,每個RDD都會實現compute函式以達到這個目的。compute函式會對迭代器進行復合,不需要儲存每次計算的結果。
3)、RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似流水線一樣的前後依賴關係,再部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算。
4)、一個Partitioner,即RDD的分片函式,當前Spark中實現了兩種的分片函式,一個是基於雜湊的HashPartitioner,另外一個是基於範圍的RangPartitioner,只有對於Key-value的RDD,才會有Partitioner,非Key-value的RDD的Partitioner的值是None.Partitioner函式不但決定了RDD本身的分片數量,也決定了Partent RDD shuffle輸出時的分片數量。
5)、一個列表,儲存存取每個Partitioner的優先位置(Preferred location)。對於一個HDFS檔案來說,這個列表儲存的就是每個Partition所在的位置,按照“移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地將計算任務分配到其所要資料塊的儲存位置。
3.1建立RDD
1、啟動shell, 定義master 、executor-memory 執行記憶體
total-executior-cores 全部行的核數
./spark-shell --master spark://root1:7077 --executor-memory 512m --total-executor-cores 1
2)、由一個已經存在的Scala集合建立
val rdd1=sc.parallelize(Array(1,2,3,4,5,6))
3)、由外部儲存系統的資料集建立,包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS,Cassandra、HBase等
scala> val rdd2=sc.textFile("hdfs://root1:9000/words.txt")
----------------------------------------------------------------------------------------------------------------------------------------------------------------
scala> val rdd2=sc.textFile("hdfs://root1:9000/words.txt").flatMap(_.split(" ")).map((_,1)).collect
3.2 RDD程式設計API
3.2.1 Tranformation
RDD中的所有轉換都是延遲載入的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎資料集(例如一個檔案)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。這種設計讓Spark更加有效率地執行。
常用的Transformation:
3.2.2 Action
4.練習
4.1 parallelize 類似於Map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iteratior[T] =>Iteratior[U]
scala> val rdd1=sc.parallelize(Array(1,2,3,4,5,6))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:27
4.2 map(func)
每個元素都乘以10
scala> val rdd2=rdd1.map(_*10)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at map at <console>:29
scala> val rdd2=rdd1.map(_*10).collect
rdd2: Array[Int] = Array(10, 20, 30, 40, 50, 60)
選出 >30 的元素
scala> val rdd3=rdd2.filter(_>30)
rdd3: Array[Int] = Array(40, 50, 60)
4.3 設定分割槽的長度
scala> val rdd4=sc.parallelize(Array(1,2,3,4,5,6),5)
rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:27
讀取分割槽的長度
scala> rdd4.partitions.length
res1: Int = 5
4.4 陣列的每個元素都+2,並正排序
scala> val rdd5=sc.parallelize(Array(1,0,5,9,4,3),2)
rdd5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:27
scala> val rdd6=rdd5.map(_+2).sortBy(x=>x,true)
rdd6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at sortBy at <console>:29
scala> val rdd6=rdd5.map(_+2).sortBy(x=>x,true).collect
rdd6: Array[Int] = Array(2, 3, 5, 6, 7, 11)
4.5 篩選陣列rdd5中大於10的元素
scala> val rdd7=rdd6.filter(_>10)
rdd7: Array[Int] = Array(11)
4.6 進行聯絡,建立一個數組,每個元素都*2,按照正排序進行排序
scala> val rdd8=sc.parallelize(List(15,12,1,3,6)).map(_*2).sortBy(x=>x,true).collect
rdd8: Array[Int] = Array(2, 6, 12, 24, 30)
4.7 建立String的陣列,進行統計相同單詞的個數
scala> val rrd9=sc.parallelize(Array("a b c","c d a"))
rrd9: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[35] at parallelize at <console>:27
scala> rrd9.flatMap(_.split(" ")).map((_,1)).groupBy(_._1).map(t=>(t._1,t._2.size)).collect
res12: Array[(String, Int)] = Array((d,1), (b,1), (a,2), (c,2))
4.8
scala> val rdd10=sc.parallelize(List(1,3,4,5))
rdd10: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[64] at parallelize at <console>:27
scala> val rdd11=sc.parallelize(List(2,3,4,6))
rdd11: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:27
scala> val rdd12=rdd10.union(rdd11).collect
rdd12: Array[Int] = Array(1, 3, 4, 5, 2, 3, 4, 6)
4.9 distinct 去除陣列中重複的數字
scala> rdd12.distinct.sortBy(x=>x).collect
res17: Array[Int] = Array(1, 2, 3, 4, 5, 6)
4.10 交集 intersection
scala> val rdd13=rdd11.intersection(rdd10).collect
rdd13: Array[Int] = Array(4, 3)
4.11 string陣列求交集
scala> val rdd14=sc.parallelize(List(("tom",1),("kack",2),("rose",1)))
rdd14: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[99] at parallelize at <console>:27
scala> val rdd15=sc.parallelize(List(("tom",2),("jeery",2),("lishua",1)))
rdd15: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[100] at parallelize at <console>:27
scala> rdd14.intersection(rdd15)
res18: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[106] at intersection at <console>:32
scala> rdd14.intersection(rdd15).collect
res19: Array[(String, Int)] = Array()
4.12 求兩個string陣列中相同的Key
scala> rdd14.join(rdd15).collect
res20: Array[(String, (Int, Int))] = Array((tom,(1,2)))
4.13 利用leftOutJoin 求兩個string陣列中相同的Key
scala> val rdd16=rdd14.leftOuterJoin(rdd15).collect
rdd16: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(2))), (kack,(2,None)), (rose,(1,None)))