1. 程式人生 > >Spark中Transformations、Actions

Spark中Transformations、Actions

 

解釋narrow transformation和wide transformation的區別
掌握map flatmap filter coalesce
列舉兩種wide transformation
列舉Spark pipeline中的4種常見action
Transformations
narrow transformation只在worker node 本地執行操作,不需要重排(shuffle),因而不需要將進行網路資料傳輸。如下面的map、faltMap、filter和coalesce。
map()
map()會將一個函式應用到RDD中的每一個element或partition,是一種one to one transformation。

def lower(line):
return line.lower()
lower_text_RDD = text_RDD.map(lower)
下圖中橘黃色代表worker node,黑色代表partition,每個partition中包含若干element。Spark 以partition為工作單位而非element,這也是Spark和MapReduce的區別之一。每個RDD都會實現compute函式以達到這個目的。compute函式會對迭代器進行復合,不需要儲存每次計算的結果。另外,每個worker node將map應用到它們收到的RDD partition上。


flatMap()
flatMap()會先執行map然後flatten(扁平化)輸出。map變換對每一條輸入進行指定的操作,然後為每一條輸入返回一個物件,flatten將所有物件合併為一個物件。
flatMap()會生成一個新的RDD,並且原RDD的RDD partition與新的RDD partition一一對應。
split_words函式以行作為輸入,element是行,單詞作為輸出,element是單詞。由於每行含有的單詞數不盡相同,右邊的黑盒有的厚有的薄。

對每個Partition執行完split_words後,獲得words的一維列表,所有單詞都只在一個列表中。
def split_words(line):
return line.split()

words_RDD = text_RDD.flatMap(split_words)
words_RDD.collect()

filter()和coalesce()
starts_with_a()是過濾器中的一種,該函式返回布林真值。如果結果為true,filter會將這個word保留。filter的結果就是過濾後的word組成的列表。
def starts_with_a(word):
return word.lower().startwith("a")
words_RDD.filter(starts_with_a).collect()


filter()過濾後生成的RDD Partition 往往長短不一,這時需要將部分partition合併以提高效能,即coalesce()。coalesce將較小partition的數量。

wide transformation 需要將shuffle資料,所以需要網路傳輸。如groupByKey()和reduceByKey()。
groupByKey()
groupByKey把element按key進行分組,如下圖,將key相同的鍵值對合並,value變為原value組成的列表。

reduceByKey()
reduceByKey() = groupByKey() + sum(),其中sum()是一種reduce。將groupByKey()結果中的列表相加就是reduceByKey()。

Actions
actions是執行流水線,將最終結果返回到Driver Program,將結果儲存到外存等一系列事件的觸發器,也是Spark pipeline的最後一步。
collect()
collect會將所有worker node上的結果收集到Driver Program上,然後會傳送到我們的python shell上。
take(n)
take(n)會將結果的前n個element拷貝出來。
reduce(func)
使用func將element聚合
saveAsTextFile(filename)
將結果以文字形式儲存到本地或HDFS
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Transformations

Transformation 描述
map(func) 通過應用一個函式的所有元素,返回一個新的分散式資料集
filter(func) 通過選擇函式返回true的那些元素來形成,返回一個新的資料集
flatMap(func) 與map類似,但每個輸入項都可以對映到0個或多個輸出項(因此函式應該返回一個序列而不是單個項)
mapPartitions(func) 與map類似,但執行在RDD的每個分割槽(塊),所以函式必須是迭代器Iterator => Iterator,當執行在一個型別T的RDD
mapPartitionsWithIndex(func) 與mapPartitions類似,但也為函式提供一個表示分割槽索引的整數值,所以函式必須是(Int, Iterator) => Iterator型別當執行在一個型別T的RDD
sample(withReplacement, fraction, seed) 使用給定隨機數字生成器的種子,對資料的一小部分進行取樣,無論有無替換
union(otherDataset) 返回一個新的資料集,其中包含源資料集和引數中的元素的聯合
intersection(otherDataset) 返回一個新的RDD,它包含源資料集和引數中的元素的交集
distinct([numTasks])) 返回一個包含源資料集的不同元素的新資料集
groupByKey([numTasks]) 當呼叫一個(K, V)對的資料集時,返回一個(K, 可迭代的)對的資料集。注意:如果要對每個鍵執行聚合(如彙總或平均值),使用簡化的方法或聚合鍵將會獲得更好的效能。注意:在預設情況下,輸出的並行度取決於父RDD分割槽的數量,你可以通過一個可選的numTasks引數來設定不同數量的任務。
reduceByKey(func, [numTasks]) 當呼叫(K, V)的資料集對,返回一個數據集(K, V)對每個鍵的值在哪裡聚合使用給定減少函式func,必須(V, V) => V形似groupByKey,減少任務的數量通過一個可選的第二個引數是可配置的。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 當呼叫一個(K, V)對的資料集時,返回一個(K, U)對的資料集,其中每個鍵的值使用給定的組合函式和一箇中立的“零”值進行聚合。允許一個與輸入值型別不同的聚合值型別,同時避免不必要的分配。與groupByKey一樣,reduce任務的數量是通過可選的第二個引數進行配置的。
sortByKey([ascending], [numTasks]) 當呼叫一個(K, V)對K實現排序的資料集時,返回一個由按升序或降序排列的(K, V)對的資料集,如布林提升引數中所指定的那樣。
join(otherDataset, [numTasks]) 當呼叫型別的資料集(K,V)和(K,W)時,返回一個數據集(K,(V,W))對每一個鍵的所有對元素。外部連線通過左外連線、右外連線與全外連線。
cogroup(otherDataset, [numTasks]) 當呼叫型別的資料集(K,V)和(K,W)時,返回一個 (K, (Iterable, Iterable)) 元素的資料集。這個操作也稱為group分組。
cartesian(otherDataset) 當呼叫型別T和U的資料集時,返回一個(T,U)對(所有對元素)的資料集。
pipe(command, [envVars]) 通過shell命令對RDD的每個分割槽進行管道,例如Perl或bash指令碼。RDD元素被寫入到程序的stdin中,輸出到其stdout的輸出被作為字串的RDD返回。
coalesce(numPartitions) 將RDD中的分割槽數量減少到num分割槽。在過濾大資料集之後,可以更有效地執行操作。
repartition(numPartitions) 對RDD中的資料進行隨機重組,以建立多個或更少的分割槽,並在它們之間進行平衡。這通常會使網路上的所有資料都被打亂。
repartitionAndSortWithinPartitions(partitioner) 根據給定的分割槽重新分割槽RDD,在每個結果分割槽中,根據它們的鍵對記錄進行排序。這比呼叫重新分割槽更有效,然後在每個分割槽中進行排序,因為它可以將排序推入到洗牌機器中。

Actions

Action 描述
reduce(func) 使用函式func聚合資料集的元素(它需要兩個引數並返回一個引數)。這個函式應該是可交換的和結合的,這樣它就可以在平行計算中得到正確的計算。
collect() 將資料集的所有元素作為驅動程式的陣列返回。這通常是在過濾器或其他操作之後才會有用的,這些操作返回一個足夠小的資料子集。
count() 返回資料集中的元素數量。
first() 返回資料集的第一個元素(類似於take(1))。
take(n) 返回一個帶有資料集的第n個元素的陣列。
takeSample(withReplacement, num, [seed]) 返回一個包含資料集的num元素隨機樣本的陣列,不管有沒有替換,都可以選擇預先指定一個隨機數生成器種子。
takeOrdered(n, [ordering]) 返回RDD的前n個元素,使用它們的自然順序或自定義比較器。
saveAsTextFile(path) 在本地檔案系統、HDFS或任何其他hadoop支援的檔案系統中,將資料集的元素作為文字檔案(或一組文字檔案)寫入一個給定目錄中。Spark將呼叫每個元素的toString,將其轉換為檔案中的一行文字。
saveAsSequenceFile(path) (Java and Scala) 在本地檔案系統、HDFS或任何其他Hadoop支援的檔案系統中,將資料集的元素作為Hadoop序列檔案寫入給定路徑。這可用於實現Hadoop可寫介面的鍵-值對的RDDs。在Scala中,它還可以在可隱式可轉換的型別中使用(Spark包含諸如Int、Double、String等基本型別的轉換)。
saveAsObjectFile(path) (Java and Scala) 使用Java序列化以簡單的格式編寫資料集的元素,然後可以使用sparkcontext.objectfile()裝載資料。
countByKey() 只有在型別的rdd(K,V)上才可用。返回一個hashmap(K,Int)對每個鍵的計數。
foreach(func) 在資料集的每個元素上執行一個函式func。這通常是為了一些副作用,比如更新一個累加器或者與外部儲存系統進行互動。注意:除了foreach()之外的累計變數之外,修改變數可能導致未定義的行為。請參閱理解閉包,瞭解更多細節。