Spark快速大數據分析之RDD基礎
-
Spark 中的RDD 就是一個不可變的分布式對象集合。每個RDD 都被分為多個分區,這些分區運行在集群中的不同節點上。RDD 可以包含Python、Java、Scala中任意類型的對象,甚至可以包含用戶自定義的對象。
-
用戶可以使用兩種方法創建RDD:讀取一個外部數據集,或在驅動器程序裏分發驅動器程序中的對象集合(比如list 和set)。
-
RDD支持兩種類型的操作:轉化操作和行動操作。轉化操作會由一個RDD 生成一個新的RDD。行動操作會對RDD計算出一個結果,並把結果返回到驅動器程序中,或把結果存儲到外部存儲系統(如HDFS)中。
-
轉化操作和行動操作的區別在於Spark 計算RDD 的方式不同。
-
默認情況下,Spark的RDD會在你每次對它們進行行動操作時重新計算。如果想在多個行動操作中重用同一個RDD,可以使用RDD.persist()讓Spark 把這個RDD 緩存下來。在第一次對持久化的RDD 計算之後,Spark 會把RDD 的內容保存到內存中(以分區方式存儲到集群中的各機器上),這樣在之後的行動操作中,就可以重用這些數據了。
-
cache() 與使用默認存儲級別調用persist() 是一樣的。
總的來說,每個Spark 程序或shell 會話都按如下方式工作。
1. 從外部數據創建出輸入RDD。
2. 使用諸如filter() 這樣的轉化操作對RDD 進行轉化,以定義新的RDD。
4. 使用行動操作(例如count()和first()等)來觸發一次並行計算,Spark 會對計算進行優化後再執行。
創建RDD
Spark 提供了兩種創建RDD的方式:讀取外部數據集,以及在驅動器程序中對一個集合進行並行化。
創建RDD 最簡單的方式就是把程序中一個已有的集合傳給SparkContext
的parallelize()
方法。但除了開發原型和測試時,這種方式用得並不多,畢竟這種方式需要把你的整個數據集先放在一臺機器的內存中。更常用的方式是從外部存儲中讀取數據來創建RDD。如: val lines = sc.textFile("/path/to/README.md")
RDD操作
RDD 支持兩種操作:轉化操作和行動操作。RDD 的轉化操作是返回一
個新的RDD 的操作,比如map()
和filter()
,而行動操作則是向驅動器程序返回結果或把結果寫入外部系統的操作,會觸發實際的計算,比如count()
和first()
。它們的返回值類型:轉化操作返回的是RDD,而行動操作返回的是其他的數據類型。
轉化操作
RDD的轉化操作是返回新RDD的操作。轉化出來的RDD是惰性求值的,只有在行動操作中用到這些RDD 時才會被計算。許多轉化操作都是針對各個元
素的,也就是說,這些轉化操作每次只會操作RDD 中的一個元素。不過並不是所有的轉
化操作都是這樣的。
你從已有的RDD 中派生出新的RDD,Spark 會使用**譜系
圖(lineage graph)來記錄這些不同RDD 之間的依賴關系**。Spark 需要用這些信息來按需
計算每個RDD,也可以依靠譜系圖在持久化的RDD 丟失部分數據時恢復所丟失的數據。
行動操作
行動操作是第二種類型的RDD 操作,它們會把最終求得的結
果返回到驅動器程序,或者寫入外部存儲系統中。由於行動操作需要生成實際的輸出,它
們會強制執行那些求值必須用到的RDD 的轉化操作。
一些常用的行動操作函數:
- count()
:統計返回結果
- take()
:來收集RDD 中的一些元素
- collect()
: 用來獲取整個RDD 中的數據。如果你的程序把RDD 篩選到一個很小的規模,並且你想在本地處理這些數據時,就可以使用它。記住,只有當你的整個數據集能在單臺機器的內存中放得下時,才能使用collect(),因此,collect() 不能用在大規模數據集上。
- saveAsTextFile()、saveAsSequenceFile()
:規模較大的RDD 不能通過collect()收集到驅動器進程中時,可以使用saveAsTextFile()、saveAsSequenceFile(),或者任意的其他行動操作來把RDD 的數據內容以各種自帶的格式保存起來,寫到諸如HDFS 或Amazon S3 這樣的分布式的存儲系統中。
註:每當我們調用一個新的行動操作時,整個RDD都會從頭開始計算。要避免這種低效的行為,用戶可以將中間結果持久化。
惰性求值
RDD 的轉化操作都是惰性求值的。這意味著在被調用行動操作之前Spark 不會
開始計算。惰性求值意味著當我們對RDD 調用轉化操作(例如調用map())時,操作不會立即執行。
相反,Spark 會在內部記錄下所要求執行的操作的相關信息。我們不應該把RDD 看作存
放著特定數據的數據集,而最好把每個**RDD 當作我們通過轉化操作構建出來的、記錄如
何計算數據的指令列表**。把數據讀取到RDD 的操作也同樣是惰性的。
Spark 使用惰性求值,這樣就可以把一些操作合並到一起來減少計算數據的步驟。
向spark傳遞參數
python
在Python 中,我們有三種方式來把函數傳遞給Spark。傳遞比較短的函數時,可以使用
lambda 表達式來傳遞。除了lambda 表達式,我們也可以傳遞頂
層函數或是定義的局部函數。
word = rdd.filter(lambda s: "error" in s)
def containsError(s):
return "error" in s
word = rdd.filter(containsError)
- 1
- 2
- 3
- 4
- 5
傳遞函數時需要小心的一點是,Python 會在你不經意間把函數所在的對象也序列化傳出
去。當你傳遞的對象是某個對象的成員,或者包含了對某個對象中一個字段的引用時(例
如self.field),Spark 就會把整個對象發到工作節點上,這可能比你想傳遞的東西大得多
(見例3-19)。有時,如果傳遞的類裏面包含Python 不知道如何序列化傳輸的對象,也會
導致你的程序失敗。
傳遞函數時需要小心的一點是,Python 會在你不經意間把函數所在的對象也序列化傳出
去。當你傳遞的對象是某個對象的成員,或者包含了對某個對象中一個字段的引用時,Spark 就會把整個對象發到工作節點上,這可能比你想傳遞的東西大得多。
如傳遞一個帶字段引用的函數(別這麽做!)
class SearchFunctions(object):
def __init__(self, query):
self.query = query
def isMatch(self, s):
return self.query in s
def getMatchesFunctionReference(self, rdd):
# 問題:在"self.isMatch"中引用了整個self
return rdd.filter(self.isMatch)
def getMatchesMemberReference(self, rdd):
# 問題:在"self.query"中引用了整個self
return rdd.filter(lambda x: self.query in x)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
替代的方案是,只把你所需要的字段從對象中拿出來放到一個局部變量中,然後傳遞這個
局部變量。
傳遞不帶字段引用的Python 函數
class WordFunctions(object):
...
def getMatchesNoReference(self, rdd):
# 安全:只把需要的字段提取到局部變量中
query = self.query
return rdd.filter(lambda x: query in x)
- 1
- 2
- 3
- 4
- 5
- 6
scala
在Scala 中,我們可以把定義的內聯函數、方法的引用或靜態方法傳遞給Spark,就像
Scala 的其他函數式API 一樣。我們還要考慮其他一些細節,比如所傳遞的函數及其引用
的數據需要是可序列化的(實現了Java 的Serializable 接口)。除此以外,與Python 類似,
傳遞一個對象的方法或者字段時,會包含對整個對象的引用。這在Scala 中不是那麽明顯,
畢竟我們不會像Python 那樣必須用self 寫出那些引用。類似在例3-20 中對Python 執行
的操作,我們可以把需要的字段放到一個局部變量中,來避免傳遞包含該字段的整個對
象。
Scala 中的函數傳遞
class SearchFunctions(val query: String) {
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
RDD編程 | 29
// 問題:"isMatch"表示"this.isMatch",因此我們要傳遞整個"this"
rdd.map(isMatch)
}
def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
// 問題:"query"表示"this.query",因此我們要傳遞整個"this"
rdd.map(x => x.split(query))
}
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
// 安全:只把我們需要的字段拿出來放入局部變量中
val query_ = this.query
rdd.map(x => x.split(query_))
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
如果在Scala 中出現了NotSerializableException,通常問題就在於我們傳遞了一個不可序列
化的類中的函數或字段。記住,傳遞局部可序列化變量或頂級對象中的函數始終是安全的。
常見的轉化操作和行動操作
基本RDD(受任意數據類型的RDD 支持的轉化操作和行動操作)
1. 針對各個元素的轉化操作
- map() : map() 的返回值類型不需要和輸。
入類型一樣 - filter() :根據規則篩選元素。
- flatMap() : 每個輸入元素生成多個輸出元素,返回值是一個返回值序列的叠代器。得到的是一個包含各個叠代器可訪問的所有元素的RDD。flatMap() 的一個簡
單用途是把輸入的字符串切分為單詞。
map 和flatmap的區別如下:
例子:map() 對RDD 中的所有數求平方
python中:
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
print "%i " % (num)
- 1
- 2
- 3
- 4
scala中:
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))
- 1
- 2
- 3
2. 偽集合操作
盡管RDD 本身不是嚴格意義上的集合,但它也支持許多數學上的集合操作,比如合並和相交操作。如下圖所示:
註意,這些操作都要求操作的RDD是相同數據類型的。
另外distinct() 操作的開銷很大,因為它需要將所有數據通過網絡進行混洗(shuffle),以確保每個元素都只有一份。
3. 行動操作
- reduce : 接收一個函數作為參數,這個
函數要操作兩個RDD 的元素類型的數據並返回一個同樣類型的新元素。一個簡單的例子就
是函數+,可以用它來對我們的RDD 進行累加。 - fold : fold() 和reduce() 類似,接收一個與reduce() 接收的函數簽名相同的函數,再加上一個
“初始值”來作為每個分區第一次調用時的結果。 - aggregate() : 函數返回值類型不必與所操作的RDD類型相同。與fold() 類似,使用aggregate()時,需要提供我們期待返回的類型的初始值。如可用來計算RDD 的平均值。
例子:
python中:
#函數+
sum = rdd.reduce(lambda x, y: x + y)
#平均值
sumCount = nums.aggregate((0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
return sumCount[0] / float(sumCount[1])
- 1
- 2
- 3
- 4
- 5
- 6
- 7
scala中:
//函數+
val sum = rdd.reduce((x, y) => x + y)
//平均值
val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
val avg = result._1 / result._2.toDouble
- 1
- 2
- 3
- 4
- 5
- 6
- 7
RDD 的一些行動操作會以普通集合或者值的形式將RDD的部分或全部數據返回驅動器程序中。
- collect() :把數據返回驅動器程序中最簡單、最常見的操作是collect(),它會將整個RDD 的內容返回。collect() 通常在單元測試中使用。使用collect() 使得RDD 的值與預期結果之間的對比變得很容易。由於需要將數據
復制到驅動器進程中,collect()要求所有數據都必須能一同放入單臺機器的內存中。
- take(n) : 返回RDD中的n個元素,並且嘗試只訪問盡量少的分區,因此該操作會得到一個不均衡的集合。需要註意的是,這些操作返回元素的順序與你預期的可能不一樣。
- top() : 如果為數據定義了順序,就可以使用top()從RDD中獲取前幾個元素。top()會使用數據的默認順序,但我們也可以提供自己的比較函數,來提取前幾個元素。
- takeSample(withReplacement, num,seed) : 在驅動器程序中對我們的數據進行采樣。takeSample(withReplacement, num,seed) 函數可以讓我們從數據中獲取一個采樣,並指定是否替換。
- foreach() :有時我們會對RDD中的所有元素應用一個行動操作,但是不把任何結果返回到驅動器程序中,這也是有用的。比如可以用JSON 格式把數據發送到一個網絡服務器上,或者把數據存到數據庫中。不論哪種情況,都可以使用foreach()行動操作來對RDD中的每個元素進行操作,而不需要把RDD 發回本地。
更多操作:
4. 在不同RDD類型之間轉換
有些函數只能用於特定類型的RDD,比如mean()和variance()只能用在數值RDD 上,而join() 只能用在鍵值對RDD 上。
在Scala 中,將RDD轉為有特定函數的RDD(比如在RDD[Double]上進行數值操作)是由隱式轉換來自動處理的。上面提到過,我們需要加上import org.apache.spark.SparkContext._來使用這些隱式轉換。這些隱式轉換可以隱式地將一個RDD轉為各種封裝類,比如DoubleRDDFunctions(數值數據的RDD)和PairRDDFunctions(鍵值對RDD),這樣我們就有了諸如mean() 和variance() 之類的額外的函數。
Python 的API 結構與Java和Scala有所不同。在Python中,所有的函數都實現在基本的RDD 類中,但如果操作對應的RDD數據類型不正確,就會導致運行時錯誤。
持久化
為了避免多次計算同一個RDD,可以讓Spark對數據進行持久化。當我們讓Spark 持久化存儲一個RDD時,計算出RDD的節點會分別保存它們所求出的分區數據。如果一個有持久化數據的節點發生故障,Spark會在需要用到緩存的數據時重算丟失的數據分區。如果希望節點故障的情況不會拖累我們的執行速度,也可以把數據備份到多個節點上。
出於不同的目的,我們可以為RDD選擇不同的持久化級別。在Scala和Java 中,默認情況下persist() 會把數據以序列化的形式緩存在JVM 的堆空
間中。在Python 中,我們會始終序列化要持久化存儲的數據,所以持久化級別默認值就是以序列化後的對象存儲在JVM 堆空間中。
在Scala 中使用persist()
val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
- 1
- 2
- 3
- 4
如果要緩存的數據太多,內存中放不下,Spark 會自動利用最近最少使用(LRU)的緩存
策略把最老的分區從內存中移除。對於僅把數據存放在內存中的緩存級別,下一次要用到
已經被移除的分區時,這些分區就需要重新計算。但是對於使用內存與磁盤的緩存級別的
分區來說,被移除的分區都會寫入磁盤。不論哪一種情況,都不必擔心你的作業因為緩存
了太多數據而被打斷。不過,緩存不必要的數據會導致有用的數據被移出內存,帶來更多
重算的時間開銷。
RDD 還有一個方法叫作unpersist(),調用該方法可以手動把持久化的RDD 從緩
存中移除。
Spark快速大數據分析之RDD基礎