1. 程式人生 > 其它 >PySpark(3)RDD Transformations with examples

PySpark(3)RDD Transformations with examples

技術標籤:spark

1.RDD Transformations介紹:

RDD Transformations操作是在RDD上執行時的Spark操作,。它會導致一個或多個新RDD。由於RDD本質上是不可變的,因此轉換總是在不更新現有RDD的情況下
建立新的RDD,因此,這會建立RDD族譜。RDD譜系也稱為RDD運算子圖或RDD依賴圖。

RDD Transformations操作的兩個特點是,在呼叫Spark RDD上的action操作之前,不會執行任何Transformations操作;由於RDD是不可變的,因此對其進行
任何Transformations操作都會導致產生新的RDD,而當前的RDD保持不變;

2.RDD Transformation有兩種型別

(1)Narrow Transformation,稱為窄轉換,窄轉換是基於窄依賴(narrow dependencies)進行的RDD轉換。所謂窄依賴是指:父RDD的每個分割槽最多被兒子RDD的
一個分割槽使用。產生窄轉換的函式有:map,filter,distinct,union,基於分割槽的jion等。窄轉換的優點是高效,因為窄轉換通常可以在同一個節點上完成,省
去了叢集中節點之間的資料傳輸,並且由於父RDD的每個分割槽只會至多有一個子RDD的分割槽,在計運算元RDD的分割槽時,計算過程不會有任何浪費。
Functions such as map(), mapPartition(), flatMap(), filter(), union() are some examples of narrow transformation

(2)Wider Transformation,寬轉換是基於寬依賴(wide dependencies)進行的RDD轉換。所謂寬依賴是指:父RDD的每個分割槽都可能被子RDD的多個分割槽使用。也就
是說,計算單個分割槽中的記錄所需的資料可能存在父RDD的多個分割槽中。所以,寬轉換會發生shuffle過程,有時候把寬轉換也稱為:shuffle transformations。
由於父RDD的分割槽資料被多個子RDD分割槽依賴,這樣的話,在計算某個子RDD的分割槽時,需要計算父RDD的分割槽資料,但計算出來的父RDD的分割槽資料不會全部給子RDD使
用,也就造成了計算資源的浪費。導致寬轉換的函式有:groupByKey,reduceByKey等。

Functions such as groupByKey(), aggregateByKey(), aggregate(), join(), repartition() are some examples of a wider transformations.

總之,Wider Transformation由於會需要shuffle過程,要比Narrow Transformation使用的資源更多。

3.例子-對test.txt進行Count Word

spark = SparkSession.builder\
    .appName('SparkByExamples.com')\
    .master("local[3]")\
    .getOrCreate()

rdd = spark.sparkContext.textFile("test.txt")
  • flatMap()
# flatMap() Transformation : 將原rdd打平,返回一個新的rdd
# collect() : Return a list that contains all of the elements in this RDD.
rdd2 = rdd.flatMap(lambda x: x.split(' '))
print(rdd2.collect())
  • map()
# map() Transformation : 使用map()轉換進行任何複雜的操作,例如新增列,更新列等,對映轉換的輸出將始終具有與輸入相同的記錄數。
# 在我們的單詞計數示例中,我們將為每個單詞新增一個值為1的新列,RDD的結果為PairRDDFunctions,其中包含很多鍵值對
rdd3 = rdd2.map(lambda x: (x, 1))
print(rdd3.collect())
  • filter()
# filter() Transformation : 過濾RDD中的記錄
# 過濾所有以“ a”開頭的單詞。返回的rdd4中都是以a開頭的key
rdd4 = rdd3.filter(lambda x: x[0].startswith("a"))
print(rdd4.collect())
  • reduceByKey()
# reduceByKey() Transformation : 將按照key先進行分組,然後每個組中的每個鍵的值會按照給定的特定方法計算
from operator import add
rdd5 = rdd4.reduceByKey(add)
print(rdd5.collect())
  • sortByKey()
# sortByKey() Transformation : 用於對key上的RDD元素進行排序。
# 首先將rdd5中的key-value調換位置,然後排序
rdd6 = rdd5.map(lambda x: (x[1], x[0])).sortByKey()
  • foreach()
# Action foreach(function) : Applies a function to all elements of this RDD
rdd6.foreach(print)
  • repartition()
print("rdd6-partition count:", rdd6.getNumPartitions())
# repartition() : 設定rdd的分割槽數,該方法預設shuffle開啟了
reparRdd = rdd.repartition(4)
print("re-partition count:", reparRdd.getNumPartitions())

# Action - count() : Return the number of elements in this RDD.
print("Count : ", rdd6.count())

列印結果: