Spark核心類:彈性分散式資料集RDD及其轉換和操作pyspark.RDD
彈性分散式資料集RDD(Resilient Distributed Dataset)
術語定義
l彈性分散式資料集(RDD): Resillient Distributed Dataset,Spark的基本計算單元,可以通過一系列運算元進行操作(主要有Transformation和Action操作);
l有向無環圖(DAG):Directed Acycle graph,反應RDD之間的依賴關係;
l有向無環圖排程器(DAG Scheduler):根據Job構建基於Stage的DAG,並提交Stage給TaskScheduler;
l任務排程器(Task Scheduler):將Taskset提交給worker(叢集)執行並回報結果;
l窄依賴(Narrow dependency):子RDD依賴於父RDD中固定的data partition;
l寬依賴(Wide Dependency):子RDD對父RDD中的所有data partition都有依賴。
RDD概念
RDD是Spark的最基本抽象,是對分散式記憶體的抽象使用,實現了以操作本地集合的方式來操作分散式資料集的抽象實現。RDD是Spark最核心的東西,它表示已被分割槽,不可變的並能夠被並行操作的資料集合,不同的資料集格式對應不同的RDD實現。RDD必須是可序列化的。RDD可以cache到記憶體中,每次對RDD資料集的操作之後的結果,都可以存放到記憶體中,下一個操作可以直接從記憶體中輸入,省去了MapReduce大量的磁碟IO操作。這對於迭代運算比較常見的機器學習演算法, 互動式資料探勘來說,效率提升非常大。
RDD 最適合那種在資料集上的所有元素都執行相同操作的批處理式應用。在這種情況下, RDD 只需記錄血統中每個轉換就能還原丟失的資料分割槽,而無需記錄大量的資料操作日誌。所以 RDD 不適合那些需要非同步、細粒度更新狀態的應用 ,比如 Web 應用的儲存系統,或增量式的 Web 爬蟲等。對於這些應用,使用具有事務更新日誌和資料檢查點的資料庫系統更為高效。
RDD的特點
1.來源:一種是從持久儲存獲取資料,另一種是從其他RDD生成
2.只讀:狀態不可變,不能修改
3.分割槽:支援元素根據 Key 來分割槽 ( Partitioning ) ,儲存到多個結點上,還原時只會重新計算丟失分割槽的資料,而不會影響整個系統
4.路徑:在 RDD 中叫世族或血統 ( lineage ) ,即 RDD 有充足的資訊關於它是如何從其他 RDD 產生而來的
5.持久化:可以控制儲存級別(記憶體、磁碟等)來進行持久化
6.操作:豐富的動作 ( Action ) ,如Count、Reduce、Collect和Save 等
RDD基礎資料型別
目前有兩種型別的基礎RDD:並行集合(Parallelized Collections):接收一個已經存在的Scala集合,然後進行各種平行計算。 Hadoop資料集(Hadoop Datasets):在一個檔案的每條記錄上執行函式。只要檔案系統是HDFS,或者hadoop支援的任意儲存系統即可。這兩種型別的RDD都可以通過相同的方式進行操作,從而獲得子RDD等一系列拓展,形成lineage血統關係圖。
1. 並行化集合
並行化集合是通過呼叫SparkContext的parallelize方法,在一個已經存在的Scala集合上建立的(一個Seq物件)。集合的物件將會被拷貝,創建出一個可以被並行操作的分散式資料集。例如,下面的直譯器輸出,演示瞭如何從一個數組建立一個並行集合。
例如:val rdd = sc.parallelize(Array(1 to 10)) 根據能啟動的executor的數量來進行切分多個slice,每一個slice啟動一個Task來進行處理。
val rdd = sc.parallelize(Array(1 to 10), 5) 指定了partition的數量
2. Hadoop資料集
Spark可以將任何Hadoop所支援的儲存資源轉化成RDD,如本地檔案(需要網路檔案系統,所有的節點都必須能訪問到)、HDFS、Cassandra、HBase、Amazon S3等,Spark支援文字檔案、SequenceFiles和任何Hadoop InputFormat格式。
(1)使用textFile()方法可以將本地檔案或HDFS檔案轉換成RDD
支援整個檔案目錄讀取,檔案可以是文字或者壓縮檔案(如gzip等,自動執行解壓縮並載入資料)。如textFile(”file:///dfs/data”)
支援萬用字元讀取,例如:
val rdd1 = sc.textFile("file:///root/access_log/access_log*.filter");
val rdd2=rdd1.map(_.split("t")).filter(_.length==6)
rdd2.count()
......
14/08/20 14:44:48 INFO HadoopRDD: Input split: file:/root/access_log/access_log.20080611.decode.filter:134217728+20705903
......
textFile()可選第二個引數slice,預設情況下為每一個block分配一個slice。使用者也可以通過slice指定更多的分片,但不能使用少於HDFS block的分片數。
(2)使用wholeTextFiles()讀取目錄裡面的小檔案,返回(使用者名稱、內容)對
(3)使用sequenceFile[K,V]()方法可以將SequenceFile轉換成RDD。SequenceFile檔案是Hadoop用來儲存二進位制形式的key-value對而設計的一種平面檔案(Flat File)。
(4)使用SparkContext.hadoopRDD方法可以將其他任何Hadoop輸入型別轉化成RDD使用方法。一般來說,HadoopRDD中每一個HDFS block都成為一個RDD分割槽。
此外,通過Transformation可以將HadoopRDD等轉換成FilterRDD(依賴一個父RDD產生)和JoinedRDD(依賴所有父RDD)等。
Rdd的建立和操作
建立RDD
並行集合 (Parallelized collections)
RDD可從現有的集合建立。比如在Scala shell中:
val collection = List("a", "b", "c", "d", "e")
val rddFromCollection = sc.parallelize(collection)
並行集合 (Parallelized collections) 的建立是通過在一個已有的集合(Scala Seq)上呼叫 SparkContext 的 parallelize 方法實現的。集合中的元素被複制到一個可並行操作的分散式資料集中。並行集合一個很重要的引數是切片數(slices),表示一個數據集切分的份數。Spark 會在叢集上為每一個切片執行一個任務。你可以在叢集上為每個 CPU 設定 2-4 個切片(slices)。正常情況下,Spark 會試著基於你的叢集狀況自動地設定切片的數目。然而,你也可以通過 parallelize 的第二個引數手動地設定(例如:sc.parallelize(data, 10))。
外部資料集
比如Spark 可以從任何一個 Hadoop 支援的儲存源建立分散式資料集,包括你的本地檔案系統,HDFS,Cassandra,HBase,Amazon S3等。 Spark 支援文字檔案(text files),SequenceFiles 和其他 Hadoop InputFormat。
用一個本地檔案系統裡的檔案建立RDD:
val rddFromTextFile = sc.textFile("LICENSE")
[外部資料集]
序列化及其反序列化
saveAsPickleFile(path, batchSize=10)
Save this RDD as a SequenceFile of serialized objects. The serializerused is pyspark.serializers.PickleSerializer, default batch sizeis 10.
>>> tmpFile = NamedTemporaryFile(delete=True) >>> tmpFile.close() >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3) >>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect()) ['1', '2', 'rdd', 'spark'][pickleFile(name, minPartitions=None)¶]
Spark操作
在Spark程式設計模式下,所有的操作被分為轉換(transformation)和執行(action)兩種。
一般來說,轉換操作是對一個數據集裡的所有記錄執行某種函式,從而使記錄發生改變;而執行通常是執行某些計算或聚合操作,並將結果返回執行 SparkContext 的那個驅動程式。
Spark中的轉換操作是延後的,也就是說,在RDD上呼叫一個轉換操作並不會立即觸發相應的計算。相反,這些轉換操作會連結起來,並只在有執行操作被呼叫時才被高效地計算。這樣,大部分操作可以在叢集上並行執行,只有必要時才計算結果並將其返回給驅動程式(如執行操作count),從而提高了Spark的效率。
例如,map 是一個轉換操作,它將每一個數據集元素傳遞給一個函式並且返回一個新的 RDD。另一方面,reduce 是一個動作,它使用相同的函式來聚合 RDD 的所有元素,並且將最終的結果返回到驅動程式(不過也有一個並行 reduceByKey 能返回一個分散式資料集)。例如,我們可以實現:通過 map 建立一個新資料集在 reduce 中使用,並且僅僅返回 reduce 的結果給 driver,而不是整個大的對映過的資料集。
轉換操作
map(func) | 返回一個新的分散式資料集,由每個原元素經過func函式轉換後組成 |
filter(func) | 返回一個新的資料集,由經過func函式後返回值為true的原元素組成 |
flatMap(func) | 類似於map,但是每一個輸入元素,會被對映為0到多個輸出元素(因此,func函式的返回值是一個Seq,而不是單一元素) |
sample(withReplacement, frac, seed) | 根據給定的隨機種子seed,隨機抽樣出數量為frac的資料 |
union(otherDataset) | 返回一個新的資料集,由原資料集和引數聯合而成 |
groupByKey([numTasks]) | 在一個由(K,V)對組成的資料集上呼叫,返回一個(K,Seq[V])對的資料集。注意:預設情況下,使用8個並行任務進行分組,你可以傳入numTask可選引數,根據資料量設定不同數目的Task |
reduceByKey(func, [numTasks]) | 在一個(K,V)對的資料集上使用,返回一個(K,V)對的資料集,key相同的值,都被使用指定的reduce函式聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選引數來配置的。 |
join(otherDataset, [numTasks]) | 在型別為(K,V)和(K,W)型別的資料集上呼叫,返回一個(K,(V,W))對,每個key中的所有元素都在一起的資料集 |
groupWith(otherDataset, [numTasks]) | 在型別為(K,V)和(K,W)型別的資料集上呼叫,返回一個數據集,組成元素為(K, Seq[V], Seq[W]) Tuples。這個操作在其它框架,稱為CoGroup |
cartesian(otherDataset) | 笛卡爾積。但在資料集T和U上呼叫時,返回一個(T,U)對的資料集,所有元素互動進行笛卡爾積。 |
執行操作
reduce(func) | 通過函式func聚集資料集中的所有元素。Func函式接受2個引數,返回一個值。這個函式必須是關聯性的,確保可以被正確的併發執行 |
collect() | 在Driver的程式中,以陣列的形式,返回資料集的所有元素。這通常會在使用filter或者其它操作後,返回一個足夠小的資料子集再使用,直接將整個RDD集Collect返回,很可能會讓Driver程式OOM。返回所有元素到驅動程式 |
count() | 返回資料集的元素個數 |
take(n) | 返回一個數組,由資料集的前n個元素組成。注意,這個操作目前並非在多個節點上,並行執行,而是Driver程式所在機器,單機計算所有的元素(Gateway的記憶體壓力會增大,需要謹慎使用)。返回前k個元素到驅動程式 |
first() | 返回資料集的第一個元素(類似於take(1);也就是返回第1個元素到驅動程式 |
saveAsTextFile(path) | 將資料集的元素,以textfile的形式,儲存到本地檔案系統,hdfs或者任何其它hadoop支援的檔案系統。Spark將會呼叫每個元素的toString方法,並將它轉換為檔案中的一行文字 |
saveAsSequenceFile(path) | 將資料集的元素,以sequencefile的格式,儲存到指定的目錄下,本地系統,hdfs或者任何其它hadoop支援的檔案系統。RDD的元素必須由key-value對組成,並都實現了Hadoop的Writable介面,或隱式可以轉換為Writable(Spark包括了基本型別的轉換,例如Int,Double,String等等) |
foreach(func) | 在資料集的每一個元素上,執行函式func。這通常用於更新一個累加器變數,或者和外部儲存系統做互動 |
注意事項
通常只在需將結果返回到驅動程式所在節點以供本地處理時,才呼叫 collect 函式。
注意, collect 函式一般僅在的確需要將整個結果集返回驅動程式並進行後續處理時才有必要呼叫。如果在一個非常大的資料集上呼叫該函式,可能耗盡驅動程式的可用記憶體,進而導致程式崩潰。
高負荷的處理應儘可能地在整個叢集上進行,從而避免驅動程式成為系統瓶頸。然而在不少情況下,將結果收集到驅動程式的確是有必要的。很多機器學習演算法的迭代過程便屬於這類情況。
[RDD支援的轉換和執行操作的完整列表以及更為詳細的例子
以及Spark API(Scala)文件]
RDD快取策略
預設情況下,每一個轉換過的 RDD 會在每次執行動作(action)的時候重新計算一次。
然而,你能通過persist()或者cache()方法持久化一個rdd。首先,在action中計算得到rdd;然後,將其儲存在每個節點的記憶體中。在這個情況下,Spark 會在叢集上儲存相關的元素,在你下次查詢的時候會變得更快。在這裡也同樣支援持久化 RDD 到磁碟,或在多個節點間複製。
Spark的快取是一個容錯的技術-如果RDD的任何一個分割槽丟失,它可以通過原有的轉換(transformations)操作自動的重複計算並且創建出這個分割槽。
呼叫一個RDD的 cache 函式將會告訴Spark將這個RDD快取在記憶體中。在RDD首次呼叫一個執行操作時,這個操作對應的計算會立即執行,資料會從資料來源裡讀出並儲存到記憶體。因此,首次呼叫 cache 函式所需要的時間會部分取決於Spark從輸入源讀取資料所需要的時間。但是,當下一次訪問該資料集的時候,資料可以直接從記憶體中讀出從而減少低效的I/O操作,加快計算。多數情況下,這會取得數倍的速度提升。
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
如果我們想要再次使用 lineLengths
,我們可以新增:
lineLengths.persist()
我們可以利用不同的儲存級別儲存每一個被持久化的RDD。例如,它允許我們持久化集合到磁碟上、將集合作為序列化的Java物件持久化到記憶體中、在節點間複製集合或者儲存集合到Tachyon中。我們可以通過傳遞一個StorageLevel物件給persist()方法設定這些儲存級別。cache()方法使用了預設的儲存級別—StorageLevel.MEMORY_ONLY。
刪除資料
Spark自動的監控每個節點快取的使用情況,利用最近最少使用原則刪除老舊的資料。如果你想手動的刪除RDD,可以使用RDD.unpersist()方法轉換函式詳解
map(f, preservesPartitioning=False)
Return a new RDD by applying a function to each element of this RDD.
>>> rdd = sc.parallelize(["b", "a", "c"]) >>> sorted(rdd.map(lambda x: (x, 1)).collect()) [('a', 1), ('b', 1), ('c', 1)]
- map接收的函式最好是
def mapper(value): ... return key, value
map 函式,我們將每一個字串都轉換為一個整數,從而返回一個由若干 Int 構成的RDD物件。
val intsFromStringsRDD = rddFromTextFile.map(line => line.size)
flatMap(f, preservesPartitioning=False)
Return a new RDD by first applying a function to all elements of thisRDD, and then flattening the results.
>>> rdd = sc.parallelize([2, 3, 4]) >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) [1, 1, 1, 2, 2, 3] >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
Note: use flatMap if you want a map function that returns multiple outputs.The function passed to flatMap can return an iterable.
這樣的話map函式可以返回一個可迭代物件,相當於將返回值變成了多個rdd行(本來返回一行rdd,但是卻是多個(k, v)組合而不是一個可供reducebykey使用的單個(k, v))?
def mapper(value):
...
result_list = []
for key, value in some_list:
result_list.append( key, value )
return result_list
mapValues(f)
Pass each value in the key-value pair RDD through a map functionwithout changing the keys; this also retains the original RDD’spartitioning.
>>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) >>> def f(x): return len(x) >>> x.mapValues(f).collect() [('a', 3), ('b', 1)]
combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash>)
Generic function to combine the elements for each key using a customset of aggregation functions.Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combinedtype” C. Note that V and C can be different – for example, one mightgroup an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
Users provide three functions:
- createCombiner, which turns a V into a C (e.g., createsa one-element list)
- mergeValue, to merge a V into a C (e.g., adds it to the end ofa list)
- mergeCombiners, to combine two C’s into a single one.
In addition, users can control the partitioning of the output RDD.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> def add(a, b): return a + str(b) >>> sorted(x.combineByKey(str, add, add).collect()) [('a', '11'), ('b', '1')]
[combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash at 0x7ff5681b9d70>)¶]
reduce(f)
Reduces the elements of this RDD using the specified commutative andassociative binary operator. Currently reduces partitions locally.
>>> from operator import add >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) 15 >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add) 10
reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash at 0x7ff5681b9d70>)
Merge the values for each key using an associative and commutative reduce function.
This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
Output will be partitioned with numPartitions partitions, orthe default parallelism level if numPartitions is not specified.Default partitioner is hash-partition.
>>> from operator import add >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> sorted(rdd.reduceByKey(add).collect()) [('a', 2), ('b', 1)]
filter(f)
Return a new RDD containing only the elements that satisfy a predicate.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5]) >>> rdd.filter(lambda x: x % 2 == 0).collect() [2, 4]
collect()
Return a list that contains all of the elements in this RDD.Note that this method should only be used if the resulting array is expectedto be small, as all the data is loaded into the driver’s memory.
collectAsMap()
Return the key-value pairs in this RDD to the master as a dictionary.
Note that this method should only be used if the resulting data is expectedto be small, as all the data is loaded into the driver’s memory.
>>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() >>> m[1] 2
ref: [class pyspark.RDD(jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))¶]