Spark advanced operators (1)
阿新 · Published: 2018-11-05
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author zoujc
  * @date 2018/11/1
  */
object SparkRDDTest1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkRDDTest1").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Create rdd1 with two partitions
    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7), 2)
    // A function that tags each value with the index of the partition it lives in
    val func1 = (index: Int, iter: Iterator[Int]) => {
      iter.toList.map(x => s"[partID: $index, val: $x]").iterator
    }
    // Inspect the contents of each partition
    val res1 = rdd1.mapPartitionsWithIndex(func1)
    // println(res1.collect().toBuffer)
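    // Expected output (parallelize typically splits the 7 elements into partitions of 3 and 4):
    // ArrayBuffer([partID: 0, val: 1], [partID: 0, val: 2], [partID: 0, val: 3],
    //             [partID: 1, val: 4], [partID: 1, val: 5], [partID: 1, val: 6], [partID: 1, val: 7])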
    // aggregate with an initial value of 0: sum within each partition first, then sum the partial results globally
    val res2 = rdd1.aggregate(0)(_ + _, _ + _)
    // println(res2)
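    // res2 should be 28 (1 + 2 + ... + 7). With a zero value of 0 and addition used for both steps,
    // this is equivalent to a plain reduce -- a quick sketch for comparison (res2b is just an illustrative name):
    val res2b = rdd1.reduce(_ + _) // also 28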
    // Find the maximum of each partition, then add the maxima together
    val res3 = rdd1.aggregate(0)(math.max(_, _), _ + _)
    // Each partition starts from the initial value 10, and the final combine uses it again, so 10 is applied 3 times
    val res4 = rdd1.aggregate(10)(_ + _, _ + _)
    // Count how many elements of the list compare >= "e" and how many compare < "e"
    val rdd2 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j"))
    val (biggerthane, lessthane) = rdd2.aggregate((0, 0))(
      (ee, str) => {
        var biggere = ee._1
        var lesse = ee._2
        if (str.compareTo("e") >= 0) biggere = ee._1 + 1
        else lesse = ee._2 + 1
        (biggere, lesse)
      },
      (x, y) => (x._1 + y._1, x._2 + y._2)
    )
    // println((biggerthane, lessthane))
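    // Expected: (6, 4) -- "e" through "j" satisfy compareTo("e") >= 0, "a" through "d" do not.
    // A more compact seqOp that does the same counting (a sketch, using illustrative names ge/lt):
    val (ge, lt) = rdd2.aggregate((0, 0))(
      (acc, str) => if (str.compareTo("e") >= 0) (acc._1 + 1, acc._2) else (acc._1, acc._2 + 1),
      (x, y) => (x._1 + y._1, x._2 + y._2)
    )
    // println((ge, lt)) // (6, 4)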
    // Difference between aggregate and aggregateByKey: the former works on a plain sequence of values,
    // the latter works on (K, V) pairs.
    // Signature of aggregateByKey (Spark source):
    // def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
    //     combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    //   aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
    // }
    // Signature of combineByKey:
    // def combineByKey[C](
    //     createCombiner: V => C,
    //     mergeValue: (C, V) => C,
    //     mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    //   combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
    // }
    // This source makes it clear that aggregateByKey ends up calling combineByKey:
    // seqOp plays the role of mergeValue, combOp plays the role of mergeCombiners, and
    // cleanedSeqOp(createZero(), v) plays the role of createCombiner -- i.e. createCombiner is just
    // the seqOp applied to the zeroValue.
    // So whenever createCombiner and mergeValue would perform the same operation, aggregateByKey is the better fit.
    // (A combineByKey version of the aggregateByKey example is sketched after the pairRDD examples below.)
    val rdd3 = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 2)
    val res5 = rdd3.aggregate("|")(_ + _, _ + _)
    // println(res5)
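    // Expected: "||abc|def" or "||def|abc" -- the zero value "|" is prepended in each partition and once
    // more in the final combine, and the order in which partition results are combined is not guaranteed.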
    val rdd4 = sc.parallelize(List("12", "23", "345", "4567"), 2)
    // Two partitions: within each partition compute the maximum string length (kept as a string), then concatenate the partition results
    val res6 = rdd4.aggregate("")((x, y) => math.max(x.length, y.length).toString, (x, y) => x + y)
    // println(res6) // "24" (or "42")
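    // How "24" arises: in ["12", "23"] the running value goes "" -> "2" -> "2", in ["345", "4567"]
    // it goes "" -> "3" -> "4"; note that x in the seqOp is the accumulated *string*, so its length
    // (not its numeric value) is what gets compared against the next element.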
    val rdd5 = sc.parallelize(List("12", "23", "345", ""), 2)
    val res7 = rdd5.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y)
    // println(res7) // "10" (or "01")
    val rdd6 = sc.parallelize(List("12", "23", "", "345"), 2)
    val res8 = rdd6.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y)
    // println(res8) // "11"
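    // Why the empty string matters: the zero value "" has length 0, so the first seqOp step in every
    // partition yields "0". For rdd5 the partition ["345", ""] stays at "0" (min(1, 0) = 0), giving "10"/"01";
    // for rdd6 the partition ["", "345"] ends at "1" (min(1, 3) = 1), giving "11".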
    // aggregateByKey aggregates within each partition first, then combines the per-partition results.
    val pairRDD = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2)
    def func2(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = {
      iter.toList.map(x => s"[partID: $index, val: $x]").iterator
    }
    println(pairRDD.mapPartitionsWithIndex(func2).collect().toBuffer)
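    // Expected partition layout (6 elements split into two partitions of 3):
    // ArrayBuffer([partID: 0, val: (cat,2)], [partID: 0, val: (cat,5)], [partID: 0, val: (mouse,4)],
    //             [partID: 1, val: (cat,12)], [partID: 1, val: (dog,12)], [partID: 1, val: (mouse,2)])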
    // For each key, take the largest value in each partition, then add those maxima across partitions
    val res9 = pairRDD.aggregateByKey(0)(math.max(_, _), _ + _)
    // println(res9.collect().toBuffer)
    // ArrayBuffer((dog,12), (cat,17), (mouse,6))
    // With an initial value of 10, every per-partition maximum smaller than 10 is lifted to 10
    val res10 = pairRDD.aggregateByKey(10)(math.max(_, _), _ + _)
    // println(res10.collect().toBuffer)
    // ArrayBuffer((dog,12), (cat,22), (mouse,20))
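    // As noted above, aggregateByKey is built on combineByKey. A sketch of res9 expressed directly
    // with combineByKey (res9c is an illustrative name; createCombiner folds in the zero value 0):
    val res9c = pairRDD.combineByKey(
      (v: Int) => math.max(0, v),          // createCombiner: the seqOp applied to the zero value
      (c: Int, v: Int) => math.max(c, v),  // mergeValue: same as the seqOp
      (c1: Int, c2: Int) => c1 + c2        // mergeCombiners: same as the combOp
    )
    // println(res9c.collect().toBuffer) // same as res9: ArrayBuffer((dog,12), (cat,17), (mouse,6))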
    /**
      * pairRDD.aggregateByKey(0)(_ + _, _ + _).collect and pairRDD.reduceByKey(_ + _).collect
      * produce the same result; under the hood both end up calling the same method: combineByKey.
      */
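    // A quick check of that claim (sum of values per key; both should yield ArrayBuffer((dog,12), (cat,19), (mouse,6))):
    // println(pairRDD.aggregateByKey(0)(_ + _, _ + _).collect().toBuffer)
    // println(pairRDD.reduceByKey(_ + _).collect().toBuffer)

    sc.stop()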
  }
}