1. 程式人生 > >RDD運算元原始碼《一》返回一個結果的actions運算元

RDD運算元原始碼《一》返回一個結果的actions運算元

返回一個結果的actions運算元

目錄

 

foreach()

collect()

reduce()

fold()

aggregate()

count()

toArray()

take()

first()


foreach()

def foreach(f: T => Unit) {
    val cleanF = sc.clean(f)
    //提交job,迭代器每個元素執行傳入方法
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

iter.foreach是無返回值,一般用於除錯列印RDD中的資料

    val a=sc.parallelize(1 to 5,2)
    a.foreach(print)

將列印12345

collect()

def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    //將各分割槽資料連線起來
    Array.concat(results: _*)
  }

collect()方法將RDD中的資料轉化為陣列

    val a=sc.parallelize(1 to 5,2)
    println(a.collect()(1))//列印下標為1的值

將列印 2

reduce()

    def reduce(f: (T, T) => T): T = {
    val cleanF = sc.clean(f)

    //如果迭代器有值,呼叫reduceLeft()
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      }else {
        None
      }
    }
    
    //提交job,runJob返回陣列
    val options = sc.runJob(this, reducePartition)
    val results = new ArrayBuffer[T]
    
    //遍歷返回的結果,存入陣列中
    for (opt <- options; elem <- opt) {
      results += elem
    }
    
    
    if (results.size == 0) {
      throw new UnsupportedOperationException("empty collection")
    } else {
      //陣列繼續呼叫reduceLeft()方法,將結果返回
      return results.reduceLeft(cleanF)
    }
  }

我們知道,提交task的時候,一個RDD的分割槽資料對應一個迭代器,一個迭代器對應一個task,

所以,runJob()返回來的陣列存放的是各分割槽的結果。

reduce()主要分兩步,第一步呼叫reduceLeft()計算各個分割槽資料,第二步,再呼叫reduceLeft()計算各分割槽結果,返回。

接下來我們說下Iterator的reduceLeft()方法

    val w=(1 to 5).toIterator
    println(w.reduceLeft(_+_))  //15
    println(w.reduceLeft(_-_))  //-13
    println(w.reduceRight(_-_)) //3

reduceLeft()計算的是從左邊開始相鄰兩個元素的值,將結果與後面元素再計算,直到所有元素參與計算

第一個輸出:(((1+2)+3)+4)+5=15

第二個輸出:(((1-2)-3)-4)-5=-13

第三個輸出:從右邊開始,1-(2-(3-(4-5)))=3

fold()

  def fold(zeroValue: T)(op: (T, T) => T): T = {
    val cleanOp = sc.clean(op)
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp))
    return results.fold(zeroValue)(cleanOp)
  }

fold跟reduce是差不多,只不過fold有初值,先計算各個分割槽與初值的結果,存入陣列,再計算結果與初值的值。

可以得出當有1個分割槽時,初值被計算2次,第一次與分割槽資料,第二次與分割槽結果資料。

val a=sc.parallelize(List(1,2,3,4,5),1)
val b=sc.parallelize(List(1,2,3,4,5),2)
val c=sc.parallelize(List(1,2,3,4,5),3)

println(a.fold(10)(_+_))
println(b.fold(10)(_+_))
println(c.fold(10)(_+_))

a的結果為:1+2+3+4+5+10=25,  25+10=35

b的結果為:1+2+3+10=16,4+5+10=19,16+19+10=45   (分割槽資料編的,與真實情況可能有區別)

c的結果為:1+2+10=13,3+4+10=17,5+10=15,13+17+15=55

aggregate()

   def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U):     U = {
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val results = sc.runJob(this,
        (iter: Iterator[T]) => iter.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
    return results.fold(zeroValue)(cleanCombOp)
  }

aggregate()先用seqOp方法計算各分割槽裡的資料,將各分割槽結果存入陣列,然後再呼叫fold(),方法為combOp計算結果資料。

可以看到aggregate()與fold()的區別在於:flod計算某個分割槽資料與最後計算各分割槽結果資料用的用同一個方法,而aggregate()

則可以用兩種不同的方法,當aggregate()兩個方法一樣時,結果與fold()是一樣的。

 

    val a=sc.parallelize(List(1,2,3,4),2)
    println(a.aggregate(5)(_+_,_+_))
    println(a.fold(5)(_+_))

結果都為 25,計算步驟可參考上面的fold()方法

 

    val a=sc.parallelize(List(1,2,3,4),2)
    println(a.aggregate(5)(_+_,_*_))

結果為 480

計算步驟:假設兩個分割槽資料為(1,2)(3,4)

分割槽1結果:1+2+5=8   分割槽2結果:3+4+5=12

最終結果:呼叫第二個方法計算,8*12*5=480

count()

  def count(): Long = {
    sc.runJob(this, (iter: Iterator[T]) => {
      var result = 0L
      while (iter.hasNext) {//遍歷迭代器中所有元素
        result += 1L
        iter.next
      }
      result
    }).sum
  }

count()用來求RDD中元素的個數,求出個分割槽元素個數,然後sum()求和

    val w=sc.parallelize(0 to 10,3).count()
    println(w)

結果為為 11

toArray()

def toArray(): Array[T] = collect()

呼叫collect()方法,將RDD轉為陣列

take()

   def take(num: Int): Array[T] = {
    if (num == 0) {
      return new Array[T](0)
    }
    val buf = new ArrayBuffer[T]//存放取出的元素的陣列
    var p = 0
    while (buf.size < num && p < splits.size) {//迴圈,直到數取夠或者迴圈完所有分割槽
      val left = num - buf.size//得到還需要從取出的元素數
      //true為允許job本節點執行,無需發往叢集
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true)
      buf ++= res(0)//將從第一個分割槽取到的資料存入陣列
      if (buf.size == num)//若已取夠,返回,無的話繼續取後面的分割槽
        return buf.toArray
      p += 1
    }
    return buf.toArray
  }

該方法為取出RDD的n個元素,先從一個分割槽中取資料,取夠的話,直接返回,沒夠的話,繼續取別的分割槽。

    val w=sc.parallelize(0 to 100,2)
    for(e<-w.take(10)){
      print(e)
    }

結果為:0123456789

first()

  def first(): T = take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }

該方法為取第一個元素,呼叫take(1),沒取到的話拋異常。