1. 程式人生 > >spark 學習隨筆(二)

spark 學習隨筆(二)

RDD 程式設計

彈性分散式資料集(Resilient Distributed Dataset,簡稱 RDD。RDD 其實就是分散式的元素集合。
在 Spark 中,對資料的所有操作不外乎建立 RDD轉化已有 RDD 以及呼叫 RDD 操作進行求值
而在這一切背後,Spark 會自動將RDD 中的資料分發到叢集上,並將操作並行化執行

RDD到底是什麼?

RDD是一個只讀的有屬性的資料集。屬性用來描述當前資料集的狀態,資料集是由資料的分割槽(partition)組成,並(由block)對映成真實資料。RDD屬性包括名稱、分割槽型別、父RDD指標、資料本地化、資料依賴關係等,主要屬性可以分為3類:

  1. 與其他RDD 的關係(parents)
    Spark 使用譜系圖(lineage graph)來記錄不同 RDD 之間的依賴關係(比如轉化操作)。
    在這裡插入圖片描述

  2. 資料(partitioner,checkpoint,storagelevel,iterator)

  3. RDD自身屬性(rddname,sparkcontext,sparkconf)

轉載一個例子https://www.jianshu.com/p/dd7c7243e7f9?from=singlemessage
scala:
在這裡插入圖片描述

簡單解釋下這幾行程式碼:
第一行,從HDFS上讀取in.txt檔案,建立了第一個RDD
第二行,按空格分詞,扁平化處理,生成第二個RDD,(轉化已有RDD)每個詞計數為1,生成了第三個RDD(轉化已有RDD)。這裡可能有人會問,為什麼生成了兩個RDD呢,因為此行程式碼RDD經過了兩次運算元轉換(transformation)操作。
第三行,按每個詞分組,累加求和,生成第四個RDD
第四行,將Wordcount統計結果輸出到HDFS

整個產生過程如下圖所示:
在這裡插入圖片描述
轉化操作和行動操作的區別在於 Spark 計算 RDD 的方式不同。雖然你可以在任何時候定義新的 RDD,但 Spark 只會惰性計算這些 RDD。它們只有第一次在一個行動操作中用到時,才會真正計算。一旦 Spark 瞭解了完整的轉化操作鏈之後,它就可以只計算求結果時真正需要的資料
預設情況下,Spark 的 RDD 會在你每次對它們進行行動操作時重新計算。如果想在多個行動操作中重用同一個 RDD,可以使用 RDD.persist() 讓 Spark 把這個 RDD 快取下來(以分割槽方式儲存到叢集中的各機器上)。

總的來說,每個 Spark 程式或 shell 會話都按如下方式工作。


(1) 從外部資料創建出輸入 RDD。
(2) 使用諸如 filter() 這樣的轉化操作對 RDD 進行轉化,以定義新的 RDD。
(3) 告訴 Spark 對需要被重用的中間結果 RDD 執行 persist() 操作。
(4) 使用行動操作(例如 count() 和 first() 等)來觸發一次平行計算,Spark 會對計算進行優化後再執行。

建立 RDD

Spark 提供了兩種建立 RDD 的方式:
1、讀取外部資料集。(如之前的例子)
2、在驅動器程式中對一個集合進行並行化。
方式2用得並不多,畢竟這種方式需要把你的整個資料集先放在一臺機器的記憶體中。

#把程式中一個已有的集合傳給 SparkContext 的 parallelize()方法
lines = sc.parallelize(["pandas", "i don't like pandas"])

RDD 操作

RDD 支援兩種操作:轉化操作和行動操作
RDD 的轉化操作是返回一個新的 RDD 的操作,比如 map() 和 filter() ,

#用 Python 進行 union() 轉化操作
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)

union() 與 filter() 的不同點在於它操作兩個 RDD 而不是一個。轉化操作可以操作任意
數量的輸入 RDD。

而行動操作則是向驅動器程式返回結果或把結果寫入外部系統的操作,會觸發實際的計算,比如 count() 和 first() 。Spark 對待轉化操作和行動操作的方式很不一樣

#在 Python 中使用行動操作對錯誤進行計數
print "Input had " + badLinesRDD.count() + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
print line

這個例子在驅動器程式中使用 take() 獲取了 RDD 中的少量元素。然後在本地遍歷這些元素,並在驅動器端打印出來。

RDD 有一個 collect() 函式,可以用來獲取整個 RDD 中的資料。如果你的程式把 RDD 篩選到一個很小的規模,並且你想在本地處理這些資料時,就可以使用它。記住,只有當你的整個資料集能在單臺機器的記憶體中放得下時,才能使用 collect() ,因此, collect() 不能用在大規模資料集上。
在大多數情況下,RDD 不能通過 collect() 收集到驅動器程序中,因為它們一般都很大。通常要把資料寫到諸如 HDFS 或 Amazon S3 這樣的分散式的儲存系統中。你可以使用 saveAsTextFile() 、 saveAsSequenceFile() ,或者任意的其他行動操作來把 RDD 的資料內容以各種自帶的格式儲存起來。

如果對於一個特定的函式是屬於轉化操作還是行動操作感到困惑,你可以看看它的返回值型別:轉化操作返回的是 RDD,而行動操作返回的是其他的資料型別。

基本 RDD
轉化操作:map() 、flatMap()和 filter()
在這裡插入圖片描述若希望對每個輸入元素生成多個輸出元素。實現該功能的操作叫作 flatMap() 。
和 map() 類似,flatMap() 的函式被分別應用到了輸入 RDD 的每個元素上

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first() # 返回"hello",若用map(),返回則是“hello” “world”

在這裡插入圖片描述

  1. union(other) ,返回一個包含兩個 RDD 中所有元素的 RDD

  2. intersection(other) 方法,只返回兩個 RDD 中都有的元素( 單 個 RDD 內 的 重 復 元 素 也 會 一 起
    移 除 )。intersection() 與 union() 的概念相似, intersection() 的效能卻要差很多,因為它需要通過網路混洗資料來發現共有的元素。

  3. subtract(other) 函 數 接 收 另 一 個 RDD 作 為 參 數, 返 回一個由只存在於第一個 RDD
    中而不存在於第二個 RDD 中的所有元素組成的 RDD。和intersection() 一樣,它也需要資料混洗。

  4. cartesian(other) 轉化操作會返回所有可能的 (a, b) 對,其中 a 是源 RDD 中的元素,而 b 則來自另一個
    RDD。笛卡兒積在我們希望考慮所有可能的組合的相似度時比較有用,比如計算各使用者對各種產品的預期興趣程度。我們也可以求一個 RDD
    與其自身的笛卡兒積,這可以用於求使用者相似度的應用中。不過要特別注意的是,求大規模 RDD 的笛卡兒積開銷巨大。

在這裡插入圖片描述在這裡插入圖片描述在這裡插入圖片描述

行動操作

在這裡插入圖片描述fold() 和 reduce() 都要求函式的返回值型別需要和我們所操作的 RDD 中的元素型別相同。這很符合像 sum 這種操作的情況。但有時我們確實需要返回一個不同型別的值。例如,在計算平均值時,需要記錄遍歷過程中的計數以及元素的數量,這就需要我們返回一個二元組。可以先對資料使用 map() 操作,來把元素轉為該元素和 1 的二元組,也就是我們所希望的返回型別。這樣 reduce() 就可以以二元組的形式進行歸約了。

可以用 aggregate() 來計算 RDD 的平均值,來代替 map() 後面接 fold() 的方式。

sumCount = nums.aggregate((0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
return sumCount[0] / float(sumCount[1])