Spark Core 的RDD
(1)RDD的介紹
RDD(Resilient Distributed Dataset)叫做分散式資料集,是Spark中最基本的資料抽象,它代表一個不可變(RDD中的資料,不能增刪改),可分割槽、元素可平行計算的集合。
具有資料流的模型的特點,自動容錯、位置感知性排程和可伸縮性。RDD允許使用者在執行多個查詢時顯示的將工作集快取在記憶體中。後續的查詢能夠重用工作集,這極大地提升了查詢速度。
RDD可以從 三方面理解:
- 資料集:RDD是資料集合的抽象,是複雜物理介質上存在資料的一種邏輯檢視。從外部看RDD的確可以被看待成經過封裝,帶擴充套件特性(如容錯性)的資料集合。
- 分散式
- 彈性:雖然 RDD 內部儲存的資料是隻讀的,但是,我們可以去修改(例如通 過 repartition 轉換操作)平行計算計算單元的劃分結構,也就是分割槽的數量。
總之:RDD就是一個大集合,將所有的資料都載入到記憶體中,方便多次進行重用。它的資料可以在多個節點上,並且RDD可以儲存在記憶體中,當如果某個階段的RDD丟失,不需要重新計算,只需要提取上一次的RDD,在相應的計算即可。
(2)RDD的屬性
1)A list of partitions(一組分片,資料集的基本單位)
一個分割槽通常與一個任務向關聯,分割槽的個數決定了並行的粒度。分割槽的個數可以在建立RDD的時候指定,如果不指定,那麼預設的由節點的cores個數決定。最終每一個分割槽會被對映成為BlockManager 中的一個Block,而這個Block會被下一個task使用進行計算。
2)A function for computing each split(運算元)
每一個RDD都會實現compute,用於分割槽進行計算
3)A list of dependencies on other RDDs(RDD之間的依賴)
RDD 的每次轉換都會生成一個新的 RDD,所以 RDD 之間就會形成類似於流水線一樣的前後依賴關係。在部分分割槽資料丟失時,Spark 可以通過這個依賴關係重新計算丟失的分割槽資料, 而不是對 RDD 的所有分割槽進行重新計算。
寬依賴和窄依賴:
窄依賴(完全依賴):一個父分割槽唯一對應一個子分割槽,例:map操作
寬依賴(部分依賴):一個父分割槽對應多個子分割槽,如:reduce、group操作
區分寬依賴和窄依賴:當前這個運算元的執行過程中是否有shuffle操作。
4)Optionally a Partitioner for key-value RDDs(分割槽函式)
當前 Spark 中實現了兩種型別的分片函式,一個是基於雜湊的 HashPartitioner,另外一個是基於範圍的 RangePartitioner。只有對於 key-value 的 RDD,才會有 Partitioner,非 key-value的 RDD 的 Parititioner 的值是 None。Partitioner 函式不但決定了 RDD 本身的分片數量,也決 定了 parent RDD Shuffle 輸出時的分片數量。
5)Optionally a list of preferred locations to compute each split on
一個列表,儲存存取每個 Partition 的優先位置(preferred location)。按照”移動資料不如移動計算”的理念,Spark 在進行任務排程的時候,會盡可能地將計算任務分配到其所要處理資料塊的儲存位置。而這個列表中就存放著每個分割槽的優先位置。
(3)RDD的API(相關運算元)
RDD程式設計中有兩種中形式:Transformation(轉換)和Action(行動)。
Transformation:表示把一個RDD ---->RDD。
Action:表示把RDD----集合或者scala物件。
1)RDD的建立:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext()
//由一個已經存在的 Scala 資料集合建立
val arr=Array(1,2,3,4)
val arr1RDD: RDD[Int] = sc.parallelize(arr)
val arr2RDD: RDD[Int] = sc.makeRDD(arr)
//由外部儲存系統的資料建立(HDFS、HBase...)
val HDFSRDD: RDD[String] = sc.textFile("/data/input")
}
}
2)Transformation:
官網:http://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
注意:RDD中所有的轉換(Transformation)都是延遲載入,也就是說,他們並不是直接計算結果,相反的,他們只是記住這些應用到基礎資料集,上的一個轉換動作,只有當發生一個要求返回一個Driver動作的時候,這些轉換才真正執行。
map()運算元:
val HDFSRDD: RDD[String] = sc.textFile("/data/input")
/**
* map 運算元,返回一個新的RDD,該RDD由每一個輸入元素經過function函式轉換後組成
*/
val mapRDD: RDD[(String, Int)] = HDFSRDD.map(ele=>(ele,1))
flatMap()運算元:
val arr=Array("hive hbase hadoop","spark hadoop","yarn hdfs")
val lineRDD: RDD[String] = sc.parallelize(arr)
/**
* flagMap:類似於map,但是每一個元素輸入的元素可以被
* 對映成為0個或者多個輸出的元素(返回的是一個序列,而不是單一的元素)
*/
//返回一個集合hive hbase hadoop spark hadoop yarn hdfs
val wordRDD: RDD[String] = lineRDD.flatMap(line=>line.split("\\s+"))
filter()運算元:
val arr=Array(1,2,3,4,5)
val arrRDD: RDD[Int] = sc.parallelize(arr)
/**
* filter過濾:返回一個新的RDD,該RDD由經過func函式計算後返回
* 值為true的輸入元素組成
*/
val filterRDD: RDD[Int] = arrRDD.filter(num=>num%2==0)
mapPartitions()運算元:
val hdfsRDD: RDD[String] = sc.textFile("/data/input")
/**
* mapPartitions與map的唯一區別就是,mapPartitions迭代的是一個分割槽,
* 而map遍歷的每一個元素,mapPartitions引數是一個迭代物件,返回的也是一個迭代物件
*/
val partitionRDD: RDD[String] = hdfsRDD.mapPartitions((x: Iterator[String]) => {
val temp = x.toList.map(line => line + "!")
temp.toIterator
})
mapPartitionsWithIndex()運算元:
val hdfsRDD: RDD[String] = sc.textFile("/data/input")
/**
* 第一個引數是分割槽編號:分割槽編號是從0開始的不間斷的連續編號
* 第二個引數和mapPartitions相同
*/
val partitionRDD: RDD[String] = hdfsRDD.mapPartitionsWithIndex((parnum:Int,x: Iterator[String]) => {
println(parnum) //分割槽編號
val temp = x.toList.map(line => line + "!")
temp.toIterator
})
sample()運算元:
val list=1 to 5000
/**
* sample方法有三個引數:
* withReplacement:代表是否有放回的抽取(false 不放回,true:放回)
* fraction:抽取樣本空間佔總體的比例,(以分數的形式) 0<=fraction <=1
* seed:隨機數生成器,new Random().nextInt(10),不傳表示使用系統的
* 注意:我們使用的sample運算元,不能保證提供集合大小就恰巧是rdd.size()*fraction,結果大小會上下浮動
* sample在做抽樣調查的時候,特別受用
*/
val listRDD: RDD[Int] = sc.parallelize(list)
val sampleRDD: RDD[Int] = listRDD.sample(false,0.2)
println(sampleRDD.count()) //大概是5000*0.2 上下浮動
groupByKey()運算元:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
/**
* groupByKey,分組
* 建議groupByKey在實踐中,能不用就不用,主要因為groupByKey的效率低,
* 因為有大量的資料在網路中傳輸,而且還沒有進行本地的預處理
* 可以使用reduceByKey或者aggregateByKey或者combineByKey代替這個groupByKey
*/
val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
//分組
val groupRDD: RDD[(String, Iterable[Int])] = stuRDD.groupByKey()
//求平均值
val result: RDD[(String, Double)] = groupRDD.map { case (name, score) => {
val avg = score.sum.toDouble / (score.size)
(name, avg)
}
}
reduceByKey運算元:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
/**
* reduceByKey:在一個(K,V)對的資料集上使用,返回一個(K,V)對的資料
* 集,key 相同的值,都被使用指定的 reduce 函式聚合
* 到一起。和 groupByKey 類似,任務的個數是可以通過
* 第二個可選引數來配置的。
*/
val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
//分組,求總分
val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
sumRDD.foreach(println) //列印:(hbase,36)(math,18)(hbase,18)
sortByKey()運算元:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
/**
* sortByKey:在一個(K,V)的 RDD 上呼叫,K 必須實現 Ordered 介面,
* 返回一個按照 key 進行排序的(K,V)的 RDD
*/
//分組,求總分,排序
val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
sumRDD.foreach(println) //列印:(hbase,36)(math,18)(hbase,18)
val sortRDD: RDD[(String, Int)] = sumRDD.map(kv=>(kv._2,kv._1)).sortByKey().map(kv=>(kv._2,kv._1))
sortRDD.foreach(println)
sortBy運算元:
val list=List(("math",18),("hbase",18),("hive",22),("hive",18))
/**
* sortBy(func,[ascending], [numTasks])
* 與 sortByKey 類似,但是更靈活
* 第一個引數是根據什麼排序
* 第二個是怎麼排序,true 正序,false 倒序
* 第三個排序後分區數,預設與原 RDD 一樣
*/
//分組,求總分,排序
val stuRDD: RDD[(String, Int)] = sc.parallelize(list)
val sumRDD: RDD[(String, Int)] = stuRDD.reduceByKey((x, y)=>x+y)
sumRDD.foreach(println) //列印:(hbase,36)(math,18)(hbase,18)
val sortRDD: RDD[(String, Int)] = sumRDD.sortBy(kv=>kv._2,false,2)
aggregateByKey()運算元:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext()
/**
* aggregateByKey(zeroValue)(seqOp,combOp, [numTasks])
* 先按分割槽聚合再總的聚合,每次要跟初始值交流
* zeroValue:初始值
* seqOp:迭代操作,拿RDD中的每一個元素跟初始值進行合併
* combOp:分割槽結果的最終合併
* numTasks:分割槽個數
* aggregate+groupByKey=aggregateByKey
* aggregate對單個值進行RDD,aggregateByKey對(K,V)值進行RDD
*/
//aggregate
val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val listRDD: RDD[Int] = sc.parallelize(list)
//求平均值
/**
* seqOp: (U, T) => U
* combOp: (U, U) => U
* u:(Int,Int) 總和,總次數
*/
val result: (Int, Int) = listRDD.aggregate(0, 0)((u: (Int, Int), x: Int) => {
(u._1 + x, u._2 + 1)
}
, (u1: (Int, Int), u2: (Int, Int)) => {
(u1._1 + u2._1, u1._2 + u2._2)
})
println(result._1 / result._2)
//aggregateByKey已經根據(k,v)k 進行分組,以下的操作,是對v進行操作
//以下操作時求平均值
val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
val stuRDD: RDD[(String, Int)] = sc.parallelize(list1)
val reslutRDD2: RDD[(String, (Int, Int))] = stuRDD.aggregateByKey((0, 0))((x: (Int, Int), y: Int) => {
(x._1 + y, x._2 + 1)
}, (x: (Int, Int), y: (Int, Int)) => {
(x._1 + y._1, x._2 + y._2)
})
reslutRDD2.foreach(kv=>{
val name=kv._1
val avg=kv._2._1.toDouble/kv._2._2
})
}
}
foldLeft()運算元:(不是spark的運算元,是scala的高階操作)
/**
* foldLeft
* (zeroValue: T) 初值值
* (B, A) => B B是一個元組,B._1 表示累加元素,B._2 表示個數, A 表示下一個元素
*/
//aggregate
val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val result: (Int, Int) = list.foldLeft((0,0))((x, y)=>{(x._1+y,x._2+1)})
println(result._1.toDouble/result._2)
combineByKey()運算元:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
/**
* combineByKey:
* 合併相同的 key 的值 rdd1.combineByKey(x => x, (a: Int,
* b: Int) => a + b, (m: Int, n: Int) => m + n)
*/
//求平均值
val list1 = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
val listRDD: RDD[(String, Int)] = sc.parallelize(list1)
/**
* createCombiner: V => C,
* mergeValue: (C, V) => C,
* mergeCombiners: (C, C) => C): RDD[(K, C)]
*/
val resultRDD: RDD[(String, (Int, Int))] = listRDD.combineByKey(x => {
(x, 1)
},
(x: (Int, Int), y: Int) => {
(x._1 + y, x._2 + 1)
},
(x: (Int, Int), y: (Int, Int)) => {
(x._1 + y._1, x._2 + y._2)
})
resultRDD.foreach{case (name,(sum,count))=>{
val avg=sum.toDouble/count
println(s"${name}:${avg}")
}}
}
}
連線操作:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val arr1 = Array(1, 2, 4, 5)
val arr1RDD = sc.parallelize(arr1)
val arr2 = Array(4, 5, 6, 7)
val arr2RDD = sc.parallelize(arr2)
//cartesian 笛卡爾積
val cartesianRDD: RDD[(Int, Int)] = arr1RDD.cartesian(arr2RDD)
//union : 連線
val unionRDD: RDD[Int] = arr1RDD.union(arr2RDD)
//subtract,求,差集
val sbutractRDD: RDD[Int] = arr1RDD.subtract(arr2RDD)
//join
val list1 = List(("a", 1), ("b", 2), ("c", 3))
val list1RDD = sc.parallelize(list1)
val list2 = List(("a", "zs"), ("b", "sl"))
val list2RDD = sc.parallelize(list2)
/**
* 根據元組中的key進行join 操作,相同的key向連線
* 返回的是RDD[(String, (Int, String))] (key,連線結果)
*/
val joinRDD: RDD[(String, (Int, String))] = list1RDD.join(list2RDD)
//cogroup
/**
* (String key ,
* (Iterable[Int] arr1中的相應的key所有value的集合
* , Iterable[String])) arr2中的相應的key所有value的集合
*/
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[String]))] = list1RDD.cogroup(list2RDD)
}
}
分割槽操作:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt")
/**
* 表示在執行了filter操作之後,由於大量的資料被過濾,導致之前設定的分割槽task個數,
* 處理剩下的資料導致資源浪費,為了合理高效的利用資源,
* 可以對task重新定義,在coalesce方法中的分割槽個數一定要小於之前設定的分割槽個數。
*/
hdfsRDD.coalesce(2)
//打亂資料,重新分割槽,分割槽規則為隨機分割槽
hdfsRDD.repartition(3)
//自定義分割槽規則(注意,只在有key-value的RDD中可以使用)
var arr1 = Array(("a", 1), ("a", 2), ("c", 1), ("b", 2), ("d", 2)
("b", 2), ("e", 2)
, ("b", 2)
, ("f", 2), ("g", 2), ("h", 2))
val arrRDD: RDD[(String, Int)] = sc.parallelize(arr1,4)
arrRDD.partitionBy(new MyPartitioner(3))
}
}
class MyPartitioner(val numPTN:Int) extends Partitioner{
//分割槽個數
override def numPartitions: Int = numPTN
//分割槽規則
override def getPartition(key: Any): Int = {
val num=key.hashCode()&Integer.MAX_VALUE%numPTN
return num
}
}
總結:
- Transformation返回的仍然是一個RDD
- 它使用了鏈式呼叫的設計模式,對一個 RDD 進行計 算後,變換成另外一個 RDD,然後這個 RDD 又可以進行另外一次轉換。這個過程是分散式的。
3)Action:
常見操作:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
val listRDD: RDD[(String, Int)] = sc.parallelize(list)
//action rdd ---map
listRDD.reduceByKeyLocally((x,y)=>x+y)
//呼叫collect的目的是:觸發所有的計算,最終收集當前這個呼叫者RDD的所有資料,返回到客戶端,如果資料量比較大,謹慎使用
listRDD.collect()
//統計RDD中有多少記錄
listRDD.count()
//取出RDD中的第一條記錄
listRDD.first()
//取出RDD前幾條記錄
listRDD.take(5)
//隨機取樣
listRDD.takeSample(false,20)
//按照某種格式,排序後的前幾條
listRDD.top(50)
//按照升序或者降序,取相應的條數的記錄(其中的元素必須繼承Ordered)
listRDD.takeOrdered(3)
//統計每一個key中的value有多少個
listRDD.countByKey()
//統計有多少個元素
listRDD.countByValue()
//遍歷RDD中每一個元素
listRDD.foreach(kv=>{})
//分割槽遍歷RDD中的元素
listRDD.foreachPartition(kv=>{})
//將RDD的結果,儲存到相應的檔案系統中(注意這個目錄一定是不存在的目錄)
listRDD.saveAsTextFile("/data/output")
}
}
總結:Action返回值不是一個RDD。它要麼是一個scala的集合,要麼是一個值,要麼是空。最終返回到Driver程式,或者把RDD寫入到檔案系統中。