Spark rdd 介紹,和案例介紹
1.2、建立RDD
1)由一個已經存在的Scala集合建立。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
2)由外部儲存系統的資料集建立,包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile(“hdfs://mycluster/wordcount/input/2.txt”)
1.3、RDD程式設計API
1.3.1、Transformation
RDD中的所有轉換都是延遲載入的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎資料集(例如一個檔案)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。這種設計讓Spark更加有效率地執行。
常用的Transformation:
轉換 | 含義 |
---|---|
map(func) | 返回一個新的RDD,該RDD由每一個輸入元素經過func函式轉換後組成 |
filter(func) | 返回一個新的RDD,該RDD由經過func函式計算後返回值為true的輸入元素組成 |
flatMap(func) | 類似於map,但是每一個輸入元素可以被對映為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素) |
mapPartitions(func) | 類似於map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 類似於mapPartitions,但func帶有一個整數引數表示分片的索引值,因此在型別為T的RDD上執行時,func的函式型別必須是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根據fraction指定的比例對資料進行取樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子 |
union(otherDataset) | 對源RDD和引數RDD求並集後返回一個新的RDD |
intersection(otherDataset) | 對源RDD和引數RDD求交集後返回一個新的RDD |
distinct([numTasks])) | 對源RDD進行去重後返回一個新的RDD |
groupByKey([numTasks]) | 在一個(K,V)的RDD上呼叫,返回一個(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一個(K,V)的RDD上呼叫,返回一個(K,V)的RDD,使用指定的reduce函式,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的引數來設定 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | |
sortByKey([ascending], [numTasks]) | 在一個(K,V)的RDD上呼叫,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 與sortByKey類似,但是更靈活 |
join(otherDataset, [numTasks]) | 在型別為(K,V)和(K,W)的RDD上呼叫,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在型別為(K,V)和(K,W)的RDD上呼叫,返回一個(K(Iterable,Iterable))型別的RDD |
cartesian(otherDataset) | 笛卡爾積 |
pipe(command, [envVars]) | |
coalesce(numPartitions) | |
repartition(numPartitions) | |
repartitionAndSortWithinPartitions(partitioner) |
1.3.2、Action
動作 | 含義 |
---|---|
reduce(func) | 通過func函式聚集RDD中的所有元素,這個功能必須是可交換且可並聯的 |
collect() | 在驅動程式中,以陣列的形式返回資料集的所有元素 |
count() | 返回RDD的元素個數 |
first() | 返回RDD的第一個元素(類似於take(1)) |
take(n) | 返回一個由資料集的前n個元素組成的陣列 |
takeSample(withReplacement,num, [seed]) | 返回一個數組,該陣列由從資料集中隨機取樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子 |
takeOrdered(n, [ordering]) | |
saveAsTextFile(path) | 將資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對於每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文字 |
saveAsSequenceFile(path) | 將資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下,可以使HDFS或者其他Hadoop支援的檔案系統。 |
saveAsObjectFile(path) | |
countByKey() | 針對(K,V)型別的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。 |
foreach(func) | 在資料集的每一個元素上,執行函式func進行更新。 |
1.4 練習Spark rdd的api
連線Spark-Shell:
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077
練習1
//通過並行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//對rdd1裡的每一個元素乘2然後排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//過濾出大於等於十的元素
val rdd3 = rdd2.filter(_ >= 10)
//將元素以陣列的方式在客戶端顯示
rdd3.collect
練習2:
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//將rdd1裡面的每一個元素先切分在壓平
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect
練習3:
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求並集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect
練習4:
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求jion
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//求並集
val rdd4 = rdd1 union rdd2
rdd4.collect
//按key進行分組
val rdd5 = rdd4.groupByKey
rdd5.collect
練習5:
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup與groupByKey的區別
rdd3.collect
練習6:
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect
練習7:
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key進行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect
練習8:
mapPartitions
def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
該函式和map函式類似,只不過對映函式的引數由RDD中每一個元素變成了RDD中每一個分割槽的迭代器。如果在對映的過程中需要頻繁建立額外的物件,使用mapPartitions要比map高效的多
比如:將RDD中的所有元素通過JDBC連線寫入資料庫,如果使用map函式,可能要為每一個元素都建立一個collection,這樣開銷很大,如果使用mapPartitions,那麼只需要針對每一個分割槽建立一個connection.
引數preservesPartitioning表示是否保留父RDD的partitioner分割槽資訊。
//rdd1有兩個分割槽
scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at makeRDD at <console>:24
scala> rdd1.collect
res27: Array[Int] = Array(1, 2, 3, 4, 5)
//rdd3將rdd1中每個分割槽中的數值累加(通過mapPartitions來實現)
scala> var rdd3 = rdd1.mapPartitions{ x => {
| var result = List[Int]()
| var i = 0
| while(x.hasNext) {
| i += x.next()
| }
| result.::(i).iterator
| }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[64] at mapPartitions at <console>:26
//檢視合併結果後的rdd3的值
scala> rdd3.collect
res28: Array[Int] = Array(3, 12)
//檢視rdd3的分割槽大小
scala> rdd3.partitions.size
res29: Int = 2
練習9:
mapPartitionsWithIndex
def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
函式作用通mapPartitions,不過提供了兩個引數,第一個引數為分割槽的索引
例如:
scala> var rdd1 = sc.makeRDD(1 to 25,4)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at makeRDD at <console>:24
scala> var rdd2 = rdd1.mapPartitionsWithIndex{
| (x,iter) => {
| var result = List[String]()
| var i = 0
| while(iter.hasNext) {
| i += iter.next()
| }
| result.::(x + "|" + i).iterator
| }
| }
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[67] at mapPartitionsWithIndex at <console>:26
//獲取結果值(從返回的結果中可以看到)
scala> rdd2.collect
res30: Array[String] = Array(0|21, 1|57, 2|93, 3|154)
再如:
scala> val func = (index:Int,iter:Iterator[(Int)])=> {
| iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
| }
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
scala> rdd1.mapPartitionsWithIndex(func).collect
res0: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:0,val:4], [partID:1,val:5], [part], [partID:1,val:7], [partID:1,val:8], [partID:1,val:9])
練習8:
aggregate函式將每個分割槽裡的元素進行聚合,然後用combine函式將每個分割槽的結果和初始值(zerorValue)進行combine操作。這個函式最終返回的型別不需要和RDD中元素型別一致。
函式原型:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
aggregate
聚合,先在分割槽內進行聚合,然後再將每個分割槽的結果一起結果進行聚合
scala> def func1(index:Int,iter:Iterator[(Int)]):Iterator[String] = {
| iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
| }
func1: (index: Int, iter: Iterator[Int])Iterator[String]
//建立一個並行化的RDD,有兩個分割槽
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[77] at parallelize at <console>:24
//通過下面的程式碼可以看到rdd1中內容再兩個分割槽內的分佈情況,通過下面的結果可以看出有兩個分割槽,分別是partID:0和partID:1
scala> rdd1.mapPartitionsWithIndex(func1).collect
res56: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:0,val:4], [partID:1,val:5], [partID:1,val:6], [partID:1,val:7], [partID:1,val:8], [partID:1,val:9])
//下面的執行步驟是:
//一:0和1取出最大值1,1和2取出最大值2,2和3取出最大值3,3和4取出最大值4===》第一個分割槽的最大值是4
//二:0和5取出最大值5,5和6取出最大值6,6和7取出最大值7,7和8取出最大值8,8和9取出最大值9====>第二個分割槽的最大值是9
//三:後面的執行邏輯是:_+_,就是說將兩個分割槽的最大結果值求和,執行的結果是:(0) + 4+9=13
scala> rdd1.aggregate(0)(math.max(_,_),_+_)
res57: Int = 13
//下面的執行步驟是:
//一:3和1取出最大值3,3和2取出最大值3,3和3取出最大值3,3和4取出最大值4===》第一個分割槽的最大值是4
//二:3和5取出最大值5,5和6取出最大值6,6和7取出最大值7,7和8取出最大值8,8和9取出最大值9====>第二個分割槽的最大值是9
//三:後面的執行邏輯是:_+_,就是說將兩個分割槽的最大結果值求和,執行的結果是:(3)+4+9=16
scala> rdd1.aggregate(3)(math.max(_,_),_+_)
res62: Int = 16
//下面的執行步驟是:
//一:5和1取出最大值5,5和2取出最大值5,5和3取出最大值5,5和4取出最大值5===》第一個分割槽的最大值是5
//二:5和5取出最大值5,5和6取出最大值6,6和7取出最大值7,7和8取出最大值8,8和9取出最大值9====>第二個分割槽的最大值是9
//三:後面的執行邏輯是:_+_,就是說將兩個分割槽的最大結果值求和,執行的結果是:(5)+5+9=19
scala> rdd1.aggregate(5)(math.max(_,_),_+_)
res58: Int = 19
再如:
//下面的執行步驟是:
//一:8和1取出最大值8,8和2取出最大值8,8和3取出最大值8,8和4取出最大值8===》第一個分割槽的最大值是8
//二:8和5取出最大值8,8和6取出最大值8,8和7取出最大值8,8和8取出最大值8,8和9取出最大值9====>第二個分割槽的最大值是9
//三:後面的執行邏輯是:_+_,就是說將兩個分割槽的最大結果值求和,執行的結果是:(8)+8+9=25
scala> rdd1.aggregate(8)(math.max(_,_),_+_)
res58: Int = 19
再如:
//下面的執行步驟是:
//一:10和1取出最大值10,10和2取出最大值10,10和3取出最大值10,10和4取出最大值10===》第一個分割槽的最大值是10
//二:10和5取出最大值10,10和6取出最大值10,10和7取出最大值10,10和8取出最大值10,10和9取出最大值10====>第二個分割槽的最大值是10
//三:後面的執行邏輯是:_+_,就是說將兩個分割槽的最大結果值求和,執行的結果是:(10)+10+10=30
scala> rdd1.aggregate(10)(math.max(_,_),_+_)
res58: Int = 30
================================================================================
下面是字串的聚合
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[79] at parallelize at <console>:24
scala> def fun2(index:Int,iter:Iterator[(String)]):Iterator[String] = {
| iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
| }
fun2: (index: Int, iter: Iterator[String])Iterator[String]
//通過下面的結果可以知道:"a","b","c"在partID:0中,"d","e","f"在partID:1中
scala> rdd2.mapPartitionsWithIndex(fun2).collect
res63: Array[String] = Array([partID:0,val:a], [partID:0,val:b], [partID:0,val:c], [partID:1,val:d], [partID:1,val:e], [partID:1,val:f])
//下面的執行順序是:
//一、""和"a"相加得"a","a"和"b"相加得"ab","ab"和"c"相加得"abc",第一個分割槽得到的結果是:"abc"
//一、""和"d"相加得"d","d"和"e"相加得"de","ed"和"f"相加得"def",第一個分割槽得到的結果是:"def"
//三、由於是並行的計算,所以可能是第一個分割槽先執行完,此時的結果是:"" + "abc" + "def" ===》"abcdef";若是第二個分割槽先執行完,此時的結果是:"" + "def" + "abc" ===》"defabc"
scala> rdd2.aggregate("")(_+_,_+_)
res64: String = abcdef
scala> rdd2.aggregate("")(_+_,_+_)
res65: String = defabc
//下面的執行順序是:
//一、"="和"a"相加得"=a","=a"和"b"相加得"=ab","=ab"和"c"相加得"=abc",第一個分割槽得到的結果是:"=abc"
//一、"="和"d"相加得"=d","=d"和"e"相加得"=de","=ed"和"f"相加得"=def",第一個分割槽得到的結果是:"=def"
//三、由於是並行的計算,所以可能是第一個分割槽先執行完,此時的結果是:"=" + "=abc" + "=def" ===》"==abc=def";若是第二個分割槽先執行完,此時的結果是:"="+"=def" + "=abc" ===》"==def=abc"
//下面的結果中分別是:res68: String = ==def=abc 和 res69: String = ==abc=def,和上面的推算結果一致
scala> rdd2.aggregate("=")(_ + _, _ + _)
res68: String = ==def=abc
scala> rdd2.aggregate("=")(_ + _, _ + _)
res69: String = ==abc=def
val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
//通過下面可以知道有兩個分割槽,並且每個分割槽中有不同的值
scala> rdd3.mapPartitionsWithIndex(fun2).collect
res70: Array[String] = Array([partID:0,val:12], [partID:0,val:23], [partID:1,val:345], [partID:1,val:4567])
//下面的執行步驟是(scala> "".length結果是res72: Int = 0),(scala>"12".length結果是res73:Int=2):
//一:"".length和"12".length求出最大值2,得到字串是"2";"2".length和"23".length求出最大值2,得到的字串是2;所以第一個分割槽計算出的結果是:"2"
//二:"".length和"345".length求出最大值3,得到字串是"3";"3".length和"4567".length求出最大值4,得到的字串是4;所以第一個分割槽計算出的結果是:"4"
//三:得到的結果最後執行x+y,由於是平行計算所以可能是"24"或者"42"
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res75: String = 24
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res76: String = 42
//下面求最小值:
scala> val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[84] at parallelize at <console>:24
scala> rdd4.mapPartitionsWithIndex(fun2).collect
res79: Array[String] = Array([partID:0,val:12], [partID:0,val:23], [partID:1,val:345], [partID:1,val:])
//執行過程是:
//一:"".length和"12".length求出最小值0,得到字串是"0";"0".length和"23".length求出最小值1,得到的字串是0;所以第一個分割槽計算出的結果是:"0"
//二:"".length和"345".length求出最小值0,得到字串是"0";"0".length和"".length求出最小值0,得到的字串是0;所以第一個分割槽計算出的結果是:"0"
//三:得到的結果最後執行x+y,由於是平行計算所以可能是"01"或"10"
scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res85: String = 10
scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res86: String = 01
val rdd5 = sc.parallelize(List("12","23","","345"),2)
//執行過程是:
//一:"".length和"12".length求出最小值0,得到字串是"0";"0".length和"23".length求出最小值1,得到的字串是0;所以第一個分割槽計算出的結果是:"0"
//二:"".length和"".length求出最小值0,得到字串是"0";"0".length和"345".length求出最小值1,得到的字串是1;所以第一個分割槽計算出的結果是:"1"
//三:得到的結果最後執行x+y,由於是平行計算所以可能是"1"或
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
再如案例:
scala> def seqOP(a:Int, b:Int) : Int = {
| println("seqOp: " + a + "\t" + b)
| math.min(a,b)
| }
seqOP: (a: Int, b: Int)Int
scala> def combOp(a:Int, b:Int): Int = {
| println("combOp: " + a + "\t" + b)
| a + b
| }
combOp: (a: Int, b: Int)Int
scala> val z = sc. parallelize ( List (1 ,2 ,3 ,4 ,5 ,6) , 2)
//這裡要注意的是上面的z是Int型別的,所以下面要用於集合迭代的型別也是Int型別的。
scala> def fun2(index:Int,iter:Iterator[(Int)]):Iterator[String] = {
| iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
| }
fun2: (index: Int, iter: Iterator[Int])Iterator[String]
//通過下面的方式顯示出每個值所在的分割槽
scala> z.mapPartitionsWithIndex(fun2).collect
res94: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:1,val:4], [partID:1,val:5], [partID:1,val:6])
//下面的含義是:兩個分割槽每個裡面先單獨執行seqOP,兩個都執行完成之後,再執行comOp邏輯,所以下面的執行過程是:
//一、3和1執行seqOP的最小值是1,1和2執行seqOP間的最小值是1,1和3執行seqOP的最小值是1,第一個分割槽得到的結果是1
//二、3和4執行seqOP的最小值是3,3和5執行seqOP間的最小值是3,3和6執行seqOP的最小值是3,第一個分割槽得到的結果是3
//三、接著執行comOp邏輯,(3)和分割槽一種的1執行combOp得到的結果是:3+1=4,4接著和分割槽二中的3執行combOp得到的結果是4+3=7,所以最後的結果是:7
scala> z.aggregate(3)(seqOP, combOp)
combOp:3 1
combOp:4 3
res95: Int = 7
//再次驗證:
//一、2和1執行seqOP的最小值是1,1和2執行seqOP間的最小值是1,1和3執行seqOP的最小值是1,第一個分割槽得到的結果是1
//二、2和4執行seqOP的最小值是2,2和5執行seqOP間的最小值是2,2和6執行seqOP的最小值是2,第一個分割槽得到的結果是2
//三、接著執行comOp邏輯,(2)和分割槽一種的1執行combOp得到的結果是:2+1=3,3接著和分割槽二中的2執行combOp得到的結果是3 + 2=5,所以最後的結果是:5
scala> z.aggregate(2)(seqOP, combOp)
[Stage 105:> (0 + 0) / 2]combOp:2 1
combOp:3 2
res96: Int = 5
//下面的同樣:
scala> def seqOp(a:String, b:String) : String = {
| println("seqOp: " + a + "\t" + b)
| math.min(a.length , b.length ).toString
| }
seqOp: (a: String, b: String)String
scala> def combOp(a:String, b:String) : String = {
| println("combOp: " + a + "\t" + b)
| a + b
| }
combOp: (a: String, b: String)String
scala> val z = sc. parallelize ( List ("12" ,"23" ,"345" ,"4567") ,2)
scala> z. aggregate ("")(seqOp, combOp)
seqOp: 345
seqOp: 12
seqOp: 0 4567
seqOp: 0 23
combOp: 1
combOp: 1 1
res25: String = 11
練習10:
aggregateByKey
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}
scala> pairRDD.mapPartitionsWithIndex(func2).collect
res99: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])
//執行過程是:
//1、每個分割槽中的內容都按照key先進行分組,
//第一個分割槽分組後的結果是:(cat,(2,5))、(mouse,(4))
//第二個分割槽分組後的結果是:(cat,(12))、(dog,(12))、(mouse,(2))
//2、接著0,分別和每組中的結果比對,
//對於分割槽一:0和cat中的2比較,得到最大值2;2和cat中的5比較,得到的最大結果是5。同樣mouse執行相同操作。所以最終得到的是:(cat,(5)),(mouse,(4))
//對於分割槽二:0和cat中的12比較,得到的最大值12。依次類推,最終得到的結果是:(cat,(12))、(dog,(12))、(mouse,(2))
//3、接著0和分割槽一和分割槽二中每個最大值相加,最終得到的結果是:
// (cat,(5)) + (cat,(12)) ⇒ (cat,(5 + 12)) ==> (cat,(17))
//(mouse,(4)) + (mouse,(2)) ⇒ (mouse,(4 + 2)) ==> (mouse,(6))
//(dog,(12)) ⇒ (dog,(12)) ==> (dog,(12))
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
//執行過程是:
//1、每個分割槽中的內容都按照key先進行分組,
//第一個分割槽分組後的結果是:(cat,(2,5))、(mouse,(4))
//第二個分割槽分組後的結果是:(cat,(12))、(dog,(12))、(mouse,(2))
//2、接著100,分別和每組中的結果比對,
//對於分割槽一:100和cat中的2比較,得到最大值100;100和cat中的5比較,得到的最大結果是100。同樣mouse執行相同操作。所以最終得到的是:(cat,(100)),(mouse,(100))
//對於分割槽二:100和cat中的12比較,得到的最大值100。依次類推,最終得到的結果是:(cat,(100))、(dog,(100))、(mouse,(100))
//3、接著100和分割槽一和分割槽二中每個最大值相加,最終得到的結果是:
//(cat,(100)) + (cat,(100)) ⇒ (cat,(100 + 100)) ==> (cat,(200))
//(mouse,(100)) + (mouse,(100)) ⇒ (mouse,(100 + 100)) ==> (mouse,(200))
//(dog,(100)) + (dog,(100)) ⇒ (dog,(100)) ==> (dog,(100))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
練習11:
checkpoint (知識點可以檢視:http://blog.csdn.net/tototuzuoquan/article/details/74838936)
為當前RDD設定檢查點。該函式將會建立一個二進位制的檔案,並存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設定的。在checkpoint的過程中,該RDD的所有依賴於父RDD中的資訊將全部被移出。對RDD進行checkpoint操作並不會馬上被執行,必須執行Action操作才能觸發。
函式原型:
def checkpoint()
例項:
scala> val data = sc.parallelize(1 to 100000,15)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[94] at parallelize at <console>:24
scala> sc.setCheckpointDir("/iteblog")
17/07/07 19:17:22 WARN spark.SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory '/iteblog' appears to be on the local filesystem.
scala> data.checkpoint
scala> data.count
res105: Long = 100000
[[email protected] hadoop-2.8.0]# hdfs dfs -ls /iteblog
Found 1 items
drwxr-xr-x - root supergroup 0 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c
[[email protected] hadoop-2.8.0]# hdfs dfs -ls /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c
Found 1 items
drwxr-xr-x - root supergroup 0 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94
[[email protected] hadoop-2.8.0]# hdfs dfs -ls /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94
Found 15 items
-rw-r--r-- 3 root supergroup 71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00000
-rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00001
-rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00002
-rw-r--r-- 3 root supergroup 71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00003
-rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00004
-rw-r--r-- 3 root supergroup 71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00005
-rw-r--r-- 3 root supergroup 71219 2017-07-07