samz5906的專欄
定義
RDD是彈性分散式資料集(Resilient Distributed Dataset), RDD 其實就是分散式的元素集合。就像List,Array,Set,Map集合。在 Spark 中,對資料的所有操作不外乎建立 RDD、 轉化已有 RDD 以及呼叫 RDD 操作進行求值。而在這一切背後, Spark 會自動將RDD中的資料分發到叢集上,並將操作並行化執行。
使用者可以使用兩種方法建立 RDD: 讀取一個外部資料集,或在驅動器程式裡分發驅動器程式中的物件集合(比如 list 和 set) ;
RDD是隻讀的。一旦生成就不能修改;
RDD可以通過重新計算得到;
舉個栗子,讀取檔案ReadME.md資料
》lines = sc.textFile("README.md")
》pythonLines = lines.filter(lambda line: "Python" in line)
Spark 只會惰性計算這些 RDD ,也就是它們只有第一次在一個行動操作中用到時,才會真正計算 。這樣的好處是上面例子我們以一個文字檔案定義了資料,然後把其中包含 Python 的行篩選出來。如果 Spark 在我們執行 lines= sc.textFile(…) 時就把檔案中所有的行都讀取並存儲起來, 就會消耗很多儲存空間,而我們馬上就要篩選掉其中的很多資料。相反, 一旦 Spark 瞭解了完整的轉化操作鏈之後,它就可以只計算求結果時真正需要的資料。事實上,在行動操作 first() 中, Spark 只需要掃描檔案直到找到第一個匹配的行為止,而不需要讀取整個檔案 。預設情況下, Spark 的 RDD 會在你每次對它們進行行動操作時重新計算。如果想在多個行動操作中重用同一個 RDD, 可以使用 RDD.persist() 讓 Spark 把這個 RDD 快取下來
轉換(Transformation)操作
每次轉換操作都會生成一個新的RDD。轉化出來的 RDD 是惰性求值的,只有在行動操作中用到這些 RDD 時才會被計算 。
轉換 | 說明 |
---|---|
map | 資料集中的每條元素經過函式轉換後形成一個新的分散式資料集 |
filter | 過濾作用,選取資料集中讓函式返回為true的元素,形成一個新的資料集 |
flatMap | 類似於map,但每個輸入項可以被對映到0個或更多的輸出項 |
mapParttions | 類似於map,但單獨執行在RDD每個分割槽 |
union | 返回一個由原資料集和引數聯合而成的心得資料集 |
distinct | 返回一個數據集去重過後的新的資料集 |
說明
map:
第一次看map的操作,半天沒有看懂,原因是在java中map是一個k,v鍵值對集合。就搞不懂這個是怎麼 回事看了很久才明白這個跟java中的map不是一回事。map輸入是一個RDD,輸出也是一個RDD,RDD數不會變。
flatMap:
是一個RDD轉換函式,接受一個函式作為輸入,對當前RDD所有成員電泳輸入的函式,並返回一個新的RDD,flatMap對每個輸入的RDD,返回的是一個集合,集合裡面的成員會被展開,一個輸入可以對應多個輸出。比如對“# Apache Spark”進行空格拆分,返回的是一個數組,包含3個成員,“#”,“Apache”,“Spark”,最終3個成員會成為RDD直接成員。
行動(Action)操作
對資料轉換後進行行動操作,輸出結果不再是RDD,返回給Driver程式
操作 | 說明 |
---|---|
reduce | 對RDD成員使用進行reduce操作,返回結果只有一個值 |
collect | 將RDD讀取到Driver程式,型別是一個Array,一般要求RDD不能太大 |
count | 返回RDD成員數 |
first | 返回RDD一個成員 |
take(n) | 返回前n個成員 |
saveAsTextFile(path) | 將RDD轉換為文字內容並儲存到path路徑下,可能有多個檔案,path可以為具體路徑或HDFS地址 |
saveAsSequenceFile(path) | 與saveAsTextFile類似,但是已SequenceFile格式儲存 |
countBykey | 僅適用於(k,v)型別,對key計算,返回(k,int) |
foreeach(func) | 對RDD中的每個成員執行回撥func,沒有返回值,常用於更新計算器或輸出資料至外部儲存系統。這裡需要注意變數的作用域 |
說明:
collect:
可以用來獲取整個 RDD 中的資料。如果你的程式把 RDD 篩選到一個很小的規模,並且你想在本地處理 這些資料時, 就可以使用它。只有當你的整個資料集能在單臺機器的記憶體中放得下時,才能使用 collect(),因此, collect() 不能用在大規模資料集上 。