1. 程式人生 > >Learning Spark——RDD常用操作

Learning Spark——RDD常用操作

RDD支援兩種操作:轉換(Transformation)操作和行動(Action)操作

為什麼會分為兩種操作,這兩種操作又有什麼區別呢?

我們先考慮一下平常我們使用的一些函式,舉個例子Long.toString(),這個轉換是把Long型別的轉換為String型別的。

如果同樣的事情在Spark中,是如何執行的呢?

Spark中轉換操作是“懶”執行的,就是說雖然我答應了把Long轉換成String,但是我只是把這個事情記在小本本里面,並不是現在去做,而是以後有了讓我做的動力才去做。

那麼什麼才是這個“動力”呢?那就是行動操作。一般來說,轉換操作是把某種RDD轉換成另一種RDD,而行動操作卻是實際地產生一個結果,比如把RDD寫入一個外部檔案系統,或者列印到驅動器程式輸出中。

這就好比我學資料探勘,我網上搜了很多攻略,告訴我可以先看看公開課,再讀讀書,然後參加Kaggle競賽,我信心滿滿地把這些都記錄下來,放在了某個和島國片一樣重要的收藏夾下面,然後。。。就沒有然後了。

But!如果此時,老大告訴我,我們的產品要新增基於位置性別以及年齡的使用者推薦,給你一個月時間去學習,否則捲鋪蓋走人。

面臨著這種生存還是死亡的問題,我當然會鼓足幹勁力爭上游,把之前記錄好的學習步奏按部就班地做一遍,最後把推薦系統做了出來。

這裡,公開課->讀書->Kaggle可以看做是轉換操作,做出推薦系統是行動操作,才是我們要的最終結果。只有有一個明確的行動操作,轉換操作才會被執行。

所以說人生如碼碼如人生,人生來就有惰性,想要堅持下去,任何時候都不要忘記我們到底想要什麼,我們的目標是什麼。

OK,言歸正傳,大家可以考慮一下Spark的轉換操作為什麼要用這種“懶”執行的機制,而不是正常地轉換一次執行一次呢?

Spark所有RDD都存在記憶體上,非常追求速度,所以要儘量減少RDD轉換的次數。使用“懶”執行可以把整個RDD轉換過程中可以合併的步奏合併起來執行,例如原來是a=>b,b=>c,Spark發現可以合併成a=>c,這樣就省了一步,加快了速度。對於我們開發者來說,就不需要考慮為了減少轉換次數如何從a直接跳到c,我們每次寫一個簡單的轉換操作a到b就可以了,這就是它的強大之處。

沒有對比就沒有傷害,在類似Hadoop MapReduce 的系統中,開發者常常花費大量時間考慮如何把操作組合到一起,以減少 MapReduce 的週期數。

下面我們結合程式碼講解一下幾種常見的轉換操作和行動操作

1. 基本轉換操作

map

將一個RDD中的每個資料項,通過map中的函式變為一個新的元素。
輸入分割槽與輸出分割槽一對一,即:有多少個輸入分割槽,就有多少個輸出分割槽。

// 檔案內容
hadoop fs -cat /test/test.log

haha 你好
hehe 你滾
heihei 你猜

// map操作
val data = sc.textFile("/test/test.log")
val mapresult = data.map(line => line.split(" "))
mapresult.collect

Array[Array[String]] = Array(Array(haha, 你好), Array(hehe, 你滾), Array(heihei, 你猜))

這裡寫圖片描述

flatMap

flatMap可以看做是兩步,第一步是map,第二步是flat。假設原始資料是 RDD[A],經過map以後變成 RDD[集合[B]],然後經過flat變成 RDD[B],所以如果我們的轉換操作會產生集合,但是我們要的結果不是這個集合,而是集合裡面的元素,那麼就可以使用flatMap

// flatMap操作
val flatmapresult = data.flatMap(line => line.split(" "))
flatmapresult.collect

Array[String] = Array(haha, 你好, hehe, 你滾, heihei, 你猜)

這裡寫圖片描述

filter

這個運算元相對簡單,對輸入的RDD中的每一個元素,通過一個函式判斷真假,過濾掉結果為假的元素

// filter操作
val intRDD = sc.parallelize(1 to 10)
val filterRDD = intRDD.filter(i => i > 3)
filterRDD.collect()

Array[Int] = Array(4, 5, 6, 7, 8, 9, 10)

distinct

對RDD中的元素進行去重操作

// distinct操作
val rdd = sc.parallelize(List("My","Book","My","Pen","Your","Her","His","Book","Pen"))
var distinctRDD = rdd.distinct()
distinctRDD.collect()

Array[String] = Array(My, Pen, Book, Her, His, Your)

coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]

該函式用於將RDD進行重分割槽,使用HashPartitioner。

第一個引數為重分割槽的數目,第二個為是否進行shuffle,預設為false

scala> var data = sc.textFile("/test/test.log")
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[53] at textFile at :21

scala> data.collect
res37: Array[String] = Array(haha 你好, hehe 你滾, heihei 你猜)

scala> data.partitions.size
res38: Int = 2  //RDD data預設有兩個分割槽

scala> var rdd1 = data.coalesce(1)
rdd1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[2] at coalesce at :23

scala> rdd1.partitions.size
res1: Int = 1   //rdd1的分割槽數為1


scala> var rdd1 = data.coalesce(4)
rdd1: org.apache.spark.rdd.RDD[String] = CoalescedRDD[3] at coalesce at :23

scala> rdd1.partitions.size
res2: Int = 2   //如果重分割槽的數目大於原來的分割槽數,那麼必須指定shuffle引數為true,//否則,分割槽數不便

scala> var rdd1 = data.coalesce(4,true)
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at coalesce at :23

scala> rdd1.partitions.size
res3: Int = 4

repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

該函式其實就是coalesce函式第二個引數為true的實現,重新分割槽很有必要,比如如果分割槽數過於多,而每個分割槽對應的記憶體不夠用,那麼可能會發生OOM異常,這是就可以重新分割槽減少總分割槽數

scala> var rdd2 = data.repartition(1)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at repartition at :23

scala> rdd2.partitions.size
res4: Int = 1

scala> var rdd2 = data.repartition(4)
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at repartition at :23

scala> rdd2.partitions.size
res5: Int = 4

randomSplit

def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

該函式用來切分RDD成為多個RDD,有兩個引數,第一個是權重,是一個雙精度浮點數陣列,正常情況下該陣列的和應等於1;第二個是隨機數種子,隨意寫一個整數就OK。

該函式就是用來把一份資料分成幾份,比如我們做推薦系統,就會將資料按某個權重分成訓練資料和測試資料

// 一個包含10萬個數的RDD
scala> var rdd = sc.makeRDD(1 to 100000,10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at makeRDD at <console>:24

// 分成兩份,三七開
scala> var splitRDD = rdd.randomSplit(Array(0.3,0.7),10)
splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[19] at randomSplit at <console>:26, MapPartitionsRDD[20] at randomSplit at <console>:26)

// 最後的結果差不多是三七開
scala> splitRDD(0).count()
res20: Long = 30072

scala> splitRDD(1).count()
res21: Long = 69928

union

懂SQL的應該對這個函式很熟悉,就是將結構相同的兩個RDD合併在一起,合併後的RDD可能包含重複元素

scala> var rdd1 = sc.makeRDD(1 to 2,1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[45] at makeRDD at :21

scala> rdd1.collect
res42: Array[Int] = Array(1, 2)

scala> var rdd2 = sc.makeRDD(2 to 3,1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[46] at makeRDD at :21

scala> rdd2.collect
res43: Array[Int] = Array(2, 3)

scala> rdd1.union(rdd2).collect
res44: Array[Int] = Array(1, 2, 2, 3)

於此類似的函式還有:intersection,返回兩個RDD的交集,去重;subtract,返回在RDD中出現,並且不在otherRDD中出現的元素,不去重

zipWithIndex

該函式將會產生一個鍵值對,鍵是RDD中的元素,值是從零開始的ID值

scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21

scala> rdd2.zipWithIndex().collect
res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))

Refer