
Demo of Spark operators: Transformations and Actions

  • Spark has two kinds of operators: Transformations and Actions

Transformation: Transformations are lazily evaluated. When you call a Transformation operator, Spark does not compute anything right away; it only records the metadata that describes the computation. For example, if you run:

sc.textFile("hdfs://cdhnode1:8030/wordcount")

Spark does not read the data into an RDD; it only records where the data should be read from. The real work happens only when an Action operator is encountered.

Action: as the name suggests, an Action is different from a Transformation: calling an Action operator triggers execution immediately.
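
A minimal sketch of this lazy behaviour, reusing the HDFS path from the example above (whether that file exists is an assumption):

//these two Transformations only record lineage metadata; no data is read yet
val lines   = sc.textFile("hdfs://cdhnode1:8030/wordcount")
val lengths = lines.map(_.length)

//only this Action submits a job and actually reads the file
val total = lengths.reduce(_ + _)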

  • What is an RDD?

RDD stands for Resilient Distributed Dataset. In the Spark source code, the RDD class carries the following comment from its authors:

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
 * pairs, such as `groupByKey` and `join`;
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
 * Doubles; and
 * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
 * can be saved as SequenceFiles.
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
 * through implicit.
 *
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
 * on RDD internals.
 */
  • The middle of that comment lists the five main properties of an RDD (a small inspection sketch follows this list):
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  //an RDD consists of a list of partitions

 *  - A function for computing each split
 *  //a function computes each split (partition)

 *  - A list of dependencies on other RDDs
 *  //an RDD records its dependencies on other RDDs

 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  //a key-value RDD can carry a Partitioner, e.g. hash partitioning

 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *  //preferred locations enable data-locality-aware computation
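
These five properties can be observed directly from the spark-shell through the RDD API; a minimal sketch, not taken from the original session:

val rdd = sc.parallelize(1 to 8, 4)

rdd.partitions.length                           //a list of partitions -> 4
rdd.map(_ * 2).dependencies                     //a list of dependencies on the parent RDD
rdd.map((_, 1)).reduceByKey(_ + _).partitioner  //Some(HashPartitioner) for this key-value RDD
rdd.preferredLocations(rdd.partitions(0))       //preferred locations (empty for a parallelized local collection)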
  • How to create an RDD:

1. From a data source supported by Spark, for example a file on HDFS:

sc.textFile("hdfs://cdhnode1:8030/wordcount")

2. By parallelizing a local collection or array:

sc.parallelize(Array(1,2,3,4,5,6,7,8))
  • Below are demo examples of Transformations and Actions

1. Start Spark


2. Start the Spark shell

/home/hadoop/app/spark-1.3.1-bin-hadoop2.6/bin/spark-shell --master spark://cdhnode1:7077 --executor-memory 512m --total-executor-cores 2

I. Transformation demo

map

//create the rdd
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:21

//map
scala> rdd1.map(_*10)
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at map at <console>:24

//view the result (collect is an Action operator)
scala> res0.collect
res1: Array[Int] = Array(10, 20, 30, 40, 50)

sortBy

//create the rdd
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:21

//sortBy
scala> rdd1.sortBy(x=>x,true)
res2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at sortBy at <console>:24

//view the result (collect is an Action operator)
scala> res2.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5)

flatMap

//create the rdd
scala> val rdd1 = sc.parallelize(Array("hello world", "hello word count", "hello lijie"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:21

//flatMap
scala> rdd1.flatMap(_.split(" ")).collect

//result
res4: Array[String] = Array(hello, world, hello, word, count, hello, lijie)

//for a nested Array, apply flatMap twice

//create the rdd
scala> val rdd1 = sc.parallelize(Array(Array("hello world", "hello word count", "hello lijie")))
rdd1: org.apache.spark.rdd.RDD[Array[String]] = ParallelCollectionRDD[13] at parallelize at <console>:21

//flatMap
scala> rdd1.flatMap(_.flatMap(_.split(" "))).collect

//result
res5: Array[String] = Array(hello, world, hello, word, count, hello, lijie)

union

scala> val rdd1 = sc.parallelize(List(5,6,4,7))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:21

scala> val rdd1 = sc.parallelize(List(1,2,3,0))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(5,3,2,1))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:21

scala> val ddr3 = rdd1.union(rdd2)
ddr3: org.apache.spark.rdd.RDD[Int] = UnionRDD[18] at union at <console>:25

scala> ddr3.collect
res6: Array[Int] = Array(1, 2, 3, 0, 5, 3, 2, 1)

intersection

scala> val ddr3 = rdd1.intersection(rdd2)
ddr3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[24] at intersection at <console>:25

scala> ddr3.collect
res7: Array[Int] = Array(2, 1, 3)

join

scala> val rdd1 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lisi", 39)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[25] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(("lijie", 99), ("wangwu", 88), ("zhangsan", 100)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:21

scala> rdd1.join(rdd2).collect
res8: Array[(String, (Int, Int))] = Array((zhangsan,(28,100)), (lijie,(24,99)))

leftOuterJoin

scala> rdd1.leftOuterJoin(rdd2).collect
res9: Array[(String, (Int, Option[Int]))] = Array((zhangsan,(28,Some(100))), (lijie,(24,Some(99))), (lisi,(39,None)))

rightOuterJoin

scala> rdd1.rightOuterJoin(rdd2).collect
res10: Array[(String, (Option[Int], Int))] = Array((zhangsan,(Some(28),100)), (wangwu,(None,88)), (lijie,(Some(24),99)))

groupByKey

scala> val rdd1 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lisi", 39)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[36] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(("lijie", 99), ("wangwu", 88), ("zhangsan", 100)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[37] at parallelize at <console>:21

scala> rdd1.union(rdd2).groupByKey.collect
res11: Array[(String, Iterable[Int])] = Array((zhangsan,CompactBuffer(28, 100)), (wangwu,CompactBuffer(88)), (lijie,CompactBuffer(24, 99)), (lisi,CompactBuffer(39)))

//use it to compute a wordcount
scala> rdd1.union(rdd2).groupByKey.map(x=>(x._1,x._2.sum)).collect
res12: Array[(String, Int)] = Array((zhangsan,128), (wangwu,88), (lijie,123), (lisi,39))

reduceByKey

scala> rdd1.union(rdd2).reduceByKey(_+_).collect

res13: Array[(String, Int)] = Array((zhangsan,128), (wangwu,88), (lijie,123), (lisi,39))

cogroup

scala> val rdd1 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lijie", 999),("haha", 123)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[45] at parallelize at <console>:21

scala> val  rdd2 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lisi", 39)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:21

scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[48] at cogroup at <console>:25

scala> rdd3.collect
res14: ArrayBuffer((zhangsan,(CompactBuffer(28),CompactBuffer(28))), (lijie,(CompactBuffer(24, 999),CompactBuffer(24))), (haha,(CompactBuffer(123),CompactBuffer())), (lisi,(CompactBuffer(),CompactBuffer(39))))

//use it to compute a wordcount
scala> rdd1.cogroup(rdd2).groupByKey.mapValues(_.map(x => {x._1.sum + x._2.sum})).collect()

//or, equivalently
scala> rdd1.cogroup(rdd2).map(x =>{(x._1,x._2._1.sum+x._2._2.sum)}).collect

res16: Array[(String, Int)] = ArrayBuffer((zhangsan,List(56)), (lijie,List(1047)), (haha,List(123)), (lisi,List(39)))

cartesian

scala> val rdd2 = sc.parallelize(List(("lijie", 99), ("wangwu", 88), ("zhangsan", 100)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:21

scala> val rdd1 = sc.parallelize(List(("lijie", 24), ("zhangsan", 28), ("lisi", 39)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[53] at parallelize at <console>:21

scala> rdd1.cartesian(rdd2).collect
res17: Array[((String, Int), (String, Int))] = Array(((lijie,24),(lijie,99)), ((lijie,24),(wangwu,88)), ((lijie,24),(zhangsan,100)), ((zhangsan,28),(lijie,99)), ((lisi,39),(lijie,99)), ((zhangsan,28),(wangwu,88)), ((zhangsan,28),(zhangsan,100)), ((lisi,39),(wangwu,88)), ((lisi,39),(zhangsan,100)))

mapPartitionsWithIndex

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

//it takes a function of (partition index, Iterator) that returns an Iterator
scala> val func = (index: Int, iter: Iterator[(Int)]) => {
     |   iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
     | }
func: (Int, Iterator[Int]) => Iterator[String] = <function2>

scala> rdd1.mapPartitionsWithIndex(func)
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:26

scala> res1.collect
res2: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4], [partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9])

#sum of the values processed by each partition
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val func = (a: Int, b: Iterator[(Int)]) => {
  b.toList.map(x => {
    (a,x)
  }).toIterator
}
println(rdd1.mapPartitionsWithIndex(func).reduceByKey(_+_).collect().toBuffer)

#result
ArrayBuffer((0,6), (1,15), (2,24))

aggregateByKey

//1. total per animal
scala> val rdd1 = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at parallelize at <console>:21

scala> rdd1.aggregateByKey(0)(_ + _, _ + _).collect

res11: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))

//2. take the per-partition maximum for each animal, then add the maxima from the different partitions
scala> rdd1.aggregateByKey(0)(math.max(_, _), _ + _).collect

res12: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

//3. another test: the initial value 100 is applied once per key in every partition, so cat becomes max(...) = 100 in each of the two partitions and the combine step yields 100 + 100 = 200
scala> rdd1.aggregateByKey(100)(math.max(_, _), _ + _).collect

res13: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

combineByKey


//1.
scala> val rdd1 = sc.parallelize(List("hello world","hello lijie","lijie test"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:21

//x => x builds the initial value for a key, (a: Int, b: Int) => a + b adds values within a partition, (m: Int, n: Int) => m + n then adds the per-partition sums
scala> rdd1.flatMap(_.split(" ")).map((_,1)).combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n).collect

res15: Array[(String, Int)] = Array((hello,2), (world,1), (test,1), (lijie,2))


//2. another example: group values whose keys are the same number into one list
scala> val rdd1 = sc.parallelize(List("a","b","c","d"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(1,2,4,2),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:21

scala> val rdd3 = rdd2.zip(rdd1)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[23] at zip at <console>:25


scala> rdd3.collect
res19: Array[(Int, String)] = Array((1,a), (2,b), (4,c), (2,d))

scala> rdd3.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ::: n)
res17: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[21] at combineByKey at <console>:28

scala> rdd3.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ::: n)
res20: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[24] at combineByKey at <console>:28

scala> res20.collect
res21: Array[(Int, List[String])] = Array((4,List(c)), (2,List(b, d)), (1,List(a)))

repartition(num) does the same thing as coalesce(num, true)

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val rdd2 = rdd1.repartition(5)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at repartition at <console>:23
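
A quick way to confirm the equivalence is to compare partition counts; a small sketch, not from the original session:

rdd2.partitions.length                    //5
rdd1.coalesce(5, true).partitions.length  //5 -- same effect as repartition(5)
rdd1.coalesce(5).partitions.length        //2 -- without shuffle = true, coalesce cannot increase the partition count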

flatMapValues

scala> val rdd1 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[17] at parallelize at <console>:21

scala> val rdd2 = rdd1.flatMapValues(_.split(" "))
rdd2: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[18] at flatMapValues at <console>:23

scala> rdd2.collect

res6: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))

foldByKey

scala> val rdd1 = sc.parallelize(List("lijie", "hello", "lisi", "hehe"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at parallelize at <console>:21

scala> val rdd2 = rdd1.map(x => (x.length, x)).foldByKey("")(_+_)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[21] at foldByKey at <console>:23

scala> rdd2.collect
res7: Array[(Int, String)] = Array((4,lisihehe), (5,lijiehello))

keyBy

scala> val rdd1 = sc.parallelize(List("lijie", "hello", "lisi", "hehe"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at parallelize at <console>:21

//use the string length as the key
scala> rdd1.keyBy(_.length)
res10: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[24] at keyBy at <console>:24

scala> res10.collect
res11: Array[(Int, String)] = Array((5,lijie), (5,hello), (4,lisi), (4,hehe))

keys

scala> val rdd1 = sc.parallelize(List("lijie", "hello", "lisi", "hehe"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[26] at parallelize at <console>:21

scala> rdd1.map(x => (x.length,x))
res13: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[27] at map at <console>:24

scala> res13.keys.collect
res14: Array[Int] = Array(5, 5, 4, 4)

values


scala> val rdd1 = sc.parallelize(List("lijie", "hello", "lisi", "hehe"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[26] at parallelize at <console>:21

scala> rdd1.map(x => (x.length,x))
res13: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[27] at map at <console>:24

scala> res13.values.collect
res15: Array[String] = Array(lijie, hello, lisi, hehe)

II. Action demo

collect

sc.textFile("hdfs://cdhnode2:8030/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect

res26: Array[(String, Int)] = Array(("",18), (the,8), (and,6), (of,5), (The,4), (cryptographic,3), (encryption,3), (for,3), (this,3), (Software,2), (on,2), (which,2), (software,2), (at:,2), (includes,2), (import,,2), (use,,2), (or,2), (software.,2), (software,,2), (more,2), (to,2), (distribution,2), (using,2), (re-export,2), (information,2), (possession,,2), (our,2), (please,2), (Export,2), (under,1), (country,1), (is,1), (Technology,1), (Jetty,1), (currently,1), (check,1), (permitted.,1), (have,1), (Security,1), (U.S.,1), (with,1), (BIS,1), (This,1), (mortbay.org.,1), ((ECCN),1), (security,1), (Department,1), (export,1), (reside,1), (any,1), (algorithms.,1), (from,1), (details,1), (has,1), (SSL,1), (Industry,1), (Administration,1), (provides,1), (http://hadoop.apache.org/core/,1), (cou...

collectAsMap

scala> val rdd = sc.parallelize(List(("a", 1), ("b", 2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[5] at parallelize at <console>:21

scala> rdd.collectAsMap

res0: scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)
saveAsTextFile

sc.textFile("hdfs://cdhnode2:8030/wordcount").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://cdhnode2:8030/wordcount/myout")

reduce

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[97] at parallelize at <console>:21

scala> rdd1.reduce(_+_)
res28: Int = 15
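
count

count returns the number of elements in the RDD; for the rdd1 above it would return 5. A minimal sketch, not taken from the original session:

rdd1.count   //Long = 5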

top

scala> rdd1.top(3)
res29: Array[Int] = Array(5, 4, 3)

take

scala> rdd1.take(3)
res30: Array[Int] = Array(1, 2, 3)

first

scala> rdd1.first
res31: Int = 1

takeOrdered

scala> rdd1.takeOrdered(3)
res33: Array[Int] = Array(1, 2, 3)

aggregate

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21

//1. use it to compute a sum
//the first argument is the initial value; the second argument list consists of two functions: the first merges values within each partition, the second merges the per-partition results into one value
scala> rdd1.aggregate(0)(_+_, _+_)

//result
res3: Int = 45

//2. use it to sum the per-partition maxima
scala> rdd1.aggregate(0)(math.max(_, _), _ + _)

//result
res4: Int = 13

//3. more examples (1)
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21

scala> rdd2.aggregate("")(_+_,_+_)

res5: String = abcdef

scala> rdd2.aggregate("*")(_ + _, _ + _)

res7: String = **abc*def

//4. more examples (2)
scala> val rdd3 = sc.parallelize(List("a","bb","ccc","dddd"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:21

scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)

res8: String = 24
//the result may also be "42": the per-partition results ("2" and "4") can be combined in either order