spark RDD運算元大全
RDD作為spark核心的資料抽象,有關RDD的原始碼可看spark原始碼《一》RDD,有大量的api,也就是運算元
之前寫過兩篇運算元原始碼,一篇觸發runJob()的運算元,一篇是基於combineByKey()運算元的原始碼,有興趣的可以去看下。
目錄
take()&&takeOrdered()&&top()&&first()
union()&&intersection()&&subtract()
zipWithUniqueId()&&zipWithIndex()
map()&&flatMap()
返回一個新的RDD
val data=sc.parallelize(0 to 4,2) val m=data.map(x=>x to 3) m.foreach(println) println(m.count())//RDD元素數量 val fm=data.flatMap(x=>x to 3) fm.foreach(println) println(fm.count())
map結果:Range(0, 1, 2, 3),Range(1, 2, 3),Range(2, 3),Range(3),Range() RDD元素數量5
flatMap結果:0 1 2 3 1 2 3 2 3 3 RDD元素數量10
可以看到map()是將一個元素生成一個元素,而flatMap()是將一個元素生成N(也可能0個)個元素。
val data=sc.parallelize(List("tom tom","jerry hehe","hehe"),2) val m=data.map(x=>x.split(" ")) m.foreach(println) println(m.count()) val fm=data.flatMap(x=>x.split(" ")) fm.foreach(println) println(fm.count())
map結果:java.lang.String;@226d19fb,java.lang.String;@ffdbd08,java.lang.String;@3d58c9e5 元素數量3
flatMap結果:tom tom jerry hehe hehe 元素數量5
可以看到map將每個元素轉為陣列,新RDD的元素型別為陣列,而faltMap是將所有陣列中元素取出並連線起來。
map()&&mapPartitions()
返回一個新的RDD
map()與mapPartitions()原始碼:
class MappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: T => U)
extends RDD[U](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).map(f)//分割槽中每個元素執行f方法
}
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: Iterator[T] => Iterator[U])
extends RDD[U](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = f(prev.iterator(split))//每個分割槽的資料作為一個整體執行f
}
下面看示例:
val data = sc.parallelize(1 to 10,3)
data.mapPartitions(x=>mapParrtitionsF(x)).count()
data.map(x=>mapF(x)).count()
def mapF(x:Int):Int={
println("呼叫了mapF方法")
x
}
def mapParrtitionsF(x:Iterator[Int]):Iterator[Int]={
println("呼叫了mapParrtitionsF方法")
x
}
map結果:調了10次方法,RDD有多少元素執行多少次方法
flatMap結果:調了3次方法,RDD有多少分割槽執行多少次方法
mapPartitionsWithIndex()
返回一個新的RDD
val data = sc.parallelize(1 to 5,3)
data.mapPartitionsWithIndex((x,iter)=>{
var result=List[String]()
while (iter.hasNext){
result ::=(x+"-"+iter.next())
}
result.toIterator
}).foreach(println)
結果為:0-1,1-2,1-3,2-4,2-5
mapPartitionsWithIndex()傳入的方法需要兩個引數,一個為分割槽Id,另一個為分割槽資料,該方法可用來檢視各分割槽的資料
filter()
返回一個新的RDD
val data = sc.parallelize(1 to 5,3)
data.filter(x=>x%2==0).foreach(println)
結果為: 2 4
filter()接收一個返回值為布林型別的方法,過濾掉不符合條件的元素,符合條件的元素組成一個新的RDD
take()&&takeOrdered()&&top()&&first()
前三個返回一個數組,最後一個返回一個值
val data=sc.parallelize(List(6,3,2,0,11,45,1),3)
for(e<-data.take(2)){
println(e)
}
for(e<-data.takeOrdered(2)){
println(e)
}
for(e<-data.top(2)){
println(e)
}
println(data.first())
take(2)結果為:6 3 ,取前兩個元素
takeOrdered(2)結果為:0 1 ,升序排列後,取前兩元素
top(2)結果為:45 11, 降序排列後,取前兩元素
first()結果為:6 取第一個元素
sample()&&takeSample()
一個返回新的RDD 一個返回陣列
val data = sc.parallelize(1 to 10000,3)
println(data.sample(true,0.5).count())
println(data.takeSample(true,5000).length)
sample()結果為:4976
takeSample()結果為:5000
這兩運算元都是隨機抽取元素,可以傳三個引數,第一個為布林值,表示是否重複抽取元素,第二個sample()為抽取比例,takeSample()為具體數值,第三個為隨機種子,java的隨機數是偽隨機數,通過計算隨機種子得到某些數,可用預設值。
從結果可以看出,抽取比例為0.5時是抽不到5000個元素的,如果想抽取具體多少元素,可用takeSample()
union()&&intersection()&&subtract()
返回一個新的RDD
val d4=sc.parallelize(List(1,3,4,6,3,8,1),2)
val d5=sc.parallelize(List(2,3,4,5,3,8,9),3)
d4.union(d5).foreach(println)
println(d4.union(d5).partitions.length)//新RDD分割槽數
d4.intersection(d5).foreach(println)
println(d4.intersection(d5).partitions.length)
println(d4.intersection(d5,6).partitions.length)//指定新RDD分割槽數
d4.subtract(d5).foreach(println)
println(d5.subtract(d4).partitions.length)
println(d4.subtract(d5).partitions.length)
union()結果為:1,3,4,6,3,8,1,2,3,4,5,3,8,9 分割槽數為2+3=5
intersection()結果為:8,3,4 分割槽數為:3 6
subtract()結果為:1,6,1 分割槽數為:3 2
這三個運算元分別求:並集,交集,差集。其中intersection()交集去重,別的union(),subtract()均不去重,union()分割槽數為兩個RDD分割槽數之和,intersection()分割槽預設為max(rdd1分割槽,rdd2分割槽),可以手動指定分割槽數,subtract()預設分割槽數為調該方法的RDD的分割槽數,也可以指定。
reduce()&&fold()
返回一個值
val data = sc.parallelize(1 to 5,3)
println(data.reduce(_+_))
println(data.fold(5)(_+_))
reduce()結果為:15
fold()結果為:35
關於reduce()與fold()原始碼可看我之前的部落格。
reduce()的計算過程:(((1+2)+3)+4)+5,而fold()的計算過程是先計算各分割槽與初值的結果,最後計算各分割槽結果與初值,
假設data的資料分佈是:(1),(2,3),(4,5)
則fold()的計算結果是:(1+5)+(2+3+5)+(4+5+5)+5,
所以reduce()與fold()的區別就是一個有初值,一個無初值。
fold()&&aggregate()
返回一個值
val data = sc.parallelize(1 to 5,3)
println(data.fold(5)(_+_))
println(data.aggregate(5)(_+_,_*_))
fold()結果為:35
aggregate()結果為:4200
aggregate()需要傳入兩個方法,第一個方法計算各分割槽內資料,第二個方法計算各分割槽之前結果資料。
aggregate()計算過程:(1+5)*(2+3+5)*(4+5+5)*5=4200
而fold()傳一個方法,計算各分割槽內資料不僅用這個方法,計算各分割槽之前結果資料也用該方法。
所以fold()與aggregate()區別是:一個傳一個方法,一個傳兩個方法。
zipWithUniqueId()&&zipWithIndex()
返回一個新的RDD
val data = sc.parallelize(1 to 5,3)
data.zipWithUniqueId().foreach(println)
data.zipWithIndex().foreach(println)
zipWithUniqueId()結果為:(2,1),(3,4),(4,2),(5,5),(1,0)
zipWithIndex()結果為:(4,3),(2,1),(3,2),(5,4),(1,0)
可以看出,這兩種方法都是返回一個鍵值對的RDD,鍵為元素,zipWithIndex()值為下標,從0到RDD元素數-1,而zipWithUniqueId()值為唯一的Id,不受最大值為RDD元素數-1的約束
zip()&&zipPartitions()
返回一個新的RDD
val d1=sc.parallelize(List("tom","jerry","hehe","zlq","wnn","nm","sl"),2)
val d3=sc.parallelize(List(6,3,2,0,11,45,1),2)
d1.zip(d3).foreach(println)
d1.zipPartitions(d3)((iter1,iter2)=>{//方法的引數為迭代器
var result=List[String]()
while (iter1.hasNext&&iter2.hasNext){
result ::=(iter1.next()+"-"+iter2.next())
}
result.iterator
}).foreach(println)
zip()結果為:(zlq,0),(tom,6),(wnn,11),(nm,45),(sl,1),(jerry,3),(hehe,2)
zipPartitions()結果為:hehe-2,jerry-3,tom-6,sl-1,nm-45,wnn-11,zlq-0
zip()要求兩個RDD分割槽數與元素數必須相等,而zipPartitions()只要求分割槽數相同。生成新RDD與原RDD分割槽數相同
未完待續........