1. 程式人生 > 實用技巧 >Spark入門學習——要點1

Spark入門學習——要點1

>>> hot3.png

1、collect()函式

RDD還有一個collect()函式,可以用來獲取整個RDD中的資料。如果你的程式把RDD篩選到一個很小的規模,並且你想在本地處理這些資料時,就可以使用它。記住,只有當你的整個資料集能在單臺機器的記憶體中放得下時,才能使用collect(),因此,collect()不能用在大規模資料集上。使用collect()使得RDD的值與預期結果之間的對比變得很容易。由於需要將資料複製到驅動器程序中,collect()要求所有資料都必須能一同放入單臺機器的記憶體中。

2、Java標準函式介面

函式名 實現的方法 用途
Function<T,R> Rcall(T)

接收一個輸入值並返回一個輸出值,用於類似map()和filter()等操作中

Function2<T1,T2,R> Rcall(T1,T2)

接收兩個輸入值並返回一個輸出值,用於類似aggregate()和fold()等操作中

FlatMapFunction<T,R> Iterable<R>call(T)

接收一個輸入值並返回任意個輸出,用於類似flatMap()這樣的操作中

3、常見的轉化操作和action操作

對一個數據為{1,2,3,3}的RDD進行基本的RDD轉化操作

函式名 目的 示例 結果
map() 將函式應用於Rdd中每個元素,將返回值構成新的RDD rdd.map(x->x+1) {2,3,4,4}
flatMap()

將函式應用於RDD中的每個元

素,將返回的迭代器的所有內

容構成新的RDD。通常用來切

分單詞
rdd.flatMap(x=>x.to(3)) {1,2,3,2,3,3,3}
filter()

返回一個由通過傳給filter()

的函式的元素組成的RDD
rdd.filter(x => x != 1) {2,3,3}
distinct() 去重 rdd.distinct() {1,2,3}

sample(withReplacement,

fraction,[seed])

對RDD取樣,以及是否替 rdd.sample(false,0.5) 非確定的

對資料分別為{1,2,3} 和{3,4,5}的RDD進行鍼對兩個RDD的轉化操作

函式名 目的 示例 結果
union()

生成一個包含兩個RDD中所有元

素的RDD
rdd.union(other) {1,2,3,3,4,5}
intersection() 求兩個RDD共同的元素的RDD rdd.intersection(other) {3}
subtract()

移除一個RDD中的內容(例如移

除訓練資料)
rdd.subtract(other) {1,2}
cartesian() 與另一個RDD的笛卡兒積 rdd.cartesian(other)

{(1,3),(1,4),...

(3,5)}

action操作

行動 涵義
reduce(func) 使用傳入的函式引數func對資料集中的元素進行匯聚操作 (兩兩合併). 該函式應該具有可交換與可結合的性質, 以便於能夠正確地進行平行計算.
collect() 在 driver program 上將資料集中的元素作為一個數組返回. 這在執行一個 filter 或是其他返回一個足夠小的子資料集操作後十分有用.
count() 返回資料集中的元素個數
first() 返回資料集中的第一個元素 (與 take(1) 類似)
take(n) 返回資料集中的前n個元素
takeSample(withReplacement, num, [seed]) 以陣列的形式返回資料集中隨機取樣的num個元素.
takeOrdered(n, [ordering]) 以其自然序或使用自定義的比較器返回 RDD 的前n元素
saveAsTextFile(path) 將資料集中的元素寫入到指定目錄下的一個或多個文字檔案中, 該目錄可以存在於本地檔案系統, HDFS 或其他 Hadoop 支援的檔案系統. Spark 將會對每個元素呼叫 toString 將其轉換為檔案的一行文字.
saveAsSequenceFile(path)(Java and Scala) 對於本地檔案系統, HDFS 或其他任何 Hadoop 支援的檔案系統上的一個指定路徑, 將資料集中的元素寫為一個 Hadoop SequenceFile. 僅適用於實現了 Hadoop Writable 介面的 kay-value pair 的 RDD. 在 Scala 中, 同樣適用於能夠被隱式轉換成 Writable 的型別上 (Spark 包含了對於 Int, Double, String 等基本型別的轉換).
saveAsObjectFile(path)(Java and Scala) 使用 Java 序列化將資料集中的元素簡單寫為格式化資料, 可以通過 SparkContext.objectFile() 進行載入.
countByKey() 僅適用於 (K, V) 型別的 RDD. 返回每個 key 的 value 數的一個 hashmap (K, int) pair.
foreach(func 對資料集中的每個元素執行函式func. 這通常用於更新一個Accumulator或與外部儲存系統互動時的副作用. 注意: 修改 foreach() 外的非 Accumulator 變數可能導致未定義的行為. 更多細節請檢視Understanding closures.

4、distinct()函式

我們的RDD中最常缺失的集合屬性是元素的唯一性,因為常常有重複的元素。如果只要唯一的元素,我們可以使用RDD.distinct()轉化操作來生成一個只包含不同元素的新RDD。不過需要注意,distinct()操作的開銷很大,因為它需要將所有資料通過網路進行混洗(shuffle),以確保每個元素都只有一份。

5、Spark RDD快取/持久化策略

為了避免多次計算同一個RDD,可以讓Spark對資料進行持久化。

出於不同的目的,我們可以為RDD選擇不同的持久化級別(如表3-6所示)。在Scala(見例3-40)和Java中,預設情況下persist()會把資料以序列化的形式快取在JVM的堆空間中。在Python中,我們會始終序列化要持久化儲存的資料,所以持久化級別預設值就是以序列化後的物件儲存在JVM堆空間中。當我們把資料寫到磁碟或者堆外儲存上時,也總是使用序列化後的資料。

表3-6:org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化級別;如有必要,可以通過在儲存級別的末尾加上“_2”來把持久化資料存為兩份

如果在呼叫RDD的action的操作之前呼叫了persist()操作,並不會馬上出發強制求值機制,仍然是懶載入的機制。

如果快取的資料過多,記憶體中放不下,Spark會自動利用最近最少使用(LRU)快取策略進行資料分割槽移除。對於僅把資料存放在記憶體中的快取級別,下一次要用到已經被移除的分割槽,這些分割槽需要重新計算。但是對於記憶體+磁碟模式的快取級別的分割槽來說,被移除的分割槽都會被寫入磁碟。

RDD還有一個方法叫做UNpersist(),呼叫該方法可以手動把持久化的RDD從快取中刪除。

轉載於:https://my.oschina.net/guanhe/blog/2051525