Spark-Core Application Guide: The Basics
阿新 • Published: 2018-12-31
I. RDD
1. What is an RDD?
An RDD is the abstraction Spark builds over all of its underlying data in order to simplify things for the user: it exposes the data in an object-oriented way through a rich set of RDD methods, and those methods are what you use to compute on the RDD and produce output.
RDD: Resilient Distributed Dataset.
2. Characteristics of RDDs
1. Immutable: every operation on an RDD produces a new RDD.
2. Partitioned: an RDD stores its data split across partitions.
3. Resilient:
<1> Storage resilience: automatic switching between memory and disk.
<2> Fault-tolerance resilience: lost data can be recovered automatically.
<3> Computation resilience: failed computations are retried.
<4> Partitioning resilience: data can be repartitioned as needed.
3. What does Spark actually do?
4. RDDs are lazily evaluated: operations are divided into transformations and actions, and only actions trigger the actual execution of an RDD.
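A short sketch of this laziness (the variable names are just illustrative):
val nums = sc.makeRDD(1 to 5)     // creates the RDD; nothing is computed yet
val doubled = nums.map(_ * 2)     // map is a transformation: still nothing runs
doubled.collect()                 // collect is an action: only now does a job execute
// Array(2, 4, 6, 8, 10)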
II. RDD Methods
1. Creating RDDs
<1> Creating an RDD from a collection
Method: use makeRDD or parallelize
def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}

def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
  assertNotStopped()
  val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
  new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
}

def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
/**
 * The second makeRDD overload also lets you specify, for each individual data item, the node(s) on which it should preferably be placed.
 */
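A small usage sketch of that second overload (the host names are hypothetical):
val placed = sc.makeRDD(Seq(
  (1, Seq("host-a")),   // element 1 prefers to live on host-a
  (2, Seq("host-b"))    // element 2 prefers to live on host-b
))
placed.preferredLocations(placed.partitions(0))   // should report Seq(host-a)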
<2> Creating an RDD from external storage
<3> Creating an RDD by transforming another RDD
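A short sketch of these two approaches (the HDFS path is hypothetical):
// <2> From external storage: textFile reads a text file (HDFS, local, S3, ...) into an RDD[String]
val lines = sc.textFile("hdfs://namenode:9000/user/demo/input.txt")
// <3> From another RDD: every transformation derives a new RDD from an existing one
val lengths = lines.map(_.length)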
2. RDD Types
<1> Value RDDs
e.g. RDD[Int]
<2> Key-value RDDs
e.g. RDD[(Int, Int)], RDD[(Int, (Int, Int))]
** All key-value RDDs can also use the value-RDD operations, as the sketch below shows.
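For example (a small sketch): a key-value RDD is just an RDD of pairs, so value-RDD operators such as map and filter work on it directly.
val kv = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))   // RDD[(Int, String)]
kv.filter { case (k, _) => k > 1 }                         // plain value-RDD operator
  .map { case (k, v) => (k, v.toUpperCase) }
  .collect()                                               // Array((2,B), (3,C))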
3. Common RDD Operators
Transformation
<1>map
/**
* Return a new RDD by applying a function to all elements of this RDD.
* A one-to-one transformation: each element is mapped to exactly one element of the new RDD that holds all the results.
*/
def map[U: ClassTag](f: T => U): RDD[U]
scala> sc.makeRDD(1 to 10)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
scala> res1.map(x=>x*2).collect
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
scala> sc.makeRDD("zhangzhangshisb").collect
res3: Array[Char] = Array(z, h, a, n, g, z, h, a, n, g, s, h, i, s, b)
<2>filter
/**
* Return a new RDD containing only the elements that satisfy a predicate.
* A filtering transformation on the RDD.
*/
def filter(f: T => Boolean): RDD[T]
scala> sc.makeRDD(1 to 10)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
scala> res1.map(x=>x*2)
res7: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:27
scala> res7.filter(_>10)
res8: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:29
scala> res8.collect
res9: Array[Int] = Array(12, 14, 16, 18, 20)
<3>flatMap
/**
* Maps each element to a collection of elements and then flattens the results, so the output RDD stays flat.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
scala> sc.makeRDD(1 to 10)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
scala> res1.flatMap(1 to _)
res10: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at flatMap at <console>:27
scala> res10.collect
res11: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
<4>mapPartitions
/**
* Operates on the RDD one partition at a time: the function receives all elements of a partition as an iterator and returns an iterator of results.
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
scala> sc.makeRDD(1 to 10)
res1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:25
scala> res1.mapPartitions(x=>x.filter(_ % 2==0).map("Partition"+_))
res12: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at mapPartitions at <console>:27
scala> res12.collect
res13: Array[String] = Array(Partition2, Partition4, Partition6, Partition8, Partition10)
<5>mapPartitionsWithIndex
/**
*Like mapPartitions, but the function also receives the partition index as a parameter.
*The number of partitions can also be set manually when creating the RDD,
*as the following session shows.
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
scala> var rdd = sc.makeRDD(1 to 50,5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24
scala> rdd.collect
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50)
scala> rdd.partitions.size
res16: Int = 5
scala> rdd.mapPartitionsWithIndex((i,x)=>Iterator(i + ":["+x.mkString(",")+"]"))
res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at mapPartitionsWithIndex at <console>:27
scala> res17.collect
res18: Array[String] = Array(0:[1,2,3,4,5,6,7,8,9,10], 1:[11,12,13,14,15,16,17,18,19,20], 2:[21,22,23,24,25,26,27,28,29,30], 3:[31,32,33,34,35,36,37,38,39,40], 4:[41,42,43,44,45,46,47,48,49,50])
<6>sample
/**
* Simply put: random sampling.
* withReplacement:
*   true  = sampling with replacement,
*   false = sampling without replacement
* fraction:
*   the expected fraction of elements to select
* seed:
*   the seed for the random number generator
*/
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
scala> var rdd = sc.makeRDD(1 to 50,5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:24
scala> rdd.sample(true,0.5,5).collect
res19: Array[Int] = Array(8, 8, 8, 9, 14, 14, 16, 16, 20, 21, 22, 22, 25, 27, 28, 28, 32, 33, 36, 36, 45, 45, 48, 48, 49, 49)
<7>union
/**
* Unions this RDD with another and returns the combined RDD; both RDDs must have the same element type.
*/
def union(other: RDD[T]): RDD[T]
scala> sc.makeRDD(5 to 9).union(sc.makeRDD(10 to 15)).collect
res21: Array[Int] = Array(5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
<8>intersection
/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
* elements, even if the input RDDs did.
*
* @note This method performs a shuffle internally.
* Returns the intersection of the two RDDs.
*/
def intersection(other: RDD[T]): RDD[T]
scala> sc.makeRDD(5 to 9).intersection(sc.makeRDD(0 to 15)).collect
res22: Array[Int] = Array(6, 8, 7, 9, 5)
<9>distinct
/**
* Return a new RDD containing the distinct elements in this RDD.
* Removes duplicates; note that this shuffles the data internally.
*/
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
scala> sc.makeRDD(5 to 9).union(sc.makeRDD(0 to 15)).distinct.collect
res23: Array[Int] = Array(4, 0, 8, 12, 13, 1, 9, 5, 14, 6, 10, 2, 15, 11, 7, 3)
<10>partitionBy
/**
* Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
scala> var rdd=sc.makeRDD(1 to 80,8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> rdd.mapPartitionsWithIndex((i,x)=>Iterator(i + ":["+x.mkString(",")+"]"))
res0: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:27
scala> res0.collect
res1: Array[String] = Array(0:[1,2,3,4,5,6,7,8,9,10], 1:[11,12,13,14,15,16,17,18,19,20], 2:[21,22,23,24,25,26,27,28,29,30], 3:[31,32,33,34,35,36,37,38,39,40], 4:[41,42,43,44,45,46,47,48,49,50], 5:[51,52,53,54,55,56,57,58,59,60], 6:[61,62,63,64,65,66,67,68,69,70], 7:[71,72,73,74,75,76,77,78,79,80])
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> rdd.map(x=>(x,x)).partitionBy(new HashPartitioner(5))
res2: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at partitionBy at <console>:28
scala> res2.partitions.size
res5: Int = 5
scala> res2.mapPartitionsWithIndex((i,x)=>Iterator(i + ":["+x.mkString(",")+"]"))
res6: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at mapPartitionsWithIndex at <console>:30
scala> res6.collect
res7: Array[String] = Array(0:[(5,5),(10,10),(15,15),(20,20),(25,25),(30,30),(35,35),(40,40),(45,45),(50,50),(55,55),(60,60),(65,65),(70,70),(75,75),(80,80)], 1:[(1,1),(6,6),(11,11),(16,16),(21,21),(26,26),(31,31),(36,36),(41,41),(46,46),(51,51),(56,56),(61,61),(66,66),(71,71),(76,76)], 2:[(2,2),(7,7),(12,12),(17,17),(22,22),(27,27),(32,32),(37,37),(42,42),(47,47),(52,52),(57,57),(62,62),(67,67),(72,72),(77,77)], 3:[(3,3),(8,8),(13,13),(18,18),(23,23),(28,28),(33,33),(38,38),(43,43),(48,48),(53,53),(58,58),(63,63),(68,68),(73,73),(78,78)], 4:[(4,4),(9,9),(14,14),(19,19),(24,24),(29,29),(34,34),(39,39),(44,44),(49,49),(54,54),(59,59),(64,64),(69,69),(74,74),(79,79)])
<11>reduceByKey
/**
* Aggregates the values of each key using the given function.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
scala> val rdd1 = sc.makeRDD(Array((1,1),(1,2),(1,5),(2,3),(2,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at makeRDD at <console>:25
scala> rdd1.reduceByKey(_+_).collect
res8: Array[(Int, Int)] = Array((2,12), (1,8), (3,6))
<12>groupByKey
/**
* Groups the values sharing the same key into one collection; evaluated lazily. In practice, reduceByKey is used far more often.
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
scala> val rdd1 = sc.makeRDD(Array((1,1),(1,2),(1,5),(2,3),(2,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[5] at makeRDD at <console>:25
scala> rdd1.groupByKey().collect
res10: Array[(Int, Iterable[Int])] = Array((2,CompactBuffer(3, 4, 5)), (1,CompactBuffer(1, 2, 5)), (3,CompactBuffer(6)))
<13>combineByKey
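/**
 * createCombiner: turns the first value seen for a key into an initial combiner of type C
 * mergeValue: merges each further value of that key, within a partition, into its combiner
 * mergeCombiners: merges the combiners of the same key across partitions
 * The session below uses it to build a (sum, count) pair per key and then compute the average.
 */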
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)]
scala> var rdd2 = sc.makeRDD(Array(("a",90),("a",80),("a",87),("b",89),("b",74),("c",77),("c",99)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> rdd2.combineByKey(v=>(v,1),(c:(Int,Int),v)=>(c._1+v,c._2+1),(c1:(Int,Int),c2:(Int,Int))=>(c1._1+c2._1,c1._2+c2._2))
res2: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[1] at combineByKey at <console>:27
scala> rdd2.collect
res3: Array[(String, Int)] = Array((a,90), (a,80), (a,87), (b,89), (b,74), (c,77), (c,99))
scala> res2.map{case(k,v:(Int,Int))=>(k,v._1/v._2)}
res6: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[2] at map at <console>:29
scala> res6.collect
res7: Array[(String, Int)] = Array((b,81), (a,85), (c,88))
<14>aggregateByKey
/**
 * seqOp runs first, within each partition: for values that share a key it folds them
 * into the zero value (in the example below, taking the max value per key).
 * The same is done independently in every partition of the RDD.
 * combOp then aggregates the per-partition results for each key (below, by summing them).
 */
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
scala> var rdd=sc.parallelize(List((1,3),(1,2),(2,4),(2,6),(2,8),(2,7),(3,9)),5)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res0: Array[(Int, Int)] = Array((1,3), (1,2), (2,4), (2,6), (2,8), (2,7), (3,9))
scala> var agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26
scala> agg.collect
res1: Array[(Int, Int)] = Array((1,5), (2,21), (3,9))
<15>foldByKey
// A simplified version of aggregateByKey: a single function serves as both seqOp and combOp
def foldByKey(
zeroValue: V,
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
scala> var rdd=sc.parallelize(List((1,3),(1,2),(2,4),(2,6),(2,8),(2,7),(3,9)),5)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> var fold = rdd.foldByKey(0)(_+_)
fold: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at foldByKey at <console>:26
scala> fold.collect
res2: Array[(Int, Int)] = Array((1,5), (2,25), (3,9))
<16>sortByKey
// Sorts by key. If the key type has no natural ordering, supply one by implementing the compare method of an Ordering for that key type.
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]
// A Boolean argument to sortByKey chooses ascending or descending order; the default is ascending
scala> var rdd=sc.parallelize(List((1,3),(1,2),(2,4),(2,6),(2,8),(2,7),(3,9)),5)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.sortByKey().collect
res3: Array[(Int, Int)] = Array((1,3), (1,2), (2,4), (2,6), (2,8), (2,7), (3,9))
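A small sketch of supplying your own Ordering for the key (the words and the length-based ordering are purely illustrative):
// Sort String keys by length instead of the default lexicographic order
implicit val byLength: Ordering[String] = Ordering.by((s: String) => s.length)
val words = sc.makeRDD(Array(("banana", 1), ("fig", 2), ("kiwi", 3)))
words.sortByKey().collect()   // Array((fig,2), (kiwi,3), (banana,1))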
<17>sortBy
// More flexible than sortByKey: sorts the elements by the result of applying f to each of them
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
scala> var rdd1 = sc.makeRDD(1 until(9))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at makeRDD at <console>:24
scala> rdd1.sortBy(x=>x).collect
res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)
scala> rdd1.sortBy(1 / _).collect
res10: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 1)
<18>join
// Joins two RDDs by key:
// join keeps only the keys that appear in both RDDs
// a left (outer) join keeps all keys of the left RDD
// a right (outer) join keeps all keys of the right RDD
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
scala> var rdd=sc.makeRDD(Array((1,2),(2,3)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[28] at makeRDD at <console>:24
scala> var rdd1=sc.makeRDD(Array((2,4),(3,5)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[29] at makeRDD at <console>:24
scala> rdd.join(rdd1).collect
res15: Array[(Int, (Int, Int))] = Array((2,(3,4)))
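The left and right variants mentioned above also keep unmatched keys; a quick sketch with the same rdd and rdd1 (the order of the collected elements may differ):
rdd.leftOuterJoin(rdd1).collect()    // Array((1,(2,None)), (2,(3,Some(4))))
rdd.rightOuterJoin(rdd1).collect()   // Array((2,(Some(3),4)), (3,(None,5)))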
<19>cogroup
// For each key, gathers the values from both RDDs, keeping them in separate iterables
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))]
scala> var rdd=sc.makeRDD(Array((1,2),(2,3)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[28] at makeRDD at <console>:24
scala> var rdd1=sc.makeRDD(Array((2,4),(3,5)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[29] at makeRDD at <console>:24
scala> rdd.cogroup(rdd1).collect
res16: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((2,(CompactBuffer(3),CompactBuffer(4))), (1,(CompactBuffer(2),CompactBuffer())), (3,(CompactBuffer(),CompactBuffer(5))))
<20>coalesce
// When an RDD has far more partitions than the cluster has nodes, coalesce merges the small partitions onto fewer ones to reduce the computation pressure
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
scala> rdd.partitions.size
res18: Int = 2
scala> rdd.coalesce(1)
res19: org.apache.spark.rdd.RDD[(Int, Int)] = CoalescedRDD[35] at coalesce at <console>:27
scala> res19.partitions.size
res20: Int = 1
<21>repartition
// Reshuffles the data and repartitions it into the given number of partitions
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
scala> rdd.repartition(1)
res21: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[39] at repartition at <console>:27
scala> res21.partitions.size
res22: Int = 1
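repartition is in fact coalesce with shuffling turned on, so the two relate as follows:
rdd.repartition(4)                // always shuffles; can increase or decrease the partition count
rdd.coalesce(4, shuffle = true)   // equivalent to the line above
rdd.coalesce(1)                   // no shuffle; can only reduce the partition count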
Action
A transformation always returns an RDD,
whereas an action never returns an RDD.
<1>reduce
// Aggregates the elements of the dataset using the function f passed to reduce
def reduce(f: (T, T) => T): T
scala> var rdd=sc.makeRDD(Array[Int](1,2,3,4,5,6,7,8,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[41] at makeRDD at <console>:24
scala> rdd.reduce(_*_)
res26: Int = 362880
<2>collect
// Returns all elements of the dataset as an array
def collect(): Array[T]
scala> rdd.collect
res27: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
<3>count
// Returns the number of elements in the dataset
def count(): Long
scala> rdd.count
res28: Long = 9
<4>first
// Returns the first element
def first(): T
scala> rdd.first
res29: Int = 1
<5>take
// Returns an array made of the first num elements
def take(num: Int): Array[T]
scala> rdd.take(5)
res30: Array[Int] = Array(1, 2, 3, 4, 5)
<6>takeSample
// Returns a random array of num elements sampled from the dataset
// The Boolean flag decides whether sampling is done with replacement
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T]
scala> rdd.takeSample(true,5)
res31: Array[Int] = Array(1, 1, 5, 9, 4)
<7>takeOrdered
// Returns the first num elements in sorted order
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
scala> rdd.takeOrdered(4)
res34: Array[Int] = Array(1, 2, 3, 4)
<8>aggregate
// Like the aggregateByKey transformation, but returns a single value of a generic type that need not match the RDD's element type
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
scala> rdd.aggregate(1)(
| (_*_),
| (_+_))
res35: Int = 15145
<9>fold
// A fold operation: a simplified aggregate where seqOp and combOp are the same function
def fold(zeroValue: T)(op: (T, T) => T): T
scala> rdd.fold(1)(_*_)
res42: Int = 362880
<10>saveAsTextFile
// Saves the dataset as text files
// path is an HDFS path
def saveAsTextFile(path: String): Unit
<11>saveAsSequenceFile
// Saves the dataset as a Hadoop SequenceFile
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
<12>saveAsObjectFile
// Saves the dataset as a file of serialized objects
def saveAsObjectFile(path: String): Unit
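A brief usage sketch of the three save actions (the output paths are hypothetical):
val pairs = sc.makeRDD(Array((1, "a"), (2, "b")))
pairs.saveAsTextFile("hdfs://namenode:9000/out/text")      // plain text, one part file per partition
pairs.saveAsSequenceFile("hdfs://namenode:9000/out/seq")   // Hadoop SequenceFile of key/value pairs
pairs.saveAsObjectFile("hdfs://namenode:9000/out/obj")     // serialized objects
// sc.objectFile[(Int, String)]("hdfs://namenode:9000/out/obj") would read the object file back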
<13>countByKey
// Returns the number of elements for each key
def countByKey(): Map[K, Long]
scala> var rdd=sc.makeRDD((List((1,2),(1,100),(1,14),(2,18),(2,14),(3,77))))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[44] at makeRDD at <console>:24
scala> rdd.countByKey
res47: scala.collection.Map[Int,Long] = Map(2 -> 2, 1 -> 3, 3 -> 1)
<14>foreach
// Like Scala's foreach: applies the function f to every element
def foreach(f: T => Unit): Unit
scala> rdd.foreach(print(_))
(1,2)(1,100)(1,14)(2,18)(2,14)(3,77)