[Spark]學習筆記二——RDDs
一、一些物件
1.Driver Program:包含程式的main()方法,RDDs的定義和操作,它管理很多節點,我們稱之為executors
2.SparkContext:Driver Program通過SparkContext物件訪問Spark,SparkContext物件代表和一個叢集的連線
3.在shell中SparkContext物件自動建立好了,就是sc,可以在shell中直接使用sc
二、RDDs(Resilient distributed dataset——彈性分散式資料集)
1.RDDs的介紹:
1)並行的分佈在叢集中
2)RDDs是Spark分發資料和計算的基礎抽象類
3)一個RDD是不可改變
4)Spark中,所有的計算都是通過RDDs的建立、轉換等操作完成的
5)一個RDD內部由許多==partitions(分片)==組成
分片:
每個分片包括一部分資料,partitions可在叢集不同節點上計算
分片是Spark並行處理的單元,Spark會順序的、並行的處理分片
2.RDDs的建立方法:
1)把一個已存在的集合傳給SparkContext的parallelize()方法,可用來測試
val rdd=sc.parallelize(Array(1,2,2,4),4) ----第一個引數:待並行化處理的集合
2)載入外部資料集:可以載入本地檔案,也可以載入hadoop檔案
三、RDD基本操作之Transformation
1.Transformation:從之前的RDD構建一個新的RDD,像map()和filter()……
2.逐元素Transformation:
1)map()——接收函式,把函式應用到RDD的每一個元素,返回新的RDD
2)filter()——接受一個函式,返回只包含,滿足filter()函式的新RDD
3)flatMap()——對每個輸入元素,輸出多個輸出元素,flatMap()將RDD中的元素壓扁後返回一個新的RDD
3.集合運算:RDDs支援數學集合的運算,例如:並集、交集等
1)distinct()——去重方法
val c1=sc.parallelize(Array("coffe","coffe","tea","tea","coka")) c1.foreach(println) tea coffe tea coka coffe c1.distinct.foreach(println) coka tea coffe
2)union()——並集
val c2=sc.parallelize(Array("coffe","tea"))
c1.distinct.union(c2).foreach(println)
coffe
tea
coffe
coka
tea
3)intersection()——交集
c1.distinct.intersection(c2).foreach(println)
coffe
tea
4)subtract()
c1.distinct.subtract(c2).foreach(println)
coka
四、RDD基本操作之Action
1.Action:在RDD上計算出來一個結果,把結果返回給driver program或儲存在檔案系統,像count(),save……
2.collect():遍歷整個RDD,向driver program返回RDD的內容,可以用此檢視小資料;大資料的時候,使用savaAsTextFile()這一action
3.reduce():接收一個函式,作用在RDD兩個型別相同的元素上,返回新元素;可以實現:RDD中元素的累加,計數,和其他型別的聚集操作
4.take(n):返回RDD的n個元素,返回結果是無序的,一般是測試使用
5.top():排序
6.foreach():計算RDD中的每個元素,但不返回到本地,可以配合println友好的打印出資料
五、RDDs的特性
1.RDDs的血統關係圖:
Spark維護著RDDs之間的依賴關係和建立關係,叫做血統關係圖
Spark使用血統關係圖來計算每個RDD的需求和恢復丟失的資料
2.延遲計算:
Spark對RDDs的計算是,他們第一次使用action操作的時候
這種方式在處理大資料的時候特別有用,可以減少資料的傳輸
Spark內部記錄metadat,表明transformation操作已經被響應了
載入資料也是延遲計算,資料只有在必要的時候,才會被載入進去
3.RDD.persist():
預設每次在RDDs上面進行action操作時,Spark都重新計算RDDs
如果想重複利用一個RDD,可以使用RDD.persist()
unpersist()方法是從快取中移除
六、KeyValue對
1.建立KeyValue對RDDs:使用map()函式,返回KeyValue對
2.reduceByKey():把相同的key結合
val rdd1=sc.parallelize(Array((1,2),(3,4),(3,6)))
val rdd2=rdd1.reduceByKey(_+_)
rdd2.foreach(println)
(1,2)
(3,10)
3.groupByKey():把相同key的values分組
val rdd3=rdd1.groupByKey()
rdd3.foreach(println)
(1,CompactBuffer(2))
(3,CompactBuffer(4, 6))
4.mapValues():函式作用於pairRDD的每個元素,key不變
val rdd4=rdd1.mapValues(x=>x+1)
rdd4.foreach(println)
(1,3)
(3,5)
(3,7)
5.flatMapValues():符號化的時候使用
val rdd5=rdd1.flatMapValues(x=>x to 5)
rdd5.foreach(println)
(3,4)
(3,5)
(1,2)
(1,3)
(1,4)
(1,5)
6.keys:僅返回keys
val rdd6=rdd1.keys
rdd6.foreach(println)
1
3
3
7.values:僅返回values
8.sortByKey():按照key排序的RDD
9.combineByKey():!!!
引數(createCombiner,mergeValue,mergeCombiners,partitioner)
最常用的基於key的聚合函式,返回的型別可以與輸入型別不一樣,許多基於key的聚合函式都用到了它,像groupByKey()……
元素的key,要麼是之前見過的,要麼是新的
遍歷partition中的如果是新元素,使用我們提供的createCombiner()函式
如果是這個partition中已經存在的key,就會使用mergeValue()函式
合計每個partition的結果的時候,使用mergeCombiners()函式元素
//求平均成績
//初始化
val scores=sc.parrallelize(Array(("Nina",80),("Nina",90),("Nina",100),("Jack",100),("Jack",100),("Jack",100)))
scores.foreach(println)
(Jack,100)
(Jack,100)
(Jack,100)
(Nina,80)
(Nina,90)
(Nina,100)
//使用combineByKey()函式,求每個人的總科目數和總成績
val result=scores.combineByKey(score=>(1,score),(c1:(Int,Int),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Int),c2:(Int,Int))=>(c1._1+c2._1,c1._2+c2._2))
result.foreach(println)
(Jack,(3,300))
(Nina,(3,270))
//求平均成績
val average=result.map{case(name,(num,score))=>(name,score/num)}
average.foreach(println)
(Nina,90)
(Jack,100)