我是60歲程式設計師
阿新 • • 發佈:2018-12-18
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author zoujc
* @date 2018/10/31
*/
object LearnRDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("SparkRDDTest").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
//通過並行化,生成RDD
val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
//對rdd1裡面的每個元素*2後排序,true表示正序,false表示倒序
val res1 = rdd1.map(_ * 2).sortBy(x => x, true)
//過濾出大於等於10的元素
val res2 = res1.filter(_ >= 10)
val rdd2 = sc.parallelize(Array("a,b,c","d,e,f","g,h,i"))
//對rdd2裡的每個元素先切分再壓平
val res3 = rdd2.flatMap(_.split(","))
val rdd3 = sc.parallelize(List(List("a b c","a b b"),List("e f g","a f g"),List("h i g","a a b")))
//將rdd3的每個元素先切分在壓平
val res4 = rdd3.flatMap(_.flatMap(_.split(" ")))
val rdd4 = sc.parallelize(List(5,6,4,3))
val rdd5 = sc.parallelize (List(1,2,3,4))
//求並集
val res5 = rdd4.union(rdd5)
//求交集
val res6 = rdd4.intersection(rdd5)
//去重
val res7 = rdd4.union(rdd5).distinct()
val rdd6 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
val rdd7 = sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))
//求join
val res8 = rdd6.join(rdd7)
//求左連線、右連線
val res9 = rdd6.leftOuterJoin(rdd7)
val res10 = rdd6.rightOuterJoin(rdd7)
//求並集,可以不用.union,也很強大
val res11 = rdd6 union rdd7
//按key分組
val res12 = res11.groupByKey()
//分別用groupByKey和reduceByKey實現單詞計數,注意groupByKey與reduceByKey的區別
//groupByKey
val res13 = res11.groupByKey().mapValues(_.sum)
/**
* groupByKey:groupByKey會對每一個RDD中的value值進行聚合形成一個序列(Iterator)
* 此操作發生在reduce端,所以勢必所有的資料將會通過網路傳輸,造成不必要的浪費
* 同時如果資料量十分大,可能還會造成OutOfMemoryError
*/
//reduceByKey,先進行區域性聚合,在進行全域性聚合
val res14 = res11.reduceByKey(_ + _)
/**
* reduceByKey:reduceByKey會在結果傳送至reduce之前會對每個mapper在本地進行merge,
* 有點類似於MapReduce中的combiner。這樣做的好處是在map端進行一次reduce之後,
* 資料量會大幅度減小,從而減小傳輸,保證reduce端能夠更快的進行結果計算。
*/
val rdd8 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd9 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup ,注意cgroup與groupByKey區別
val res15 = rdd8.cogroup(rdd9)
/**
* cogroup是將輸入的資料集(k,v)和另外的資料集(k,w)進行cogroup,
* 得到的資料集是(k,Seq(v),Seq(w))的資料集
*/
val rdd10 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合 reduce是action類運算元
val res16 = rdd10.reduce(_ + _)
val rdd11 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd12 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd13 = rdd11.union(rdd12)
//按key進行聚合
val res17 = rdd13.reduceByKey(_ + _)
//按value的降序排序
val res18 = rdd13.reduceByKey(_ + _).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1))
/**
* 笛卡爾積: 笛卡爾乘積是指在數學中,兩個集合X和Y的笛卡尓積(Cartesian product),又稱直積,
* 表示為X×Y,第一個物件是X的成員而第二個物件是Y的所有可能有序對的其中一個成員[3] 。
* 假設集合A={a, b},集合B={0, 1, 2},則兩個集合的笛卡爾積為{(a, 0), (a, 1), (a, 2), (b, 0), (b, 1), (b, 2)}。
*/
val res19 = rdd11.cartesian(rdd12)
//要通過action型別的運算元才能顯示出結果,將結果放到可變陣列中,就可以看到輸出結果,
//如果不加toBuffer,則打印出來的是一個引用。
// println(res1.collect().toBuffer)
// println(res2.collect().toBuffer)
// ArrayBuffer(10, 12, 14, 16, 18, 20)
// println(res3.collect().toBuffer)
// println(res4.collect().toBuffer)
// println(res5.collect().toBuffer)
// println(res6.collect().toBuffer)
// println(res7.collect().toBuffer)
// ArrayBuffer(4, 1, 5, 6, 2, 3)
// println(res8.collect().toBuffer)
// println(res9.collect().toBuffer)
// println(res10.collect().toBuffer)
// res8.foreach(println)
// res9.foreach(println)
// res10.foreach(println)
// println(res11.collect().toBuffer)
// println(res12.collect().toBuffer)
// println(res13.collect().toBuffer)
// println(res14.collect().toBuffer)
// println(res15.collect().toBuffer)
// println(res16)
// println(res17.collect().toBuffer)
// println(res18.collect().toBuffer)
// println(res19.collect().toBuffer)
}
}