[Spark04]RDD中的運算元
1、RDD可以分為兩類,transformations和actions。
2、Transformations 變換/轉換運算元:將一個RDD轉換成另一個RDD,所有的Transformation都是lazy的,只有發生action是才會觸發計算。
3、Action 行動運算元:這類運算元會觸發 SparkContext 提交 作業。
####思考:spark官網說這樣設定運算元會使spark執行地更加的高效,請問這是為什麼呢?
答:1)假設執行一個rdda.map().reduce()的操作,如果作為轉換運算元map()也觸發計算,則肯定得將結果寫出來,降低效率。
2)由於lineage的關係,之後詳細講解。
4、當你對一個RDD進行轉換時,只要觸發action操作就可能會引起RDD的重算,RDD的重算機制使得當某個RDD資料丟失重算可以恢復該RDD。
5、你可以通過persist()或cache()方法將RDD儲存在記憶體中,這樣的話,在下次查詢時可以更快地訪問到RDD中的元素。spark也支援將RDD儲存在磁碟上,也支援RDD的跨節點複製。
---------------------------------------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------------------------------------------------------------------
以下對Transformations和Action 進行詳細的解釋:
一、Transformations中的運算元
a.map運算元
scala> sc.parallelize(1 to 4).map(x => x*x).collect
res0: Array[Int] = Array(1, 4, 9, 16)
b.flatMap運算元
scala> f.flatMap(_.map(_*2))
res20: List[Int] = List(2, 4, 6, 8, 4, 6, 8, 10)
scala> f.map(_.map(_*2))
res23: List[List[Int]] = List(List(2, 4, 6, 8), List(4, 6, 8, 10))
scala> sc.parallelize(List("aa,bb,cc","cxf,spring,struts2","java,C++,javaScript")).flatMap(x => x.split(",")).foreach(println)
aa
bb
cc
cxf
spring
struts2
java
C++
javaScript
c.filter()運算元
scala> sc.parallelize(1 to 9).filter(_%2==0).collect
res14: Array[Int] = Array(2, 4, 6, 8)
d.mapPartitions()運算元
與Map類似,但map中的func作用的是RDD中的每個元素,而mapPartitions中的func作用的物件是RDD的一整個分割槽。
scala> sc.parallelize(1 to 9, 3).mapPartitions( a=>a.filter(_>=7)).collect
res15: Array[Int] = Array(7, 8, 9)
e.sample(withReplacement, fraction, seed)運算元
對RDD進行抽樣,其中引數withReplacement為true時表示抽樣之後還放回,可以被多次抽樣,false表示不放回;fraction表示抽樣比例;seed為隨機數種子,比如當前時間戳,這個值如果沒有傳入,預設值是一個0~Long.maxvalue之間的整數。
f、mapValues()運算元
##只對鍵值對中的Values進行操作
scala> sc.parallelize(Array("a","b","c","d")).map(x=>(x,1)).mapValues(_+1).collect
res0: Array[(String, Int)] = Array((a,2), (b,2), (c,2), (d,2))
h.subtract()運算元
##返回兩個RDD的差集
scala> val a=sc.parallelize(1 to 9)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> val b=sc.parallelize(3 to 6)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> a.subtract(b).collect
res1: Array[Int] = Array(2, 8, 1, 7, 9)
i.intersection()運算元
##返回兩個RDD的交集
scala> a.intersection(b).collect
res2: Array[Int] = Array(4, 6, 3, 5)
二、Actions中的運算元
a.collect()運算元:
collect 相當於 toArray, toArray 已經過時不推薦使用, collect 將分散式的 RDD 返回為一個單機的 scala Array 陣列。在這個陣列上運用 scala 的函式式操作。
b.count()運算元:
count 返回整個 RDD 的元素個數
c.reduce(func)運算元:
func函式將兩個引數歸併為一個引數,並將最後結果返回給驅動程式
d.first()運算元:
返回RDD的第一個元素
e.take(n)運算元:
返回RDD中前N個元素
f.sum()運算元:
返回RDD中所有元素的最大值
g.top(n)運算元:
返回一個包含RDD中最大的前n的數新的RDD。對於數值,按大小排,對於字串,按字典順序排。
h.max()運算元:
返回一個RDD中的最大值
i.min()運算元:
返回一個RDD中的最小值
j.reduceByKey()運算元:
將key相同的value加起來
k.takeSample(withReplacement, num, [seed])運算元:
takeSample()函式和上面的sample函式是一個原理,但是不使用相對比例取樣,而是按設定的取樣個數進行取樣,同時返回結果不再是RDD,而是相當於對取樣後的資料進行
Collect(),返回結果的集合為單機的陣列。
l.takeOrdered()運算元:
返回RDD前N元素,採用自然順序或自定義比較器
---------------------------------------------------------------------------------------------------------------------------------------
Transformations和actions運算元太他麼多了,不一一列舉了啊,參看以下別人的部落格。
Spark中的運算元詳解請轉到以下連結:
http://blog.csdn.net/dream0352/article/details/62229977
http://blog.csdn.net/zcf1002797280/article/details/50752537
http://lxw1234.com/archives/2015/07/363.htm