1. 程式人生 > >spark Transformation 運算元

spark Transformation 運算元

map(func)

通過函式func傳遞源的每個元素來形成一個新的分散式資料集

val arr=sc.parallelize(Array(("A",1),("B",2),("C",3)))
arr.map(x=>(x._1+x._2)).foreach(println)
print:   
A1
B2
C3

mapValues

mapValues顧名思義就是輸入函式應用於RDD中Kev-Value的Value,原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素。因此,該函式只適用於元素為KV對的RDD。

scala> val a = sc.parallelize
(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2) scala> val b = a.map(x => (x.length, x)) scala> b.mapValues("x" + _ + "x").collect res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))

mapWith

mapWith是map的另外一個變種,map只需要一個輸入函式,而mapWith有兩個輸入函式。它的定義如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]
第一個函式constructA是把RDD的partition index(index從0開始)作為輸入,輸出為新型別A;
第二個函式f是把二元組(T, A)作為輸入(其中T為原RDD中的元素,A為第一個函式的輸出),輸出型別為U。
舉例:把partition index 乘以10,然後加上2作為新的RDD的元素。

val x = sc.parallelize(List(1
,2,3,4,5,6,7,8,9,10), 3) x.mapWith(a => a * 10)((a, b) => (b + 2)).collect res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)

flatMap(func):

與map類似,但每個元素輸入項都可以被對映到0個或多個的輸出項,最終將結果”扁平化“後輸出

val arr=sc.parallelize(Array(("A",1),("B",2),("C",3)))
arr.flatmap(x=>(x._1+x._2)).foreach(println)
print:
A
1
B
2
C
3

flatMapWith

flatMapWith與mapWith很類似,都是接收兩個函式,一個函式把partitionIndex作為輸入,輸出是一個新型別A;另外一個函式是以二元組(T,A)作為輸入,輸出為一個序列,這些序列裡面的元素組成了新的RDD。它的定義如下:

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]


scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,
8, 2, 9)

flatMapValues

flatMapValues類似於mapValues,不同的在於flatMapValues應用於元素為KV對的RDD中Value。每個一元素的Value被輸入函式對映為一系列的值,然後這些值再與原RDD中的Key組成一系列新的KV對。


scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val b = a.flatMapValues(x=>x.to(5))
scala> b.collect
res3: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))

上述例子中原RDD中每個元素的值被轉換為一個序列(從其當前值到5),比如第一個KV對(1,2), 其值2被轉換為2,3,4,5。然後其再與原KV對中Key組成一系列新的KV對(1,2),(1,3),(1,4),(1,5)。

mapPartitions(func):

將map函式作用與每個分割槽,函式型別為:Iterator[T] => Iterator[U]
一般在資料庫操作中用到


var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有兩個分割槽
scala> var rdd3 = rdd1.mapPartitions{ x => {
     | var result = List[Int]()
     |     var i = 0
     |     while(x.hasNext){
     |       i += x.next()
     |     }
     |     result.::(i).iterator  //要求輸入和返回都Iterator[U]型別,故將i轉換為iterator
     | }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[84] at mapPartitions at :23

//rdd3將rdd1中每個分割槽中的數值累加
scala> rdd3.collect
res65: Array[Int] = Array(3, 12)
scala> rdd3.partitions.size
res66: Int = 2

mapPartitionsWithIndex(func):

類似於mapPartitions,而且還提供FUNC與表示所述分割槽的索引的整數值,所以FUNC必須是型別(Int, Iterator[T]) => Iterator[U]

var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有兩個分割槽
var rdd2 = rdd1.mapPartitionsWithIndex{
        (x,iter) => {
          var result = List[String]()
            var i = 0
            while(iter.hasNext){
              i += iter.next()
            }
            result.::(x + "|" + i).iterator

        }
      }
//rdd2將rdd1中每個分割槽的數字累加,並在每個分割槽的累加結果前面加了分割槽索引
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)

filter(func):

增加過濾器,針對一維的資料

val rdd = sc.parallelize(1 to 5)
rdd.filter(_>3).collect().foreach(println)
4
5

sample(withReplacement,fraction,seed):

指定的隨機種子隨機抽樣
withReplacement:抽出的資料是否放回,true為有放回的抽樣,false為無放回的抽樣
fraction :抽樣的數量
seed:隨機種子

val rdd = sc.parallelize(1 to 10)
val sample1 = rdd.sample(true,0.5,3)
sample1.collect.foreach(println)
9

union(ortherDataset):

將兩個Rdd資料集合並返回

val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val unionRDD = rdd1.union(rdd2)
unionRDD.collect.foreach(println)
輸出:
1
2
3
3
4
5

intersection(otherDataset):

返回兩個Rdd的交集

val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val unionRDD = rdd1.intersection(rdd2)
unionRDD.collect.foreach(println)
輸出:

3

distinct([ numTasks ])

對RDD資料集進行去重

val list = List(1,1,2,5,2,9,6,1)
val distinctRDD = sc.parallelize(list)
val unionRDD = distinctRDD.distinct()
unionRDD.collect.foreach(println)
1
9
5
6
2

groupByKey([ numTasks ])

當呼叫(K,V)對的資料集時,返回(K,Iterable )對的資料集。

一般優先選擇reduceByKey,combineByKey,foldByKey

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum)).collect().foreach(println)
輸出:
(two,2)
(one,1)
(three,3)

reduceByKey(func,[ numTasks ])

當呼叫(K,V)對的資料集時,返回(K,V)對的資料集,其中每個鍵的值使用給定的reduce函式func進行聚合,函式func必須是(V,V)=> V.像in一樣groupByKey,reduce任務的數量可以通過可選的第二個引數來配置。

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect().foreach(println)
輸出:
(two,2)
(one,1)
(three,3)  

aggregate(zeroValue,seq,comb,taskNums)

將初始值和第一個分割槽中的第一個元素傳遞給seq函式進行計算,然後將計算結果和第二個元素傳遞給seq函式,直到計算到最後一個值。第二個分割槽中也是同理操作。最後將初始值、所有分割槽的結果經過combine函式進行計算(先將前兩個結果進行計算,將返回結果和下一個結果傳給combine函式,以此類推),並返回最終結果。

>>> data = sc.parallelize((1,2,3,4,5,6),2)
>>> def seq(a,b):
...     print 'seqOp:'+str(a)+"\t"+str(b)
...     return min(a,b)
... 
>>> def combine(a,b):
...     print 'comOp:'+str(a)+"\t"+str(b)
...     return a+b
... 
>>> data.aggregate(3,seq,combine)
seqOp:3  1
seqOp:1  2
seqOp:1  3
seqOp:3  4
seqOp:3  5
seqOp:3  6
comOp:3  1
comOp:4  3
7
  def main(args: Array[String]) {

    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.4\\hadoop-2.6.4")

    val conf = new SparkConf().setAppName("Join").setMaster("local")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3)),2)

    def seq(a:Int, b:Int): Int = {
      println("seq: " + a + "\t " + b)
      math.max(a, b)
    }

    def comb(a:Int,b:Int):Int={
      println("comb : "+a+"\t"+b)
      a+b
    }
    val result = data.aggregateByKey(0)(seq,comb)  //.collect()
    result.collect().foreach(println)

    print("==="+data.partitions.size)


  }

從上面的程式碼的輸出結果可以看出,1,2,3被分到一個分割槽中,4,5,6被分到一個分割槽中。3先和第一個元素1傳給seq函式,返回最小值1,然後將1和第二個元素2傳給seq函式,返回1,以此類推,最後返回第一個分割槽中的最小值1。第二個分割槽一樣道理,最後結果返回最小值3.最後將初始值3和兩個分割槽的結果經過combine函式進行計算,先將初始值3和第一個分割槽的結果1傳給combine函式,返回4,然後將4和第二個分割槽結果3傳給combine函式,返回最終結果7。

簡單說就是先分割槽中使用seq(都帶入初始值),在不同分割槽 使用comb函式(也帶入初始值)

aggregateByKey(zeroValue)(seqOp,combOp,[ numTasks ])

當呼叫(K,V)對的資料集時,返回(K,U)對的資料集,其中使用給定的組合函式和中性的“零”值來彙總每個鍵的值。允許與輸入值型別不同的聚合值型別,同時避免不必要的分配。

import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  

object AggregateByKeyOp {  
  def main(args:Array[String]){  
     val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey").setMaster("local")  
    val sc: SparkContext = new SparkContext(sparkConf)  

     val data=List((1,3),(1,2),(1,4),(2,3))  
     val rdd=sc.parallelize(data, 2)  

     //合併不同partition中的值,a,b得資料型別為zeroValue的資料型別  
     def combOp(a:String,b:String):String={  
       println("combOp: "+a+"\t"+b)  
       a+b  
     }  
     //合併在同一個partition中的值,a的資料型別為zeroValue的資料型別,b的資料型別為原value的資料型別  
      def seqOp(a:String,b:Int):String={  
        println("SeqOp:"+a+"\t"+b)  
        a+b  
      }  
      rdd.foreach(println)  
      //zeroValue:中立值,定義返回value的型別,並參與運算  
      //seqOp:用來在同一個partition中合併值  
      //combOp:用來在不同partiton中合併值  
      val aggregateByKeyRDD=rdd.aggregateByKey("100")(seqOp, combOp)  
      sc.stop()  
  }  
}

將資料拆分成兩個分割槽

//分割槽一資料
(1,3)
(1,2)
//分割槽二資料
(1,4)
(2,3)

//分割槽一相同key的資料進行合併
seq: 100     3   //(1,3)開始和中立值進行合併  合併結果為 1003
seq: 1003     2   //(1,2)再次合併 結果為 10032

//分割槽二相同key的資料進行合併
seq: 100     4  //(1,4) 開始和中立值進行合併 1004
seq: 100     3  //(2,3) 開始和中立值進行合併 1003

將兩個分割槽的結果進行合併
//key為2的,只在一個分割槽存在,不需要合併 (2,1003)
(2,1003)

//key為1的, 在兩個分割槽存在,並且資料型別一致,合併
comb: 10032     1004
(1,100321004)
  def main(args: Array[String]) {

    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.4\\hadoop-2.6.4")

    val conf = new SparkConf().setAppName("Join").setMaster("local")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(List(1,2,3,4,5,6),2)
        def seq(a:Int, b:Int): Int = {
          println("seq: " + a + "\t " + b)
          math.min(a, b)
        }

        def comb(a:Int,b:Int):Int={
          println("comb : "+a+"\t"+b)
          a+b
       }

    println("====="+data.aggregate(3)(seq,comb))

  }

簡單說就是先分割槽中不同的key使用seq(不同key都帶入初始值),在不同分割槽的key 使用comb函式(不帶初始值)

sortByKey([ascending], [numTasks])

當呼叫K實現Ordered的(K,V)對的資料集時,按照布林ascending引數中的指定,按照升序或降序返回按鍵排序的(K,V)對的資料集。

val d2 = sc.parallelize(Array(("cc",32),("bb",32),("cc",22),("aa",18),("bb",6),("dd",16),("ee",104),("cc",1),("ff",13),("gg",68),("bb",44)))  
d2.sortByKey(false).collect  

輸出:
res60: Array[(String, Int)] = Array((gg,68), (ff,13), (ee,104), (dd,16), (cc,32), (cc,22), (cc,1), (bb,32), (bb,6), (bb,44), (aa,18))

join(otherDataset, [numTasks])

當(K,V)和(K,W)型別的資料集被呼叫時,返回(K,(V,W))對的每個鍵的所有元素對的資料集。外連線通過支援leftOuterJoin,rightOuterJoin和fullOuterJoin。

val a = sc.parallelize(Array(("123",4.0),("456",9.0),("789",9.0)) 
val b = sc.parallelize(Array(("123",8.0),("789",10)))
val c = a.join(b)
c.foreach(println)
/*
(123,(4.0,8.0))
(789,(9.0,10))
 */

cogroup(otherDataset, [numTasks])

當(K,V)和(K,W)型別的資料集被呼叫時,返回(K,(Iterable ,Iterable ))元組的資料集。這個操作也被稱為groupWith。

val a = sc.parallelize(Array(("123",4.0),("456",9.0),("789",9.0)) 
val b = sc.parallelize(Array(("123",8.0),("789",10)))
val d = a.cogroup(b)
d.foreach(println)
/*
(456,(CompactBuffer(9.0),CompactBuffer()))
(123,(CompactBuffer(4.0),CompactBuffer(8.0)))
(789,(CompactBuffer(9.0),CompactBuffer(10)))
*/

pipe(command, [envVars])

通過shell命令管理RDD的每個分割槽,例如Perl或bash指令碼。RDD元素被寫入程序的stdin,輸出到stdout的行被作為字串的RDD返回。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object PipeTest {

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("pipe Test")
    val sc = new SparkContext(sparkConf)

    val data1 = Array[(String, Int)](
      ("K1", 1), ("K2", 2),
      ("U1", 3), ("U2", 4),
      ("W1", 3), ("W2", 4), )
    val pairs = sc.parallelize(data1, 3)
    val finalRDD = pairs.pipe("grep 2")
    finalRDD.foreach(x => println("!!!!!!!! " + x + " !!!!!!!!"))//輸出
    //(K2,2)
    //(U2,4)
    //(W2,4)
    sc.stop()
  }

}

cartesian(otherDataset):

對兩Rdd進笛卡爾積操作

val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(2 to 5)
val cartesianRDD = rdd1.cartesian(rdd2)
cartesianRDD.collect.foreach(println)
(1,2)
(1,3)
(1,4)
(1,5)
(2,2)
(2,3)
(2,4)
(2,5)
(3,2)
(3,3)
(3,4)
(3,5)

coalesce(numPartitions,shuffle):

對RDD的分割槽進行重新分割槽,shuffle預設值為false,當shuffle=false時,不能增加分割槽數
目,但不會報錯,只是分割槽個數還是原來的

val rdd = sc.parallelize(1 to 16,4)
val coalesceRDD = rdd.coalesce(3) //當suffle的值為false時,不能增加分割槽數(即分割槽數不能從5->7)
println("重新分割槽後的分割槽個數:"+coalesceRDD.partitions.size) 
輸出
重新分割槽後的分割槽個數:3

repartition(numPartition):

是函式coalesce(numPartition,true)的實現

glom():

將RDD的每個分割槽中的型別為T的元素轉換換陣列Array[T]

val rdd = sc.parallelize(1 to 16,4)
val glomRDD = rdd.glom() //RDD[Array[T]]
glomRDD.foreach(rdd => println(rdd.getClass.getSimpleName))

randomSplit(weight:Array[Double],seed):

val rdd = sc.parallelize(1 to 10)
val randomSplitRDD = rdd.randomSplit(Array(1.0,2.0,7.0))

scala> randomSplitRDD(0).foreach(x => print(x +" "))
為空
scala> randomSplitRDD(1).foreach(x => print(x +" "))
8
scala> randomSplitRDD(2).foreach(x => print(x +" "))
9 3 1 6 2 4 10 5 7

repartitionAndSortWithinPartitions(partitioner)

根據給定的分割槽器對RDD進行重新分割槽,並在每個結果分割槽中按鍵分類記錄。這比repartition在每個分割槽中呼叫然後排序更高效,因為它可以將排序推送到洗牌機器中。

repartitionAndSortWithinPartitions,官方建議,如果需要在repartition重分割槽之後,還要進行排序,建議直接使用repartitionAndSortWithinPartitions運算元。因為該運算元可以一邊進行重分割槽的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,效能可能是要高的。

import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by sunxufeng on 2016/6/18.
 */
class Student {

}


//建立key類,key組合鍵為grade,score
case class StudentKey(grade:String,score:Int)
//  extends Ordered[StudentKey]{
//  def compare(that: StudentKey) : Int = {
//    var result:Int = this.grade.compareTo(that.grade)
//    if (result == 0){
//      result = this.student.compareTo(that.student)
//      if(result ==0){
//        result = that.score.compareTo(this.score)
//      }
//    }
//    result
//  }
//}

object StudentKey {
  implicit def orderingByGradeStudentScore[A <: StudentKey] : Ordering[A] = {
//    Ordering.by(fk => (fk.grade, fk.student, fk.score * -1))
    Ordering.by(fk => (fk.grade, fk.score * -1))
  }
}

object Student{
  def main(args: Array[String]) {



    //定義hdfs檔案索引值
    val grade_idx:Int=0
    val student_idx:Int=1
    val course_idx:Int=2
    val score_idx:Int=3

    //定義轉化函式,不能轉化為Int型別的,給預設值0
    def safeInt(s: String): Int = try { s.toInt } catch { case _: Throwable  => 0 }

    //定義提取key的函式
    def createKey(data: Array[String]):StudentKey={
      StudentKey(data(grade_idx),safeInt(data(score_idx)))
    }

    //定義提取value的函式
    def listData(data: Array[String]):List[String]={
      List(data(grade_idx),data(student_idx),data(course_idx),data(score_idx))
    }

    def createKeyValueTuple(data: Array[String]) :(StudentKey,List[String]) = {
      (createKey(data),listData(data))
    }

    //建立分割槽類
    import org.apache.spark.Partitioner
    class StudentPartitioner(partitions: Int) extends Partitioner {
      require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

      override def numPartitions: Int = partitions

      override def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[StudentKey]
        k.grade.hashCode() % numPartitions
      }
    }

    //設定master為local,用來進行本地除錯
    val conf = new SparkConf().setAppName("Student_partition_sort").setMaster("local")
    val sc = new SparkContext(conf)

    //學生資訊是打亂的
    val student_array =Array(
      "c001,n003,chinese,59",
      "c002,n004,english,79",
      "c002,n004,chinese,13",
      "c001,n001,english,88",
      "c001,n002,chinese,10",
      "c002,n006,chinese,29",
      "c001,n001,chinese,54",
      "c001,n002,english,32",
      "c001,n003,english,43",
      "c002,n005,english,80",
      "c002,n005,chinese,48",
      "c002,n006,english,69"
      )
   //將學生資訊並行化為rdd
    val student_rdd = sc.parallelize(student_array)
   //生成key-value格式的rdd
    val student_rdd2 = student_rdd.map(line => line.split(",")).map(createKeyValueTuple)
    //根據StudentKey中的grade進行分割槽,並根據score降序排列
    val student_rdd3 = student_rdd2.repartitionAndSortWithinPartitions(new StudentPartitioner(10))
   //列印資料
    student_rdd3.collect.foreach(println)
  }
}