1. 程式人生 > >Spark-RDD-02基本操作詳解

Spark-RDD-02基本操作詳解

Spark中RDD是一個不可變的分散式物件集合,每個RDD都被分為多個分割槽,這些分割槽被分發到叢集的不同的節點中進行計算。

SparkContext是Spark的程式設計主入口點,可以被用於在叢集中建立RDDs,在spark-shell中被系統預設建立為sc。

兩種建立RDD的方式:(1)呼叫SparkContext的parallelize()方法將資料並行化成RDD

                                     (2)從外部儲存系統(eg.HDFS、共享檔案系統、Hbase等)引用資料成RDD

首先還是開啟叢集,進入到spark-shell

通過呼叫SparkContext的parallelize方法將驅動程式已經存在的資料集轉化為並行化集合Parallelized Collections).。

集合的元素被複制以形成可並行操作的分散式資料集。

例如,下面建立一個包含數字 1~5的並行集合:

一旦建立,分散式資料集(distData) 就可以進行並行操作。例如,我們可以呼叫distData.reducel(a, b)=> a + b)將陣列的

元素相加。

並行集合的一個重要引數是將 資料集切割到的分割槽數(artitions) ,Spark 將為叢集的每個RDD分割槽執行一個計算任務(task),

即RDD每個分割槽是計算任務的基本分配單位,而非整個RDD。通常,Spark 會根據叢集實際情況自動設定分割槽數。

但是,也可以通過將其作為第二個數傳遞給paralice來手動設定,如下例項欲將data資料集切分為10個分割槽。

而實際開發中更多的是從外部資料來源讀取來建立RDD。

讀取:讀取後的RDD都是由String物件組成的RDD[String],然後就可以通過變數people呼叫RDD類中的定義的方法。

有關Spark的textFile0讀取檔案的一一些注意事項如下:

①如果需從本地檔案系統讀取檔案作為外部資料來源,則檔案必須確保叢集上的所有工作節點可訪問。可以將檔案複製到

    所有工作節點或使用叢集上的共享檔案系統。

②Spark所有的基於檔案的讀取方法,包括textFile 支援讀取某個目錄下多個指定檔案,支援部分的壓縮檔案和萬用字元。

     例如,可以使用textFile("/my/directory/*")讀取該目錄下所檔案,可以採用萬用字元匹配同一型別檔案

     textFile(/my/directory/* .txt"),也可以讀取壓縮文textFilcC/myldirectory/* .gz"),還可以使用

     textFil("/myl/irectoreldatat.lxt'","/my/directory/data2.txt)時讀取來自不同路徑的多個檔案。

③該textFile方法還採用可選的第二個引數來控制檔案的分割槽數。預設情況下,Spark 為檔案的每個塊建立一個分割槽

     (HDFS中預設為128MB),但也可以通過傳遞更大的值來請求更高數量的分割槽。請注意,不能有比塊少的分割槽。

同時對於許多小檔案的目錄,可以採用SparkContext.wholeTextFiles

RDD操作:

RDD支援兩種型別的操作:轉化操作(transformation) 與行動操作(action)。 collect()不能應用在大規模資料集上。

轉化操作從一個已存在的RDD建立一個新的RDD:

行動操作在RDD上進行計算後將結果值返回給驅動程式的操作。 例如,map通過遍歷RDD的每一個元素,進行相應的使用者定義的操作,並返回表示結果的新RDD的轉換操(transformation)。 

另一方面,reduce 是使用一些函式聚合RDD的所有元素,並將最終結果返回給驅動程式的行動操作(action)。 

在此,分辨一個操作到底是轉化操作還是行動操作,可以根據返回值型別來直觀判斷,即轉化操作返回值皆為RDD,行動操作

則是表示計算結果的Int、String、Array、List型別返回值(當然也存在例外,例如reduceByKey,其雖為行動操作,但返回的仍

為RDD。 Spark中的所有轉換操作都是懶惰計算的,因為它們不會馬上計算結果。相反,它們只記住應用於某些基本資料集(RDD)的

轉換關係(RDD轉化譜系圖)。 只有當某個行動操作需要將結果返回給驅動程式時才會真正進行轉換計算。這種設計使Spark

能夠更高效地執行。例如,我們可以認識到,通過對建立的RDD依次呼叫map、reduce 操作,返回到驅動程式的僅是經過

map、reduce 最終處理後的結果(很小的結果集),而不是經map操作後的很大的對映資料集,這也反映出了惰性求值在大數

據分析領域的合理性。 預設情況下,被重用的中間結果RDD可能會在每次對其進行行動操作時重新計算。但是,可以使用persist (cache)方法在記憶體

中保留被重用的中間結果RDD,在這種情況下,Spark將在叢集記憶體上保留該RDD,以便在下次查詢時進行更快的訪問。還支援

在磁碟上持久儲存RDD。

============================================================================================

轉化操作例項:

經常用到的兩個轉化操作是map()和filter(), 這兩者的共同點在於會觸發對RDD中所有元素進行遍歷。轉化操作map()接收

一個函式,把這個函式用於RDD中的每個元素,將函式的返回結果作為結果RDD中對應元素的值。而轉化操作filter()則

接收一個函式, 並將RDD中滿足該函式的元素放入新的RDD中返回。

map例項:

filter例項:

詳解上面的佔位符:

(1) import匯入包的所有成員,相當於Java的*。inporn scala.math._。比Java方便的一點是它可以匯入某個類下的所有靜態

      成員,Java 則需要impar static。

(2)佔位符,來表示某一個引數, 這個用法比較多。比如對collection,sequence或者本章所學的RDD呼叫方法

    map、filter、foreach等對每一個元素進行處理,可以使用_表示每一個元素, 例如map(_.func);還有引數推導時f(250*_)

(3)對變數進行預設初始化,下劃線代表的是某一型別的預設值, 對於Int來說,它是0。對於Double來說,它是0.0.對於

     引用型別,它是null.比如var i:Int_.

(4)訪問tuple (元組)的某個元素時通過索引n來取得第n個元素,可以用方法_1,_2,_3訪問組員, 如a._2。

(5)向函式或方法傳入可變引數時,不能直接傳入Range 或集合或陣列物件,需要使用: _*轉換才可傳入。

(6)類的cter方法,比如類A中定義了var f.則相當於定義了seter 方法f_=.當然你可以自己定義f_=方法來完成更多的事情,比如

    設定前做一些判斷或預處理之類的操作。

flatMap例項:

     flatMap和map操作皆是傳入func對RDD每個元素進行處理的操作,不同點在於 map(func)傳入的func在處理RDD的

     每一個元素後都產生相對應的結果,而正是由這些一 一對應的結果值組成了輸出RDD,而flatMap(func)的傳入func在

     處理每一個元素時,都可能會產生一個或多個對應的元素組成的返回值序列的迭代器,輸出的RDD倒不是由迭代器組

     成的,而是一個包含各個迭代器可訪問所有元素的RDD。因此,當我們希望對每個輸入元素生成多個輸出元素,可以

     使用flatMap。

   

集合操作:distinct. union. intersection, subtract. cartesian

儘管RDD本身不是嚴格意義上的集合,但它也支援許多數學上的集合操作,比如合併(union)、相交(intersection)、

作差(subtract)、 去重(distinct). 笛卡兒積cartesian) 操作,值得注意的是,這些操作都要求操作涉及的RDD[T]是相同

資料型別的。

動作操作:

注意不論哪種情況,都可以使用foreach()動作操作來對RDD中的每個元素進行操作,而無需把RDD發回本地。