1. 程式人生 > >spark RDD運算元大全

spark RDD運算元大全

RDD作為spark核心的資料抽象,有關RDD的原始碼可看spark原始碼《一》RDD,有大量的api,也就是運算元

之前寫過兩篇運算元原始碼,一篇觸發runJob()的運算元,一篇是基於combineByKey()運算元的原始碼,有興趣的可以去看下。

目錄

map()&&flatMap()

map()&&mapPartitions()

mapPartitionsWithIndex()

filter()

take()&&takeOrdered()&&top()&&first()

sample()&&takeSample()

union()&&intersection()&&subtract()

reduce()&&fold()

fold()&&aggregate()

zipWithUniqueId()&&zipWithIndex()

zip()&&zipPartitions()


map()&&flatMap()

返回一個新的RDD

    val data=sc.parallelize(0 to 4,2)

    val m=data.map(x=>x to 3)
    m.foreach(println)
    println(m.count())//RDD元素數量

    val fm=data.flatMap(x=>x to 3)
    fm.foreach(println)
    println(fm.count())

map結果:Range(0, 1, 2, 3),Range(1, 2, 3),Range(2, 3),Range(3),Range()     RDD元素數量5

flatMap結果:0 1 2 3 1 2 3 2 3 3     RDD元素數量10

可以看到map()是將一個元素生成一個元素,而flatMap()是將一個元素生成N(也可能0個)個元素。

    val data=sc.parallelize(List("tom tom","jerry hehe","hehe"),2)
    
    val m=data.map(x=>x.split(" "))
    m.foreach(println)
    println(m.count())

    val fm=data.flatMap(x=>x.split(" "))
    fm.foreach(println)
    println(fm.count())

map結果:java.lang.String;@226d19fb,java.lang.String;@ffdbd08,java.lang.String;@3d58c9e5 元素數量3

flatMap結果:tom  tom  jerry  hehe  hehe  元素數量5

可以看到map將每個元素轉為陣列,新RDD的元素型別為陣列,而faltMap是將所有陣列中元素取出並連線起來。

map()&&mapPartitions()

返回一個新的RDD

map()與mapPartitions()原始碼:

class MappedRDD[U: ClassManifest, T: ClassManifest](
    prev: RDD[T],
    f: T => U)
  extends RDD[U](prev.context) {
  
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override def compute(split: Split) = prev.iterator(split).map(f)//分割槽中每個元素執行f方法
}


class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
    prev: RDD[T],
    f: Iterator[T] => Iterator[U])
  extends RDD[U](prev.context) {
  
  override def splits = prev.splits
  override val dependencies = List(new OneToOneDependency(prev))
  override def compute(split: Split) = f(prev.iterator(split))//每個分割槽的資料作為一個整體執行f
}

下面看示例:

    val data = sc.parallelize(1 to 10,3)
    
    data.mapPartitions(x=>mapParrtitionsF(x)).count()
    data.map(x=>mapF(x)).count()

    def mapF(x:Int):Int={
      println("呼叫了mapF方法")
      x
    }
    
    def mapParrtitionsF(x:Iterator[Int]):Iterator[Int]={
      println("呼叫了mapParrtitionsF方法")
      x
    }

map結果:調了10次方法,RDD有多少元素執行多少次方法

flatMap結果:調了3次方法,RDD有多少分割槽執行多少次方法

mapPartitionsWithIndex()

返回一個新的RDD

    val data = sc.parallelize(1 to 5,3)

    data.mapPartitionsWithIndex((x,iter)=>{
          var result=List[String]()
          while (iter.hasNext){
            result ::=(x+"-"+iter.next())
          }
          result.toIterator
       }).foreach(println)

結果為:0-1,1-2,1-3,2-4,2-5

mapPartitionsWithIndex()傳入的方法需要兩個引數,一個為分割槽Id,另一個為分割槽資料,該方法可用來檢視各分割槽的資料

filter()

返回一個新的RDD

    val data = sc.parallelize(1 to 5,3)
    
    data.filter(x=>x%2==0).foreach(println)

結果為:  2  4

filter()接收一個返回值為布林型別的方法,過濾掉不符合條件的元素,符合條件的元素組成一個新的RDD

take()&&takeOrdered()&&top()&&first()

前三個返回一個數組,最後一個返回一個值

    val data=sc.parallelize(List(6,3,2,0,11,45,1),3)
    
    for(e<-data.take(2)){
      println(e)
    }
    
    for(e<-data.takeOrdered(2)){
      println(e)
    }
    
    for(e<-data.top(2)){
      println(e)
    }
    
    println(data.first())

take(2)結果為:6  3  ,取前兩個元素

takeOrdered(2)結果為:0  1  ,升序排列後,取前兩元素

top(2)結果為:45  11,  降序排列後,取前兩元素

first()結果為:6   取第一個元素

sample()&&takeSample()

一個返回新的RDD  一個返回陣列

    val data = sc.parallelize(1 to 10000,3)
    
    println(data.sample(true,0.5).count())
    println(data.takeSample(true,5000).length)

sample()結果為:4976

takeSample()結果為:5000

這兩運算元都是隨機抽取元素,可以傳三個引數,第一個為布林值,表示是否重複抽取元素,第二個sample()為抽取比例,takeSample()為具體數值,第三個為隨機種子,java的隨機數是偽隨機數,通過計算隨機種子得到某些數,可用預設值。

從結果可以看出,抽取比例為0.5時是抽不到5000個元素的,如果想抽取具體多少元素,可用takeSample()

union()&&intersection()&&subtract()

返回一個新的RDD

    val d4=sc.parallelize(List(1,3,4,6,3,8,1),2)
    val d5=sc.parallelize(List(2,3,4,5,3,8,9),3)

    d4.union(d5).foreach(println)
    println(d4.union(d5).partitions.length)//新RDD分割槽數

    d4.intersection(d5).foreach(println)
    println(d4.intersection(d5).partitions.length)
    println(d4.intersection(d5,6).partitions.length)//指定新RDD分割槽數

    d4.subtract(d5).foreach(println)
    println(d5.subtract(d4).partitions.length)
    println(d4.subtract(d5).partitions.length)

union()結果為:1,3,4,6,3,8,1,2,3,4,5,3,8,9   分割槽數為2+3=5 

intersection()結果為:8,3,4  分割槽數為:3     6

subtract()結果為:1,6,1   分割槽數為:3    2

這三個運算元分別求:並集,交集,差集。其中intersection()交集去重,別的union(),subtract()均不去重,union()分割槽數為兩個RDD分割槽數之和,intersection()分割槽預設為max(rdd1分割槽,rdd2分割槽),可以手動指定分割槽數,subtract()預設分割槽數為調該方法的RDD的分割槽數,也可以指定。

reduce()&&fold()

返回一個值

    val data = sc.parallelize(1 to 5,3)

    println(data.reduce(_+_))
    println(data.fold(5)(_+_))

reduce()結果為:15

fold()結果為:35

關於reduce()與fold()原始碼可看我之前的部落格。

reduce()的計算過程:(((1+2)+3)+4)+5,而fold()的計算過程是先計算各分割槽與初值的結果,最後計算各分割槽結果與初值,

假設data的資料分佈是:(1),(2,3),(4,5)

則fold()的計算結果是:(1+5)+(2+3+5)+(4+5+5)+5,

所以reduce()與fold()的區別就是一個有初值,一個無初值。

fold()&&aggregate()

返回一個值

    val data = sc.parallelize(1 to 5,3)
    
    println(data.fold(5)(_+_))
    println(data.aggregate(5)(_+_,_*_))

fold()結果為:35

aggregate()結果為:4200

aggregate()需要傳入兩個方法,第一個方法計算各分割槽內資料,第二個方法計算各分割槽之前結果資料。

aggregate()計算過程:(1+5)*(2+3+5)*(4+5+5)*5=4200

而fold()傳一個方法,計算各分割槽內資料不僅用這個方法,計算各分割槽之前結果資料也用該方法。

所以fold()與aggregate()區別是:一個傳一個方法,一個傳兩個方法。

zipWithUniqueId()&&zipWithIndex()

返回一個新的RDD

    val data = sc.parallelize(1 to 5,3)
    
    data.zipWithUniqueId().foreach(println)
    data.zipWithIndex().foreach(println)

zipWithUniqueId()結果為:(2,1),(3,4),(4,2),(5,5),(1,0)

zipWithIndex()結果為:(4,3),(2,1),(3,2),(5,4),(1,0)

可以看出,這兩種方法都是返回一個鍵值對的RDD,鍵為元素,zipWithIndex()值為下標,從0到RDD元素數-1,而zipWithUniqueId()值為唯一的Id,不受最大值為RDD元素數-1的約束

zip()&&zipPartitions()

返回一個新的RDD

    val d1=sc.parallelize(List("tom","jerry","hehe","zlq","wnn","nm","sl"),2)
    val d3=sc.parallelize(List(6,3,2,0,11,45,1),2)

    d1.zip(d3).foreach(println)
    d1.zipPartitions(d3)((iter1,iter2)=>{//方法的引數為迭代器
      var result=List[String]()
      while (iter1.hasNext&&iter2.hasNext){
        result ::=(iter1.next()+"-"+iter2.next())
      }
      result.iterator
    }).foreach(println)

zip()結果為:(zlq,0),(tom,6),(wnn,11),(nm,45),(sl,1),(jerry,3),(hehe,2)

zipPartitions()結果為:hehe-2,jerry-3,tom-6,sl-1,nm-45,wnn-11,zlq-0

zip()要求兩個RDD分割槽數與元素數必須相等,而zipPartitions()只要求分割槽數相同。生成新RDD與原RDD分割槽數相同

 

未完待續........