Spark RDD算子實戰
Spark算子概述
RDD:彈性分布式數據集,是一種特殊集合、支持多種來源、有容錯機制、可以被緩存、支持並行操作,一個RDD代表多個分區裏的數據集。
RDD有兩種操作算子:
- Transformation(轉換):Transformation屬於延遲計算,當一個RDD轉換成另一個RDD時並沒有立即進行轉換,僅僅是記住了數據集的邏輯操作
- Action(執行):觸發Spark作業的運行,真正觸發轉換算子的計算
需要說明的是,下面寫的scala代碼,其實都是可以簡寫的,但是為了方便理解,我都沒有簡寫,因為要簡寫的話對於scala來說真的就是一句話的事情了。
另外如果是在本地環境進行開發,那麽需要添加相關依賴:
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.10.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.2</version> </dependency>
Transformation算子
概述
需要操作的Transformation算子說明如下:
-
map(func)
返回一個新的分布式數據集,由每個原元素經過func函數轉換後組成
-
filter(func)
返回一個新的數據集,由經過func函數後返回值為true的原元素組成
-
flatMap(func)
類似於map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素)
-
sample(withReplacement, frac, seed)
根據給定的隨機種子seed,隨機抽樣出數量為frac的數據
-
union(otherDataset)
返回一個新的數據集,由原數據集和參數聯合而成
-
groupByKey([numTasks])
在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。註意:默認情況下,使用8個並行任務進行分組,你可以傳入numTask可選參數,根據數據量設置不同數目的Task
-
reduceByKey(func, [numTasks])
在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。
-
join(otherDataset, [numTasks])
在類型為(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數據集
map
測試代碼如下:
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps1(sc)
sc.stop()
}
/**
* 1、map:將集合中每個元素乘以7
* map(func):返回一個新的分布式數據集,由每個原元素經過func函數轉換後組成
*/
def transformationOps1(sc:SparkContext): Unit = {
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val listRDD = sc.parallelize(list)
val retRDD = listRDD.map(num => num * 7)
retRDD.foreach(num => println(num))
}
}
執行結果如下:
42
7
49
14
56
21
63
28
70
35
filter
測試代碼如下:
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps2(sc)
sc.stop()
}
/**
* 2、filter:過濾出集合中的奇數
* filter(func): 返回一個新的數據集,由經過func函數後返回值為true的原元素組成
*
* 一般在filter操作之後都要做重新分區(因為可能數據量減少了很多)
*/
def transformationOps2(sc:SparkContext): Unit = {
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val listRDD = sc.parallelize(list)
val retRDD = listRDD.filter(num => num % 2 == 0)
retRDD.foreach(println)
}
}
輸出結果如下:
6
2
8
4
10
flatMap
測試代碼如下:
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps3(sc)
sc.stop()
}
/**
* 3、flatMap:將行拆分為單詞
* flatMap(func):類似於map,但是每一個輸入元素,
* 會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素)
*/
def transformationOps3(sc:SparkContext): Unit = {
val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val wordsRDD = listRDD.flatMap(line => line.split(" "))
wordsRDD.foreach(println)
}
}
輸出結果如下:
hello
hello
he
you
hello
me
sample
測試代碼如下:
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps4(sc)
sc.stop()
}
/**
* 4、sample:根據給定的隨機種子seed,隨機抽樣出數量為frac的數據
* sample(withReplacement, frac, seed): 根據給定的隨機種子seed,隨機抽樣出數量為frac的數據
* 抽樣的目的:就是以樣本評估整體
* withReplacement:
* true:有放回的抽樣
* false:無放回的抽樣
* frac:就是樣本空間的大小,以百分比小數的形式出現,比如20%,就是0.2
*
* 使用sample算子計算出來的結果可能不是很準確,1000個數,20%,樣本數量在200個左右,不一定為200
*
* 一般情況下,使用sample算子在做spark優化(數據傾斜)的方面應用最廣泛
*/
def transformationOps4(sc:SparkContext): Unit = {
val list = 1 to 1000
val listRDD = sc.parallelize(list)
val sampleRDD = listRDD.sample(false, 0.2)
sampleRDD.foreach(num => print(num + " "))
println
println("sampleRDD count: " + sampleRDD.count())
println("Another sampleRDD count: " + sc.parallelize(list).sample(false, 0.2).count())
}
}
輸出結果如下:
sampleRDD count: 219
Another sampleRDD count: 203
union
測試代碼如下:
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps5(sc)
sc.stop()
}
/**
* 5、union:返回一個新的數據集,由原數據集和參數聯合而成
* union(otherDataset): 返回一個新的數據集,由原數據集和參數聯合而成
* 類似數學中的並集,就是sql中的union操作,將兩個集合的所有元素整合在一塊,包括重復元素
*/
def transformationOps5(sc:SparkContext): Unit = {
val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val list2 = List(7, 8, 9, 10, 11, 12)
val listRDD1 = sc.parallelize(list1)
val listRDD2 = sc.parallelize(list2)
val unionRDD = listRDD1.union(listRDD2)
unionRDD.foreach(println)
}
}
輸出結果如下:
1
6
2
7
3
8
4
9
5
10
7
8
9
10
11
12
groupByKey
測試代碼如下:
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps6(sc)
sc.stop()
}
/**
* 6、groupByKey:對數組進行 group by key操作
* groupByKey([numTasks]): 在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。
* 註意:默認情況下,使用8個並行任務進行分組,你可以傳入numTask可選參數,根據數據量設置不同數目的Task
* mr中:
* <k1, v1>--->map操作---><k2, v2>--->shuffle---><k2, [v21, v22, v23...]>---><k3, v3>
* groupByKey類似於shuffle操作
*
* 和reduceByKey有點類似,但是有區別,reduceByKey有本地的規約,而groupByKey沒有本地規約,所以一般情況下,
* 盡量慎用groupByKey,如果一定要用的話,可以自定義一個groupByKey,在自定義的gbk中添加本地預聚合操作
*/
def transformationOps6(sc:SparkContext): Unit = {
val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val wordsRDD = listRDD.flatMap(line => line.split(" "))
val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
pairsRDD.foreach(println)
val gbkRDD:RDD[(String, Iterable[Int])] = pairsRDD.groupByKey()
println("=============================================")
gbkRDD.foreach(t => println(t._1 + "..." + t._2))
}
}
輸出結果如下:
(hello,1)
(hello,1)
(you,1)
(he,1)
(hello,1)
(me,1)
=============================================
you...CompactBuffer(1)
hello...CompactBuffer(1, 1, 1)
he...CompactBuffer(1)
me...CompactBuffer(1)
reduceByKey
測試代碼如下:
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps7(sc)
sc.stop()
}
/**
* 7、reduceByKey:統計每個班級的人數
* reduceByKey(func, [numTasks]): 在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,
* key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。
*
* 需要註意的是還有一個reduce的操作,其為action算子,並且其返回的結果只有一個,而不是一個數據集
* 而reduceByKey是一個transformation算子,其返回的結果是一個數據集
*/
def transformationOps7(sc:SparkContext): Unit = {
val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val wordsRDD = listRDD.flatMap(line => line.split(" "))
val pairsRDD:RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
val retRDD:RDD[(String, Int)] = pairsRDD.reduceByKey((v1, v2) => v1 + v2)
retRDD.foreach(t => println(t._1 + "..." + t._2))
}
}
輸出結果如下:
you...1
hello...3
he...1
me...1
join
測試代碼如下:
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps8(sc)
sc.stop()
}
/**
* 8、join:打印關聯的組合信息
* join(otherDataset, [numTasks]): 在類型為(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數據集
* 學生基礎信息表和學生考試成績表
* stu_info(sid ,name, birthday, class)
* stu_score(sid, chinese, english, math)
*
* * Serialization stack:
- object not serializable
這種分布式計算的過程,一個非常重要的點,傳遞的數據必須要序列化
通過代碼測試,該join是等值連接(inner join)
A.leftOuterJoin(B)
A表所有的數據都包涵,B表中在A表沒有關聯的數據,顯示為null
之後執行一次filter就是join的結果
*/
def transformationOps8(sc: SparkContext): Unit = {
val infoList = List(
"1,鐘 瀟,1988-02-04,bigdata",
"2,劉向前,1989-03-24,linux",
"3,包維寧,1984-06-16,oracle")
val scoreList = List(
"1,50,21,61",
"2,60,60,61",
"3,62,90,81",
"4,72,80,81"
)
val infoRDD:RDD[String] = sc.parallelize(infoList)
val scoreRDD:RDD[String] = sc.parallelize(scoreList)
val infoPairRDD:RDD[(String, Student)] = infoRDD.map(line => {
val fields = line.split(",")
val student = new Student(fields(0), fields(1), fields(2), fields(3))
(fields(0), student)
})
val scorePairRDD:RDD[(String, Score)] = scoreRDD.map(line => {
val fields = line.split(",")
val score = new Score(fields(0), fields(1).toFloat, fields(2).toFloat, fields(3).toFloat)
(fields(0), score)
})
val joinedRDD:RDD[(String, (Student, Score))] = infoPairRDD.join(scorePairRDD)
joinedRDD.foreach(t => {
val sid = t._1
val student = t._2._1
val score = t._2._2
println(sid + "\t" + student + "\t" + score)
})
println("=========================================")
val leftOuterRDD:RDD[(String, (Score, Option[Student]))] = scorePairRDD.leftOuterJoin(infoPairRDD)
leftOuterRDD.foreach(println)
}
}
輸出結果如下:
3 3 包維寧 1984-06-16 oracle 3 62.0 90.0 81.0
2 2 劉向前 1989-03-24 linux 2 60.0 60.0 61.0
1 1 鐘 瀟 1988-02-04 bigdata 1 50.0 21.0 61.0
=========================================
(4,(4 72.0 80.0 81.0,None))
(3,(3 62.0 90.0 81.0,Some(3 包維寧 1984-06-16 oracle)))
(2,(2 60.0 60.0 61.0,Some(2 劉向前 1989-03-24 linux)))
(1,(1 50.0 21.0 61.0,Some(1 鐘 瀟 1988-02-04 bigdata)))
sortByKey
測試代碼如下:
object _02SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_02SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
transformationOps7(sc)
sc.stop()
}
/**
* sortByKey:將學生身高進行(降序)排序
* 身高相等,按照年齡排(升序)
*/
def transformationOps9(sc: SparkContext): Unit = {
val list = List(
"1,李 磊,22,175",
"2,劉銀鵬,23,175",
"3,齊彥鵬,22,180",
"4,楊 柳,22,168",
"5,敦 鵬,20,175"
)
val listRDD:RDD[String] = sc.parallelize(list)
/* // 使用sortBy操作完成排序
val retRDD:RDD[String] = listRDD.sortBy(line => line, numPartitions = 1)(new Ordering[String] {
override def compare(x: String, y: String): Int = {
val xFields = x.split(",")
val yFields = y.split(",")
val xHgiht = xFields(3).toFloat
val yHgiht = yFields(3).toFloat
val xAge = xFields(2).toFloat
val yAge = yFields(2).toFloat
var ret = yHgiht.compareTo(xHgiht)
if (ret == 0) {
ret = xAge.compareTo(yAge)
}
ret
}
} ,ClassTag.Object.asInstanceOf[ClassTag[String]])
*/
// 使用sortByKey完成操作,只做身高降序排序
val heightRDD:RDD[(String, String)] = listRDD.map(line => {
val fields = line.split(",")
(fields(3), line)
})
val retRDD:RDD[(String, String)] = heightRDD.sortByKey(ascending = false, numPartitions = 1) // 需要設置1個分區,否則只是各分區內有序
retRDD.foreach(println)
// 使用sortByKey如何實現sortBy的二次排序?將上面的信息寫成一個java對象,然後重寫compareTo方法,在做map時,key就為該對象本身,而value可以為null
}
}
輸出結果如下:
(180,3,齊彥鵬,22,180)
(175,1,李 磊,22,175)
(175,2,劉銀鵬,23,175)
(175,5,敦 鵬,20,175)
(168,4,楊 柳,22,168)
combineByKey與aggregateByKey
下面的代碼分別使用combineByKey和aggregateByKey來模擬groupByKey和reduceBykey,所以是有4個操作,只要把combineByKey模擬groupByKey的例子掌握了,其它三個相對就容易許多了。
整體來說理解不太容易,但是非常重要,所以一定是要掌握的!
/**
* spark的transformation操作:
* aggregateByKey
* combineByKey
*
* 使用combineByKey和aggregateByKey模擬groupByKey和reduceByKey
*
* 通過查看源碼,我們發現aggregateByKey底層,還是combineByKey
*
* 問題:combineByKey和aggregateByKey的區別?
* aggregateByKey是柯裏化形式的,目前底層源碼還沒時間去分析,所知道的區別是這個
*/
object _03SparkTransformationOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_03SparkTransformationOps.getClass.getSimpleName)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
// combineByKey2GroupByKey(sc)
// combineByKey2ReduceByKey(sc)
// aggregateByKey2ReduceByKey(sc)
aggregateByKey2GroupByKey(sc)
sc.stop()
}
/**
* 使用aggregateByKey模擬groupByKey
*/
def aggregateByKey2GroupByKey(sc: SparkContext): Unit = {
val list = List("hello bo bo", "zhou xin xin", "hello song bo")
val lineRDD = sc.parallelize(list)
val wordsRDD = lineRDD.flatMap(line => line.split(" "))
val pairsRDD = wordsRDD.map(word => (word, 1))
val retRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.aggregateByKey(ArrayBuffer[Int]()) ( // 這裏需要指定value的類型為ArrayBuffer[Int]()
(part, num) => {
part.append(num)
part
},
(part1, part2) => {
part1.++=(part2)
part1
}
)
retRDD.foreach(println)
}
/**
* 使用aggregateByKey模擬reduceByKey
* def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
(zeroValue: U)就對應的是combineByKey中的第一個函數的返回值
seqOp 就對應的是combineByKey中的第二個函數,也就是mergeValue
combOp 就對應的是combineByKey中的第三個函數,也就是mergeCombiners
*/
def aggregateByKey2ReduceByKey(sc:SparkContext): Unit = {
val list = List("hello bo bo", "zhou xin xin", "hello song bo")
val lineRDD = sc.parallelize(list)
val wordsRDD = lineRDD.flatMap(line => line.split(" "))
val pairsRDD = wordsRDD.map(word => (word, 1))
val retRDD:RDD[(String, Int)] = pairsRDD.aggregateByKey(0) (
(partNum, num) => partNum + num, // 也就是mergeValue
(partNum1, partNum2) => partNum1 + partNum2 // 也就是mergeCombiners
)
retRDD.foreach(println)
}
/**
* 使用reduceByKey模擬groupByKey
*/
def combineByKey2ReduceByKey(sc:SparkContext): Unit = {
val list = List("hello bo bo", "zhou xin xin", "hello song bo")
val lineRDD = sc.parallelize(list)
val wordsRDD = lineRDD.flatMap(line => line.split(" "))
val pairsRDD = wordsRDD.map(word => (word, 1))
/**
* 對於createCombiner1 mergeValue1 mergeCombiners1
* 代碼的參數已經體現得很清楚了,其實只要理解了combineByKey模擬groupByKey的例子,這個就非常容易了
*/
var retRDD:RDD[(String, Int)] = pairsRDD.combineByKey(createCombiner1, mergeValue1, mergeCombiners1)
retRDD.foreach(println)
}
/**
* reduceByKey操作,value就是該數值本身,則上面的數據會產生:
* (hello, 1) (bo, 1) (bo, 1)
* (zhou, 1) (xin, 1) (xin, 1)
* (hello, 1) (song, 1) (bo, 1)
* 註意有別於groupByKey的操作,它是創建一個容器
*/
def createCombiner1(num:Int):Int = {
num
}
/**
* 同一partition內,對於有相同key的,這裏的mergeValue直接將其value相加
* 註意有別於groupByKey的操作,它是添加到value到一個容器中
*/
def mergeValue1(localNum1:Int, localNum2:Int): Int = {
localNum1 + localNum2
}
/**
* 將兩個不同partition中的key相同的value值相加起來
* 註意有別於groupByKey的操作,它是合並兩個容器
*/
def mergeCombiners1(thisPartitionNum1:Int, anotherPartitionNum2:Int):Int = {
thisPartitionNum1 + anotherPartitionNum2
}
/**
* 使用combineByKey模擬groupByKey
*/
def combineByKey2GroupByKey(sc:SparkContext): Unit = {
val list = List("hello bo bo", "zhou xin xin", "hello song bo")
val lineRDD = sc.parallelize(list)
val wordsRDD = lineRDD.flatMap(line => line.split(" "))
val pairsRDD = wordsRDD.map(word => (word, 1))
// 輸出每個partition中的map對
pairsRDD.foreachPartition( partition => {
println("<=========partition-start=========>")
partition.foreach(println)
println("<=========partition-end=========>")
})
val gbkRDD:RDD[(String, ArrayBuffer[Int])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)
gbkRDD.foreach(println)
// 如果要測試最後groupByKey的結果是在幾個分區,可以使用下面的代碼進行測試
/*gbkRDD.foreachPartition(partition => {
println("~~~~~~~~~~~~~~~~~~~~~~~~~~~")
partition.foreach(println)
})*/
}
/**
* 初始化,將value轉變成為標準的格式數據
* 是在每個分區中進行的操作,去重後的key有幾個,就調用次,
* 因為對於每個key,其容器創建一次就ok了,之後有key相同的,只需要執行mergeValue到已經創建的容器中即可
*/
def createCombiner(num:Int):ArrayBuffer[Int] = {
println("----------createCombiner----------")
ArrayBuffer[Int](num)
}
/**
* 將key相同的value,添加到createCombiner函數創建的ArrayBuffer容器中
* 一個分區內的聚合操作,將一個分區內key相同的數據,合並
*/
def mergeValue(ab:ArrayBuffer[Int], num:Int):ArrayBuffer[Int] = {
println("----------mergeValue----------")
ab.append(num)
ab
}
/**
* 將key相同的多個value數組,進行整合
* 分區間的合並操作
*/
def mergeCombiners(ab1:ArrayBuffer[Int], ab2:ArrayBuffer[Int]):ArrayBuffer[Int] = {
println("----------mergeCombiners----------")
ab1 ++= ab2
ab1
}
}
/*
combineByKey模擬groupByKey的一個輸出效果,可以很好地說明createCombiner、mergeValue和mergeCombiners各個階段的執行時機:
<=========partition-start=========>
<=========partition-start=========>
(hello,1)
(zhou,1)
(bo,1)
(xin,1)
(bo,1)
(xin,1)
<=========partition-end=========>
(hello,1)
(song,1)
(bo,1)
<=========partition-end=========>
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------mergeValue----------
----------mergeValue----------
----------createCombiner----------
----------createCombiner----------
----------createCombiner----------
----------mergeCombiners----------
----------mergeCombiners----------
(song,ArrayBuffer(1))
(hello,ArrayBuffer(1, 1))
(bo,ArrayBuffer(1, 1, 1))
(zhou,ArrayBuffer(1))
(xin,ArrayBuffer(1, 1))
*/
Actions算子
概述
前面Transformationt算子的測試都是在本地開發環境中直接跑代碼,這裏Actions算子的測試主要在spark-shell中進行操作,因為會方便很多。
需要說明的Actions算子如下:
-
reduce(func)
通過函數func聚集數據集中的所有元素。Func函數接受2個參數,返回一個值。這個函數必須是關聯性的,確保可以被正確的並發執行
-
collect()
在Driver的程序中,以數組的形式,返回數據集的所有元素。這通常會在使用filter或者其它操作後,返回一個足夠小的數據子集再使用,直接將整個RDD集Collect返回,很可能會讓Driver程序OOM
-
count()
返回數據集的元素個數
-
take(n)
返回一個數組,由數據集的前n個元素組成。註意,這個操作目前並非在多個節點上,並行執行,而是Driver程序所在機器,單機計算所有的元素(Gateway的內存壓力會增大,需要謹慎使用)
-
first()
返回數據集的第一個元素(類似於take(1))
-
saveAsTextFile(path)
將數據集的元素,以textfile的形式,保存到本地文件系統,hdfs或者任何其它hadoop支持的文件系統。Spark將會調用每個元素的toString方法,並將它轉換為文件中的一行文本
-
saveAsSequenceFile(path)
將數據集的元素,以sequencefile的格式,保存到指定的目錄下,本地系統,hdfs或者任何其它hadoop支持的文件系統。RDD的元素必須由key-value對組成,並都實現了Hadoop的Writable接口,或隱式可以轉換為Writable(Spark包括了基本類型的轉換,例如Int,Double,String等等)
-
foreach(func)
在數據集的每一個元素上,運行函數func。這通常用於更新一個累加器變量,或者和外部存儲系統做交互
reduce
通過函數func聚集數據集中的所有元素。Func函數接受2個參數,返回一個值。這個函數必須是關聯性的,確保可以被正確的並發執行。
關於reduce的執行過程,可以對比scala中類似的reduce函數,相關說明可以參考我的scala整理的知識點。
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:29
scala> val ret = listRDD.reduce((v1, v2) => v1 + v2)
...
ret: Int = 21
需要註意的是,不同於Transformation算子,其結果仍然是RDD,但是執行Actions算子之後,其結果不再是RDD,而是一個標量。
collect
在Driver的程序中,以數組的形式,返回數據集的所有元素。這通常會在使用filter或者其它操作後,返回一個足夠小的數據子集再使用,直接將整個RDD集Collect返回,很可能會讓Driver程序OOM,這點尤其需要註意。
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29
scala> val ret = listRDD.collect()
...
ret: Array[Int] = Array(1, 2, 3, 4, 5, 6)
count
返回數據集的元素個數。
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:29
scala> val ret = listRDD.count()
...
ret: Long = 6
take
返回一個數組,由數據集的前n個元素組成。註意,這個操作目前並非在多個節點上,並行執行,而是Driver程序所在機器,單機計算所有的元素(Gateway的內存壓力會增大,需要謹慎使用)。
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:29
scala> listRDD.take(3)
...
res7: Array[Int] = Array(1, 2, 3)
first
返回數據集的第一個元素(類似於take(1))。
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:29
scala> listRDD.first()
...
res8: Int = 1
saveAsTextFile
將數據集的元素,以textfile的形式,保存到本地文件系統,hdfs或者任何其它hadoop支持的文件系統。Spark將會調用每個元素的toString方法,並將它轉換為文件中的一行文本。
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:29
scala> listRDD.saveAsTextFile("file:///home/uplooking/data/spark/action")
...
可以在文件系統中查看到保存的文件:
[uplooking@uplooking01 action]$ pwd
/home/uplooking/data/spark/action
[uplooking@uplooking01 action]$ ls
part-00000 part-00001 part-00002 part-00003 _SUCCESS
其實可以看到,保存的跟Hadoop的格式是一樣的。
當然因為我的spark集群中已經做了跟hadoop相關的配置,所以也可以把文件保存到hdfs中:
scala> listRDD.saveAsTextFile("hdfs://ns1/output/spark/action")
...
然後就可以在hdfs中查看到保存的文件:
[uplooking@uplooking01 action]$ hdfs dfs -ls /output/spark/action
18/04/27 10:27:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
-rw-r--r-- 3 uplooking supergroup 0 2018-04-27 10:25 /output/spark/action/_SUCCESS
-rw-r--r-- 3 uplooking supergroup 2 2018-04-27 10:25 /output/spark/action/part-00000
-rw-r--r-- 3 uplooking supergroup 4 2018-04-27 10:25 /output/spark/action/part-00001
-rw-r--r-- 3 uplooking supergroup 2 2018-04-27 10:25 /output/spark/action/part-00002
-rw-r--r-- 3 uplooking supergroup 4 2018-04-27 10:25 /output/spark/action/part-00003
可以看到,保存的格式跟保存到本地文件系統是一樣的。
foreach
在數據集的每一個元素上,運行函數func。這通常用於更新一個累加器變量,或者和外部存儲系統做交互。
scala> val list = List(1, 2, 3, 4, 5, 6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val listRDD = sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:29
scala> listRDD.foreach(println)
...
saveAsNewAPIHadoopFile
也就是將數據保存到Hadoop HDFS中,但是需要註意的是,前面使用saveAsTextFile也可以進行相關操作,其使用的就是saveAsNewAPIHadoopFile或者saveAsHadoopFile這兩個API,而其兩者的區別是:
- saveAsHadoopFile的OutputFormat使用的:org.apache.hadoop.mapred中的早期的類
- saveAsNewAPIHadoopFile的OutputFormat使用的:org.apache.hadoop.mapreduce中的新的類。但不管使用哪一個,都是可以完成工作的。
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p2
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark算子操作之Action
* saveAsNewAPIHAdoopFile
* * saveAsHadoopFile
* 和saveAsNewAPIHadoopFile的唯一區別就在於OutputFormat的不同
* saveAsHadoopFile的OutputFormat使用的:org.apache.hadoop.mapred中的早期的類
* saveAsNewAPIHadoopFile的OutputFormat使用的:org.apache.hadoop.mapreduce中的新的類
* 使用哪一個都可以完成工作
*
* 前面在使用saveAsTextFile時也可以保存到hadoop文件系統中,註意其源代碼也是使用上面的操作的
*
* Caused by: java.net.UnknownHostException: ns1
... 35 more
找不到ns1,因為我們在本地沒有配置,無法正常解析,就需要將hadoop的配置文件信息給我們加載進來
hdfs-site.xml.heihei,core-site.xml.heihei
*/
object _05SparkActionOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_05SparkActionOps.getClass.getSimpleName)
val sc = new SparkContext(conf)
val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val pairsRDD = listRDD.map(word => (word, 1))
val retRDD = pairsRDD.reduceByKey((v1, v2) => v1 + v2)
retRDD.saveAsNewAPIHadoopFile(
"hdfs://ns1/spark/action", // 保存的路徑
classOf[Text], // 相當於mr中的k3
classOf[IntWritable], // 相當於mr中的v3
classOf[TextOutputFormat[Text, IntWritable]] // 設置(k3, v3)的outputFormatClass
)
}
}
之後我們可以在hdfs中查看到相應的文件輸出:
[uplooking@uplooking01 ~]$ hdfs dfs -ls /spark/action
18/04/27 12:07:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r-- 3 Administrator supergroup 0 2018-04-27 12:07 /spark/action/_SUCCESS
-rw-r--r-- 3 Administrator supergroup 13 2018-04-27 12:07 /spark/action/part-r-00000
-rw-r--r-- 3 Administrator supergroup 11 2018-04-27 12:07 /spark/action/part-r-00001
[uplooking@uplooking01 ~]$ hdfs dfs -text /spark/action/part-r-00000
18/04/27 12:08:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hello 3
me 1
[uplooking@uplooking01 ~]$ hdfs dfs -text /spark/action/part-r-00001
18/04/27 12:08:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
you 1
he 1
寬依賴和窄依賴
窄依賴(narrow dependencies)
子RDD的每個分區依賴於常數個父分區(與數據規模無關)
輸入輸出一對一的算子,且結果RDD的分區結構不變。主要是map/flatmap
輸入輸出一對一的算子,但結果RDD的分區結構發生了變化,如union/coalesce
從輸入中選擇部分元素的算子,如filter、distinct、substract、sample
寬依賴(wide dependencies)
子RDD的每個分區依賴於所有的父RDD分區
對單個RDD基於key進行重組和reduce,如groupByKey,reduceByKey
對兩個RDD基於key進行join和重組,如join
經過大量shuffle生成的RDD,建議進行緩存。這樣避免失敗後重新計算帶來的開銷。
註意:reduce是一個action,和reduceByKey完全不同。
關於寬依賴和窄依賴,《Hadoop與大數據挖掘》書本上的說明非常精簡,但是理解起來也是不錯的,可以參考一下,當然,這本書的Spark內容就寫得非常少了。
Spark RDD算子實戰