spark中常用運算元含義及區別
阿新 • • 發佈:2018-12-21
Transform:
1.
map:rdd中的每項資料進行map裡的操作後,會形成一個個新的元素的新rdd
flatMap:在map的基礎上進行扁平化,形成一個新的rdd
2.
distinct:轉換操作,去重
filter:對rdd中的元素進行過濾
filterByRange:範圍過濾,作用於鍵值對RDD,對RDD中元素進行過濾,返回鍵在指定範圍內的元素
3.
union:並集操作,不去重
intersection:交集操作,去重
subtract:類似intersection,返回在rdd中出現並且不在otherRdd中出現的元素
subtractByKey:與subtract類似,只不過這裡是針對key的,返回在主RDD中出現並且不在otherRDD中出現的元素
4.
join:相當於sql中的內聯,只返回兩個RDD根據key關聯上的結果
leftOuterJoin:相當於sql中的左外關聯,返回結果以前面的RDD為主,關聯不上的記錄為空
rightOuterJoin:相當於sql中的右外關聯,返回結果以後面的RDD為主,關聯不上的記錄為空
cartesian:做笛卡爾積
5.
cogroup:將輸入的資料集(k,v)和另外的資料集(k,w)進行cogroup,得到的資料集是(k,Seq(v),Seq(w))的資料集
groupBy:接收一個函式,這個函式的返回值作為key,然後通過key對裡面的元素進行分組
groupByKey:會對每一個RDD中的value聚合成一個序列,此操作發生在reduce端,所以勢必所有的資料
將會通過網路傳輸,造成不必要的浪費,同時如果資料量十分大,可能還會造成OutOfMemoryError
6.
reduceByKey:會在結果傳送至reduce之前,會對每個mapper在本地進行merge,有點類似於MapReduce的combiner。
這樣做的好處是在map端進行一次reduce之後,資料量會大幅度減小,從而減小傳輸,保證reduce端能夠更快的計算出結果
reduceByKeyLocally:對RDD中的每個k對應的v值根據對映函式來計算,運算結果對映到Map[k,v],而不是RDD[k,v]
7.
mapPartitions:與map函式類似,只不過對映函式的引數由RDD的每一個元素變成了RDD中每一個分割槽的迭代(如果在對映過程中
需要頻繁建立額外的物件,使用mapPartitions要比map高效,比如將RDD中所有資料通過JDBC寫入資料庫,如果使用map,
可能每個元素都要建立一個connection,這樣開銷很大,如果使用mapPartitions,那麼只需要針對每一個分割槽建立一個connection)
mapPartitionsWithIndex:函式作用同mapPartitions,不過提供了兩個引數,第一個引數為翻去的索引。
8.
zip:用於兩個RDD組合成Key/Value形式的RDD,預設兩個RDD的partition的數量以及元素的數量相同,否則會丟擲異常
zipPartitions:將多個RDD按照partition組合成新的RDD,該函式需要組合的RDD具有相同的分割槽數,但對於每個分割槽內的元素數量沒有要求(三個引數,大致三類實現)
9.
zipWithIndex:將RDD中的元素和這個元素在RDD中的ID(索引號)組合成鍵值對
zipWithUniqueId:將RDD中的元素和一個唯一ID組合成鍵值對(該唯一ID生成演算法:
每個分割槽中第一個元素的唯一ID值為:該分割槽索引號,
每個分割槽中第N個元素的唯一ID值為:(前一個元素的唯一ID值) + (該RDD總的分割槽數))
10.
randomSplit:根據weights權重,將一個RDD切分成多個RDD,權重引數是一個Double陣列
11.
glom:將RDD中每一個分割槽中型別為T的元素轉換成Array[T],這樣每個分割槽就只有一個數組元素
12.
coalesce:用於將RDD重新分割槽,使用HashPartitioner,第一個為重新分割槽的數目,第二個為時候進行shuffle,預設是false
repartition:該函式是coalesce函式的第二個引數為true的實現
13.
combineByKey:用於將RDD[K,V]轉換成RDD[K,C],V和C的型別可相同可不同
第一個引數x:原封不動的取出來,第二個引數是函式:區域性運算,第三個引數是函式:對區域性運算後的結果在做運算
每個分割槽中每個key的value中的第一個值:(hello,1)(hello,1)(good,1)-->(hello(1,1),good(1))-->x就相當於hello的第一個1, good中的1
引數(createCombiner:組合器函式,用於將V型別轉換成C型別,輸入引數為RDD[K,V]中的V,輸出為C
mergeValue:合併值函式,將一個C型別和一個V型別值合併成一個C型別,輸入引數為(C,V),輸出為C
mergeCombiners:合併組合器函式,用於將兩個C型別值合併成一個C型別,輸入引數為(C,C),輸出為C
numPartitions:結果RDD分割槽數,預設保持原有的分割槽數
partitioner:分割槽函式,預設為HashPartitioner
mapSideCombine:是否需要在Map端進行combine操作,類似於MapReduce中的combine,預設為true)
foldByKey:該函式用於RDD[K,V]根據K將V做摺疊、合併處理,其中的引數zeroValue表示先根據對映函式將zeroValue應用於V,
進行初始化V,再將對映函式應用於初始化後的V.
aggregateByKey:aggregate針對於序列操作,aggregateByKey針對於K,V操作,可以先進行區域性操作,在進行全域性操作
從原始碼可看出,aggregateByKey呼叫的就是combinByKey方法
14.
partitionBy:根據partitioner函式生成新的ShuffleRDD,將原RDD重新分割槽
15.
mapValues:同map,只不過mapValues針對的是[K,V]中的V進行map操作
flatMapValues:同flatMap,只不過flatMapValues是針對[K,V]中的V值進行flatMap操作,根據V去扁平化
16.
foreachRDD:sparkStreaming中的轉換運算元,處理每一個時間段內的RDD資料。
Action:
1.
sortBy:排序,有shuffle,預設是true升序,可以按照k或者v進行排序
sortByKey:排序,有shuffle,預設是true升序,按照k進行排序
2.
aggregate:使用者聚合RDD中的元素,先使用seqOp將RDD中每個分割槽的T型別元素聚合成U型別,再使用combOp
將之前每個分割槽聚合後的U型別聚合成U型別,特別注意seqOp和combOp都會使用zeroValue的值,zeroValue的型別為U
fold:是aggregate的簡化,將aggregate中的seqOp和combOp使用同一個函式Op
3.
lookup:用於(K,V)型別的RDD,指定K值,返回RDD中該K對應的所有V值
4.
collectAsMap:List集合轉成Map集合(List(("a", 1), ("b", 2))=>Map(b -> 2, a -> 1))
5.
countByKey:計算key的數量
countByValue:計算value的數量
6.
froeach:foreach也是對每個partition中的iterator時行迭代處理,通過使用者傳入的function(即函式f)對iterator進行內容的處理,
而不同的是,函式f中的引數傳入的不再是一個迭代器,而是每次的foreach得到的一個rdd的kv例項,也就是具體的資料
foreachPartition:是對每個partition中的iterator時行迭代的處理.通過使用者傳入的function(即函式f)對iterator進行內容的處理,
原始碼中函式f傳入的引數是一個迭代器,也就是說在foreachPartiton中函式處理的是分割槽迭代器,而非具體的資料,不會生成一個新的RDD
7.
keys:RDD[K,V],列印key
values:RDD[K,V],列印value
8.
keyBy:以傳入的引數作為key(以單詞的第一個字母:rdd19.keyBy(_ (0)).collect() => ArrayBuffer((d,dog), (s,salmon))
9.
count:返回RDD中的元素數量。
reduce:根據對映函式f,對RDD中的元素進行二元計算,返回計算結果。
collect:用於將一個RDD轉換成陣列。
10.
first:返回RDD中的第一個元素,不排序。
take:用於獲取RDD中從0到num-1下標的元素,不排序。
top:用於從RDD中,按照預設(降序)或者指定的排序規則,返回前num個元素。
takeOrdered:和top類似,只不過以和top相反的順序返回元素。
11.
saveAsTextFile:用於將RDD以文字檔案的格式儲存到檔案系統中,只會儲存在Executor所在機器的本地目錄。
saveAsSequenceFile:用於將RDD以SequenceFile的檔案格式儲存到HDFS上.
saveAsObjectFile:用於將RDD中的元素序列化成物件,儲存到檔案中。對於HDFS,預設採用SequenceFile儲存。
12.
saveAsHadoopFile:將RDD儲存在HDFS上的檔案中,支援老版本Hadoop API,可以指定outputKeyClass、outputValueClass以及壓縮格式。
saveAsHadoopDataset:用於將RDD儲存到除了HDFS的其他儲存中,比如HBase。
(在JobConf中,通常需要關注或者設定五個引數:檔案的儲存路徑、key值的class型別、value值的class型別、RDD的輸出格式(OutputFormat)、以及壓縮相關的引數。)
13.
saveAsNewAPIHadoopFile:用於將RDD資料儲存到HDFS上,使用新版本Hadoop API。
saveAsNewAPIHadoopDataset:採用新版本Hadoop API。