總結常用的Transformation運算元和Action運算元,及基本用法
阿新 • • 發佈:2018-11-11
只有是Action時,才會執行立即操作。 Transformation是 lazy的操作,不會立即執行,執行Transformation的運算元時,會返回一個新的RDD,依賴上一個RDD
transformation: 1、sortBy : 對於RDD的是非元組型,rdd1.soreBy(x=>x),元組型按value排序 rdd.soreBy(_._2)(進行shuffle) 2、sortByKey : 對於RDD記憶體的是元組型別的使用(進行shuffle) ,sortBy和 sortByKey都是transformation運算元,但都會觸發job任務,底層new了一個rangePartition物件,底層會呼叫一個foreach方法,進行排序,所以會觸發job 3、reduceByKey(_+_) : 先在分割槽內進行按key合併,在全域性合併(全域性是把相同key的 不同的分割槽,拉倒同一個分割槽(有多個分割槽))會進行shuffle 4、filter: 過濾, 不會進行shuffle 5、flatMap: 方法,可以先切分,再壓平, 不會進行shuffle 6、 rdd1.intersection(rdd2), 求兩個集合的交集 7.0、 rdd1.union(rdd2),返回rdd1和rdd2中的所有元素,返回型別是Array,不Shuffle 7、rdd1.join(rdd2), 返回結果RDD[(String, (Int, Int))],join是內連線, 只有相同的才會join,會有shuffle過程 8、 rdd1.leftOuterJoin(rdd2),返回結果是RDD[(String, (Int, Option[Int]))],如果有rdd2中有和rdd1對應的資料時Some(value值),沒有值None,第一個Int是rdd1中的value,rdd1中的資料會顯示完,有shuffle 9、rdd1.rightOuterJoin(rdd2),返回結果是RDD[(String, (Option[Int], Int))], String是key型別, Option[Int]表示和rdd2中對應的沒有,就和None, 有和rdd2中的資料對應的就是Some(value),rdd2中的資料會顯示完,有shuffle 10、rdd.groupByKey(), 按key進行聚合,把不同分割槽的相同的key拉倒同一臺機器上,返回值是 RDD[(String, Iterable[Int])],String 是key的型別,迭代器是相同key的value的,如果想要聚合 ,可以呼叫 map(x=>(x._1,x._2.sum))方法,相當於reduceBykey.會進行shuffle, 10.1 rdd.groupBy() 通過指定的值進行聚合,返回的值是 RDD[(String, Iterable[(String, Int)])] ,迭代器中的型別是整個KV的型別 11、rdd.reduceByKey(_+_),返回值RDD[(String, Int)],在分割槽內按key進行聚合 (Combiner),再把不同分割槽的相同的key的資料拉倒同一機器上進行聚合,會進行shuffle, reduceByKey會進行Combiner,所以比groupByKey效率高,一般用reduceByKey 12、rdd1.cogroup(rdd2) 會有shuffle,把相同的key聚合在同一機器會出現shuffle,返 回是 RDD[(String, (Iterable[Int], Iterable[Int]))],有點類似以全外連線,所 有的rdd1和rdd2中的所有資料都能顯示,String是key,第一個迭代器是對應rdd1中key的所有 value,第二個迭代器是對應rdd2中對應key的所有value,會出現如下這種結果 (jack,(CompactBuffer(3),CompactBuffer())) (jerry,(CompactBuffer(),CompactBuffer(2,6))) 13、fm.mapPartitions(it=>it.map(_*2)),傳的引數是一個函式,函式的引數是迭代器, 返回是迭代器, 引數是每個分割槽中的資料是一個迭代器, 整個方法的返回值是 RDD[String], 跟map類似,但map是把資料一條一條的處理,mapPartitions 是把資料一個分割槽一個 分割槽的處理,把資料放到記憶體。在資料量小時,推薦使用mapPartitions,在資料量大時 可能會出現記憶體溢位oom 14、rdd.distinct()會發生shuffle,去重,返回型別RDD[Int]與rdd有關。 distinct底層呼叫的是ReduceByKey 15、rdd.coalesce(2,flag:Boolan), 設定分割槽數,有兩個引數,第一個是表示設定的分 區個數,第二是預設是false,表示不進行shuffle,返回值是RDD[(String,Int)],呼叫的 rdd的型別一樣。在不進行shuffle時,設定分割槽只能設定的小,不能設定大。把分割槽設定小 時會把 其中的某一個分割槽一下全分給另一個分割槽,比如一共有3分割槽,現在設定成2個, 會把其中的一個第3個分割槽中的資料全部給另2,而不是把第3個分割槽中的資料分散到分割槽1、2 上,不會發生shuffle。把分割槽設大時,一定會發生shuffle,會把其中分割槽中的部分資料拿 出來給新的分割槽,一定會出現shuffle。所以分割槽數預設是隻能設小,不能變大,如果設的大 了,還是按原來的分割槽 。想要變大,可以把引數設定成true,進行shuffle,預設是不 進行shuffle 16、rdd.rePartitions(分割槽數,Boolean) 方法可以調整分割槽數,預設會發生shuffle, 底層相當於呼叫了rdd.coalesce(true)方法,會發生Shuffle, 一般情況下,都需要把分割槽個數調大,會發生shuffle,把分割槽中的部分資料分給其他的新的 分割槽,所以會發生shuffle 17、val mpwi:[String] = rdd2.mapPartitionsWithIndex((index: Int, it: Iterator[(String, Int)]) => {it.map(e => s"Part : $index, ele : $e") }) 獲取資料和分割槽號,index是分割槽號,it:Iterator 是一個分割槽的資料,該方法引數是一個 函式,該方法的返回值是與iterator中的map方法一樣 還可以使用:,把partFunction當做引數傳到mapPartitions方法中 Action: 1、collect 無shuffle:返回的是Array型別,列印的話需要toBuffer 2、sum 無shuffle:返回值是double型別 3、reduce(_+_) :可以用於替換sum, 返回值與呼叫的值有關 4、rdd.countByKey()有shuffle,計算相同的key出現的次數,與value無關,返回值是collection.Map[String, Long], String是指的是Key,Long指的是相同key出現的次數。底層實現是reduceByKey, def countByKey(): Map[K, Long] = self.withScope { self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap },把value都換成1,在求相同key 的和 5、count ,計數 6、take,可以用於求topN, 獲取指定的幾條資料 7、foreach 8、saveAsTextFile 9、foreachPartition