Spark -- RDD簡單操作【統計文字中單行最大單詞數】
一 、什麼是RDD ?
RDD在Spark【Scala語言】中,是一種資料結構【基於記憶體,可持久化】,就好比Java的ArrayList一樣,可以進行各種的Action操作,比如Java中的List集合,可以進行get【獲取元素】、add【增加元素】、remove【移除元素】等操作;
當然,Scala語言底層實現是基於JVM的,即Scala相容Java語言【但高效於Java】,因此,Java的List集合可以直接拿來在Scala中使用;
對於Spark的RDD,一樣有其對應的Action【行動】操作,主要操作有以下幾個
collect:返回RDD所有元素【注:如果spark為叢集模式,則從各個work節點上抓取RDD資料】 count : 統計RDD資料集中的元素個數 reduce:並行整合RDD資料集中的元素,如(a,b)=> a+b【累加求和】, (a,b) = > if(a>b) a else b【漸進比大小,求最大元素】 foreach(func) :遍歷RDD資料集中的元素,並進行func【函式】操作。如:println【列印函式】 ....etc
RDD不單單是一個數據集【資料結構】,它的全稱可是:彈性分散式資料集【Resilient Distributed Datasets】
如此高逼格的名稱可不是隨便叫的,為什麼呢?
(1)為什麼稱是彈性的 【個人理解】
彈簧我們知道,可長可短,即可伸縮;
在整個Spark計算的過程中,都是圍繞著RDD資料集來的,即,計算一開始會產生N個RDD資料集,隨著計算的推進,N個RDD會被轉來轉去,但是最終會得到一個RDD資料集,也就是我們想要的計算結果。
而且整個過程是流水線【並行】的,每個流水線上【work節點】執行過程得到的RDD不用等其他流水線上的RDD,區別於MapReduce【reduce任務需要等所有的map任務完成後,才能進行】,而執行過程中得到的RDD都是基於記憶體的,因此Spark的執行效率要遠高於Hadoop的MapReduce,因為MapReduce的每一個map和reduce任務都要讀寫磁碟【IO開銷很大】
(2)分散式
這個不做過多解釋,只要涉及大資料,必提分散式
字眼如:多臺機器、並行、分割槽、任務排程...etc【叢集】
(3)資料集
不要見到RDD,就如同看到了三個陌生的字母一樣,它是一種資料結構,準確說是一種基於記憶體的資料結構,在準確點說,就是scala語言對資料集在記憶體中的一種封裝【包裝】,至於為什麼這樣做,我說不下去了....,
二、怎麼得到一個RDD呢?
注意這裡我用的是得到,而不是建立【建立讓人有種即將寫程式碼的緊迫感,有沒有】
在我沒有開始寫demo來突出本篇博文的主題時,我們還是來想想,既然上面說了什麼是RDD,那麼,這個抽象的東西究竟怎麼獲得呢?
上面我們說過,RDD是有行動【Action】操作的,也就是RDD常用到的幾個函式【count、reduce...etc】,所謂的行動操作是基於RDD資料集的,注意,得先有RDD,才能進行下一步函式的呼叫
在Spark中,我們把得到【建立】RDD的過程叫做RDD的轉換【Transformation】過程,它是一種延遲操作,為什麼這樣說呢?
[ 我去,越扯越多,本來三五行demo就能搞定的博文,我居然....... ]
這就要提到Spark的惰性機制了,RDD在轉換的過程中,看似得到了一個RDD,其實這個RDD是個虛的,並沒有立馬在記憶體中建立,只有我們在執行行動操作的時候,這個RDD才從頭開始執行並在記憶體中建立,話又說回來,RDD的轉換過程有哪些常用的函式呢【也就是RDD的建立函式】
[ 太抽象? 別急,欠的demo示例,一會一起補上 ]
textFile:從本地或者HDFS檔案系統中的指定檔案中獲取RDD資料集
map: 對資料集中的元素按照某種規則進行轉換,得到新的RDD
filter:對資料集中的元素進行某種規則的過濾轉換,得到新的RDD
.... etc
三、RDD簡單操作
該說的也說的差不多了,下面就把上面欠的demo補上
功能:統計word.txt檔案中,單行單詞數最大的,並輸出結果
步驟:
input: 本地word.txt
轉換操作:RDD1 ---> RDD2 --> RDD3
執行操作:RDD3.reduce
output:輸出結果
(1)建立input 【隨便找個目錄,word.txt 如下】
a b d d e f
a a d c c e h j k
o i k l m n b v
q w e r t y u i are v x a
q w e
(2)將檔案轉換成RDD 【使用轉換函式 -- textFile】 --- RDD1
【注:本地模式非叢集模式,且demo演示主要在spark-shell中進行演示】
var lines = sc.textFile("word01.txt")
注意,上面是一個完整的計算過程,如果單有RDD的轉換過程是無法真正在記憶體中建立一個RDD的,如下面這種:
看似正常,感覺我們好像建立了一個RDD,但是,當我們真正使用它的時候,卻發現報異常了
這就是Spark的惰性【延遲】機制,如果RDD在一開始轉換的時候就在記憶體中建立資料集的話,那麼一開始就會報檔案不存在的異常,而不是在我們呼叫RDD的執行函式時才發現
正常點,我們列印下正確的RDD的資料集的內容:
lines.foreach(println)
(3)將上一步得到的lines進一步轉換,生成新的RDD --- RDD2
利用map函式配合lambda表示式,重新轉換lines中的元素,將每一行文字按照空格拆分成一個stirng單詞陣列
var lineArray = lines.map(line => line.split(" "))
(4)將上一步得到的lineArray再次轉換,得到新的RDD --- RDD3
利用map函式配合lambda表示式,再次將上一步得到的lineArray中的元素進行轉換,得到新的RDD資料集lineWordSize
(5) 對上一步得到的RDD進行Action操作,得到最終結果 -- RDD3 --> output
利用reduce函式,對RDD3資料集進行整合,找出最大的單詞數
(5) 核對計算結果
注:第二行文字,多了一個空格,因此統計出來的是10個單詞【不要有疑問哈】
四、RDD簡單操作 -- 一氣呵成
【三】步驟有點太繁瑣了,scala本身就是一個高效、簡潔的語言(同Python),因此,我們用最簡潔的方式在跑一遍demo,拿到我們最終要的計算結果:12
【看好了,不要眨眼...... 激動人心的時刻到了,哈哈】
var result = sc.textFile("word01.txt").map(line => line.split(" ").size).reduce((a,b) => if(a>b) a else b)
一行程式碼搞定!!!! ---- 流水線操作