1. 程式人生 > >總結常用的Transformation運算元和Action運算元,及基本用法

總結常用的Transformation運算元和Action運算元,及基本用法

只有是Action時,才會執行立即操作。 Transformation是 lazy的操作,不會立即執行,執行Transformation的運算元時,會返回一個新的RDD,依賴上一個RDD

transformation:
1、sortBy :  對於RDD的是非元組型,rdd1.soreBy(x=>x),元組型按value排序
rdd.soreBy(_._2)(進行shuffle)

2、sortByKey : 對於RDD記憶體的是元組型別的使用(進行shuffle) ,sortBy和
sortByKey都是transformation運算元,但都會觸發job任務,底層new了一個rangePartition物件,底層會呼叫一個foreach方法,進行排序,所以會觸發job

3、reduceByKey(_+_) : 先在分割槽內進行按key合併,在全域性合併(全域性是把相同key的
不同的分割槽,拉倒同一個分割槽(有多個分割槽))會進行shuffle

4、filter: 過濾, 不會進行shuffle

5、flatMap: 方法,可以先切分,再壓平, 不會進行shuffle

6、 rdd1.intersection(rdd2), 求兩個集合的交集

7.0、 rdd1.union(rdd2),返回rdd1和rdd2中的所有元素,返回型別是Array,不Shuffle

7、rdd1.join(rdd2), 返回結果RDD[(String, (Int, Int))],join是內連線,
只有相同的才會join,會有shuffle過程

8、 rdd1.leftOuterJoin(rdd2),返回結果是RDD[(String, (Int, Option[Int]))],如果有rdd2中有和rdd1對應的資料時Some(value值),沒有值None,第一個Int是rdd1中的value,rdd1中的資料會顯示完,有shuffle

9、rdd1.rightOuterJoin(rdd2),返回結果是RDD[(String, (Option[Int], Int))],
String是key型別, Option[Int]表示和rdd2中對應的沒有,就和None,
 有和rdd2中的資料對應的就是Some(value),rdd2中的資料會顯示完,有shuffle
 
10、rdd.groupByKey(), 按key進行聚合,把不同分割槽的相同的key拉倒同一臺機器上,返回值是 RDD[(String, Iterable[Int])],String 是key的型別,迭代器是相同key的value的,如果想要聚合  ,可以呼叫 map(x=>(x._1,x._2.sum))方法,相當於reduceBykey.會進行shuffle,
10.1 rdd.groupBy() 通過指定的值進行聚合,返回的值是
 RDD[(String, Iterable[(String, Int)])] ,迭代器中的型別是整個KV的型別
 
11、rdd.reduceByKey(_+_),返回值RDD[(String, Int)],在分割槽內按key進行聚合
(Combiner),再把不同分割槽的相同的key的資料拉倒同一機器上進行聚合,會進行shuffle,
reduceByKey會進行Combiner,所以比groupByKey效率高,一般用reduceByKey

12、rdd1.cogroup(rdd2) 會有shuffle,把相同的key聚合在同一機器會出現shuffle,返
回是 RDD[(String, (Iterable[Int], Iterable[Int]))],有點類似以全外連線,所
有的rdd1和rdd2中的所有資料都能顯示,String是key,第一個迭代器是對應rdd1中key的所有
value,第二個迭代器是對應rdd2中對應key的所有value,會出現如下這種結果
(jack,(CompactBuffer(3),CompactBuffer()))
(jerry,(CompactBuffer(),CompactBuffer(2,6)))

13、fm.mapPartitions(it=>it.map(_*2)),傳的引數是一個函式,函式的引數是迭代器,
返回是迭代器, 引數是每個分割槽中的資料是一個迭代器, 整個方法的返回值是 RDD[String],
跟map類似,但map是把資料一條一條的處理,mapPartitions 是把資料一個分割槽一個
分割槽的處理,把資料放到記憶體。在資料量小時,推薦使用mapPartitions,在資料量大時
可能會出現記憶體溢位oom

14、rdd.distinct()會發生shuffle,去重,返回型別RDD[Int]與rdd有關。
distinct底層呼叫的是ReduceByKey


15、rdd.coalesce(2,flag:Boolan), 設定分割槽數,有兩個引數,第一個是表示設定的分
區個數,第二是預設是false,表示不進行shuffle,返回值是RDD[(String,Int)],呼叫的
rdd的型別一樣。在不進行shuffle時,設定分割槽只能設定的小,不能設定大。把分割槽設定小
時會把 其中的某一個分割槽一下全分給另一個分割槽,比如一共有3分割槽,現在設定成2個,
會把其中的一個第3個分割槽中的資料全部給另2,而不是把第3個分割槽中的資料分散到分割槽1、2
上,不會發生shuffle。把分割槽設大時,一定會發生shuffle,會把其中分割槽中的部分資料拿
出來給新的分割槽,一定會出現shuffle。所以分割槽數預設是隻能設小,不能變大,如果設的大
了,還是按原來的分割槽 。想要變大,可以把引數設定成true,進行shuffle,預設是不
進行shuffle

16、rdd.rePartitions(分割槽數,Boolean) 方法可以調整分割槽數,預設會發生shuffle,
底層相當於呼叫了rdd.coalesce(true)方法,會發生Shuffle,
一般情況下,都需要把分割槽個數調大,會發生shuffle,把分割槽中的部分資料分給其他的新的
分割槽,所以會發生shuffle



17、val mpwi:[String] = rdd2.mapPartitionsWithIndex((index: Int, it: Iterator[(String, Int)]) => {it.map(e => s"Part : $index, ele : $e")
}) 
 獲取資料和分割槽號,index是分割槽號,it:Iterator 是一個分割槽的資料,該方法引數是一個
 函式,該方法的返回值是與iterator中的map方法一樣
還可以使用:,把partFunction當做引數傳到mapPartitions方法中





Action:
1、collect 無shuffle:返回的是Array型別,列印的話需要toBuffer
2、sum 無shuffle:返回值是double型別
3、reduce(_+_)  :可以用於替換sum, 返回值與呼叫的值有關
4、rdd.countByKey()有shuffle,計算相同的key出現的次數,與value無關,返回值是collection.Map[String, Long], String是指的是Key,Long指的是相同key出現的次數。底層實現是reduceByKey,
def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
},把value都換成1,在求相同key 的和
5、count ,計數
6、take,可以用於求topN, 獲取指定的幾條資料
7、foreach
8、saveAsTextFile
9、foreachPartition