Spark入門學習——要點1
1、collect()函式
RDD還有一個collect()函式,可以用來獲取整個RDD中的資料。如果你的程式把RDD篩選到一個很小的規模,並且你想在本地處理這些資料時,就可以使用它。記住,只有當你的整個資料集能在單臺機器的記憶體中放得下時,才能使用collect(),因此,collect()不能用在大規模資料集上。使用collect()使得RDD的值與預期結果之間的對比變得很容易。由於需要將資料複製到驅動器程序中,collect()要求所有資料都必須能一同放入單臺機器的記憶體中。
2、Java標準函式介面
函式名 | 實現的方法 | 用途 |
Function<T,R> | Rcall(T) | 接收一個輸入值並返回一個輸出值,用於類似map()和filter()等操作中 |
Function2<T1,T2,R> | Rcall(T1,T2) | 接收兩個輸入值並返回一個輸出值,用於類似aggregate()和fold()等操作中 |
FlatMapFunction<T,R> | Iterable<R>call(T) | 接收一個輸入值並返回任意個輸出,用於類似flatMap()這樣的操作中 |
3、常見的轉化操作和action操作
對一個數據為{1,2,3,3}的RDD進行基本的RDD轉化操作
函式名 | 目的 | 示例 | 結果 |
map() | 將函式應用於Rdd中每個元素,將返回值構成新的RDD | rdd.map(x->x+1) | {2,3,4,4} |
flatMap() | 將函式應用於RDD中的每個元 素,將返回的迭代器的所有內 容構成新的RDD。通常用來切 分單詞 |
rdd.flatMap(x=>x.to(3)) | {1,2,3,2,3,3,3} |
filter() | 返回一個由通過傳給filter() 的函式的元素組成的RDD |
rdd.filter(x => x != 1) | {2,3,3} |
distinct() | 去重 | rdd.distinct() | {1,2,3} |
sample(withReplacement, fraction,[seed]) |
對RDD取樣,以及是否替 | rdd.sample(false,0.5) | 非確定的 |
對資料分別為{1,2,3} 和{3,4,5}的RDD進行鍼對兩個RDD的轉化操作
函式名 | 目的 | 示例 | 結果 |
union() | 生成一個包含兩個RDD中所有元 素的RDD |
rdd.union(other) | {1,2,3,3,4,5} |
intersection() | 求兩個RDD共同的元素的RDD | rdd.intersection(other) | {3} |
subtract() | 移除一個RDD中的內容(例如移 除訓練資料) |
rdd.subtract(other) | {1,2} |
cartesian() | 與另一個RDD的笛卡兒積 | rdd.cartesian(other) | {(1,3),(1,4),... (3,5)} |
action操作
行動 | 涵義 |
---|---|
reduce(func) | 使用傳入的函式引數func對資料集中的元素進行匯聚操作 (兩兩合併). 該函式應該具有可交換與可結合的性質, 以便於能夠正確地進行平行計算. |
collect() | 在 driver program 上將資料集中的元素作為一個數組返回. 這在執行一個 filter 或是其他返回一個足夠小的子資料集操作後十分有用. |
count() | 返回資料集中的元素個數 |
first() | 返回資料集中的第一個元素 (與 take(1) 類似) |
take(n) | 返回資料集中的前n個元素 |
takeSample(withReplacement, num, [seed]) | 以陣列的形式返回資料集中隨機取樣的num個元素. |
takeOrdered(n, [ordering]) | 以其自然序或使用自定義的比較器返回 RDD 的前n元素 |
saveAsTextFile(path) | 將資料集中的元素寫入到指定目錄下的一個或多個文字檔案中, 該目錄可以存在於本地檔案系統, HDFS 或其他 Hadoop 支援的檔案系統. Spark 將會對每個元素呼叫 toString 將其轉換為檔案的一行文字. |
saveAsSequenceFile(path)(Java and Scala) | 對於本地檔案系統, HDFS 或其他任何 Hadoop 支援的檔案系統上的一個指定路徑, 將資料集中的元素寫為一個 Hadoop SequenceFile. 僅適用於實現了 Hadoop Writable 介面的 kay-value pair 的 RDD. 在 Scala 中, 同樣適用於能夠被隱式轉換成 Writable 的型別上 (Spark 包含了對於 Int, Double, String 等基本型別的轉換). |
saveAsObjectFile(path)(Java and Scala) | 使用 Java 序列化將資料集中的元素簡單寫為格式化資料, 可以通過 SparkContext.objectFile() 進行載入. |
countByKey() | 僅適用於 (K, V) 型別的 RDD. 返回每個 key 的 value 數的一個 hashmap (K, int) pair. |
foreach(func | 對資料集中的每個元素執行函式func. 這通常用於更新一個Accumulator或與外部儲存系統互動時的副作用. 注意: 修改 foreach() 外的非 Accumulator 變數可能導致未定義的行為. 更多細節請檢視Understanding closures. |
4、distinct()函式
我們的RDD中最常缺失的集合屬性是元素的唯一性,因為常常有重複的元素。如果只要唯一的元素,我們可以使用RDD.distinct()轉化操作來生成一個只包含不同元素的新RDD。不過需要注意,distinct()操作的開銷很大,因為它需要將所有資料通過網路進行混洗(shuffle),以確保每個元素都只有一份。
5、Spark RDD快取/持久化策略
為了避免多次計算同一個RDD,可以讓Spark對資料進行持久化。
出於不同的目的,我們可以為RDD選擇不同的持久化級別(如表3-6所示)。在Scala(見例3-40)和Java中,預設情況下persist()會把資料以序列化的形式快取在JVM的堆空間中。在Python中,我們會始終序列化要持久化儲存的資料,所以持久化級別預設值就是以序列化後的物件儲存在JVM堆空間中。當我們把資料寫到磁碟或者堆外儲存上時,也總是使用序列化後的資料。
表3-6:org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化級別;如有必要,可以通過在儲存級別的末尾加上“_2”來把持久化資料存為兩份
如果在呼叫RDD的action的操作之前呼叫了persist()操作,並不會馬上出發強制求值機制,仍然是懶載入的機制。
如果快取的資料過多,記憶體中放不下,Spark會自動利用最近最少使用(LRU)快取策略進行資料分割槽移除。對於僅把資料存放在記憶體中的快取級別,下一次要用到已經被移除的分割槽,這些分割槽需要重新計算。但是對於記憶體+磁碟模式的快取級別的分割槽來說,被移除的分割槽都會被寫入磁碟。
RDD還有一個方法叫做UNpersist(),呼叫該方法可以手動把持久化的RDD從快取中刪除。
轉載於:https://my.oschina.net/guanhe/blog/2051525