1. 程式人生 > >Spark rdd 介紹,和案例介紹

Spark rdd 介紹,和案例介紹

1.2、建立RDD

1)由一個已經存在的Scala集合建立。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

2)由外部儲存系統的資料集建立,包括本地的檔案系統,還有所有Hadoop支援的資料集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile(“hdfs://mycluster/wordcount/input/2.txt”)

1.3、RDD程式設計API

1.3.1、Transformation

RDD中的所有轉換都是延遲載入的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎資料集(例如一個檔案)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。這種設計讓Spark更加有效率地執行。

常用的Transformation:

轉換 含義
map(func) 返回一個新的RDD,該RDD由每一個輸入元素經過func函式轉換後組成
filter(func) 返回一個新的RDD,該RDD由經過func函式計算後返回值為true的輸入元素組成
flatMap(func) 類似於map,但是每一個輸入元素可以被對映為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
mapPartitions(func) 類似於map,但獨立地在RDD的每一個分片上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 類似於mapPartitions,但func帶有一個整數引數表示分片的索引值,因此在型別為T的RDD上執行時,func的函式型別必須是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據fraction指定的比例對資料進行取樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子
union(otherDataset) 對源RDD和引數RDD求並集後返回一個新的RDD
intersection(otherDataset) 對源RDD和引數RDD求交集後返回一個新的RDD
distinct([numTasks])) 對源RDD進行去重後返回一個新的RDD
groupByKey([numTasks]) 在一個(K,V)的RDD上呼叫,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(K,V)的RDD上呼叫,返回一個(K,V)的RDD,使用指定的reduce函式,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的引數來設定
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一個(K,V)的RDD上呼叫,K必須實現Ordered介面,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey類似,但是更靈活
join(otherDataset, [numTasks]) 在型別為(K,V)和(K,W)的RDD上呼叫,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在型別為(K,V)和(K,W)的RDD上呼叫,返回一個(K(Iterable,Iterable))型別的RDD
cartesian(otherDataset) 笛卡爾積
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

1.3.2、Action

動作 含義
reduce(func) 通過func函式聚集RDD中的所有元素,這個功能必須是可交換且可並聯的
collect() 在驅動程式中,以陣列的形式返回資料集的所有元素
count() 返回RDD的元素個數
first() 返回RDD的第一個元素(類似於take(1))
take(n) 返回一個由資料集的前n個元素組成的陣列
takeSample(withReplacement,num, [seed]) 返回一個數組,該陣列由從資料集中隨機取樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
takeOrdered(n, [ordering])
saveAsTextFile(path) 將資料集的元素以textfile的形式儲存到HDFS檔案系統或者其他支援的檔案系統,對於每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文字
saveAsSequenceFile(path) 將資料集中的元素以Hadoop sequencefile的格式儲存到指定的目錄下,可以使HDFS或者其他Hadoop支援的檔案系統。
saveAsObjectFile(path)
countByKey() 針對(K,V)型別的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
foreach(func) 在資料集的每一個元素上,執行函式func進行更新。

1.4 練習Spark rdd的api

連線Spark-Shell:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077

練習1

//通過並行化生成rdd
val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
//對rdd1裡的每一個元素乘2然後排序
val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
//過濾出大於等於十的元素
val rdd3 = rdd2.filter(_ >= 10)
//將元素以陣列的方式在客戶端顯示
rdd3.collect

練習2:

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
//將rdd1裡面的每一個元素先切分在壓平
val rdd2 = rdd1.flatMap(_.split(' '))
rdd2.collect

練習3:

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求並集
val rdd3 = rdd1.union(rdd2)
//求交集
val rdd4 = rdd1.intersection(rdd2)
//去重
rdd3.distinct.collect
rdd4.collect

練習4:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求jion
val rdd3 = rdd1.join(rdd2)
rdd3.collect
//求並集
val rdd4 = rdd1 union rdd2
rdd4.collect
//按key進行分組
val rdd5 = rdd4.groupByKey
rdd5.collect

練習5:

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//注意cogroup與groupByKey的區別
rdd3.collect

練習6:

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
//reduce聚合
val rdd2 = rdd1.reduce(_ + _)
rdd2.collect

練習7:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2),  ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key進行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect

練習8:

mapPartitions
    def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
   該函式和map函式類似,只不過對映函式的引數由RDD中每一個元素變成了RDD中每一個分割槽的迭代器。如果在對映的過程中需要頻繁建立額外的物件,使用mapPartitions要比map高效的多
   比如:將RDD中的所有元素通過JDBC連線寫入資料庫,如果使用map函式,可能要為每一個元素都建立一個collection,這樣開銷很大,如果使用mapPartitions,那麼只需要針對每一個分割槽建立一個connection.
   引數preservesPartitioning表示是否保留父RDD的partitioner分割槽資訊。
//rdd1有兩個分割槽
scala> var rdd1 = sc.makeRDD(1 to 5,2)
       rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at makeRDD at <console>:24
scala> rdd1.collect
       res27: Array[Int] = Array(1, 2, 3, 4, 5)
//rdd3將rdd1中每個分割槽中的數值累加(通過mapPartitions來實現)
scala> var rdd3 = rdd1.mapPartitions{ x => {
     |     var result = List[Int]()
     |     var i = 0
     |     while(x.hasNext) {
     |         i += x.next()
     |     }
     |     result.::(i).iterator
     | }}
     rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[64] at mapPartitions at <console>:26
//檢視合併結果後的rdd3的值
scala> rdd3.collect
     res28: Array[Int] = Array(3, 12)
//檢視rdd3的分割槽大小                                                
scala> rdd3.partitions.size
res29: Int = 2

練習9:

mapPartitionsWithIndex
    def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
    函式作用通mapPartitions,不過提供了兩個引數,第一個引數為分割槽的索引
例如:
scala> var rdd1 = sc.makeRDD(1 to 25,4)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at makeRDD at <console>:24

scala> var rdd2 = rdd1.mapPartitionsWithIndex{
     |  (x,iter) => {
     |      var result = List[String]()
     |      var i = 0
     |      while(iter.hasNext) {
     |          i += iter.next()
     |      }
     |      result.::(x + "|" + i).iterator
     |  }
     | }
     rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[67] at mapPartitionsWithIndex at <console>:26

//獲取結果值(從返回的結果中可以看到)
scala> rdd2.collect
    res30: Array[String] = Array(0|21, 1|57, 2|93, 3|154)

再如:
scala> val func = (index:Int,iter:Iterator[(Int)])=> {
     | iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
     | }
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
scala> rdd1.mapPartitionsWithIndex(func).collect
    res0: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:0,val:4],  [partID:1,val:5], [part], [partID:1,val:7], [partID:1,val:8], [partID:1,val:9])

練習8:
aggregate函式將每個分割槽裡的元素進行聚合,然後用combine函式將每個分割槽的結果和初始值(zerorValue)進行combine操作。這個函式最終返回的型別不需要和RDD中元素型別一致。
函式原型:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

aggregate
聚合,先在分割槽內進行聚合,然後再將每個分割槽的結果一起結果進行聚合

scala> def func1(index:Int,iter:Iterator[(Int)]):Iterator[String] = {
     | iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
     | }
    func1: (index: Int, iter: Iterator[Int])Iterator[String]

//建立一個並行化的RDD,有兩個分割槽
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[77] at parallelize at <console>:24
//通過下面的程式碼可以看到rdd1中內容再兩個分割槽內的分佈情況,通過下面的結果可以看出有兩個分割槽,分別是partID:0和partID:1
scala> rdd1.mapPartitionsWithIndex(func1).collect
    res56: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:0,val:4], [partID:1,val:5], [partID:1,val:6], [partID:1,val:7], [partID:1,val:8], [partID:1,val:9])
//下面的執行步驟是:
//一:01取出最大值112取出最大值223取出最大值334取出最大值4===》第一個分割槽的最大值是4
//二:05取出最大值556取出最大值667取出最大值778取出最大值889取出最大值9====>第二個分割槽的最大值是9
//三:後面的執行邏輯是:_+_,就是說將兩個分割槽的最大結果值求和,執行的結果是:(0) + 4+9=13
scala> rdd1.aggregate(0)(math.max(_,_),_+_)
res57: Int = 13

//下面的執行步驟是:
//一:31取出最大值332取出最大值333取出最大值334取出最大值4===》第一個分割槽的最大值是4
//二:35取出最大值556取出最大值667取出最大值778取出最大值889取出最大值9====>第二個分割槽的最大值是9
//三:後面的執行邏輯是:_+_,就是說將兩個分割槽的最大結果值求和,執行的結果是:(3)+4+9=16
scala> rdd1.aggregate(3)(math.max(_,_),_+_)
res62: Int = 16

//下面的執行步驟是:
//一:51取出最大值552取出最大值553取出最大值554取出最大值5===》第一個分割槽的最大值是5
//二:55取出最大值556取出最大值667取出最大值778取出最大值889取出最大值9====>第二個分割槽的最大值是9
//三:後面的執行邏輯是:_+_,就是說將兩個分割槽的最大結果值求和,執行的結果是:(5)+5+9=19
scala> rdd1.aggregate(5)(math.max(_,_),_+_)
res58: Int = 19

再如:
//下面的執行步驟是:
//一:81取出最大值882取出最大值883取出最大值884取出最大值8===》第一個分割槽的最大值是8
//二:85取出最大值886取出最大值887取出最大值888取出最大值889取出最大值9====>第二個分割槽的最大值是9
//三:後面的執行邏輯是:_+_,就是說將兩個分割槽的最大結果值求和,執行的結果是:(8)+8+9=25
scala> rdd1.aggregate(8)(math.max(_,_),_+_)
res58: Int = 19

再如:
//下面的執行步驟是:
//一:101取出最大值10102取出最大值10103取出最大值10104取出最大值10===》第一個分割槽的最大值是10
//二:105取出最大值10106取出最大值10107取出最大值10108取出最大值10109取出最大值10====>第二個分割槽的最大值是10
//三:後面的執行邏輯是:_+_,就是說將兩個分割槽的最大結果值求和,執行的結果是:(10)+10+10=30
scala> rdd1.aggregate(10)(math.max(_,_),_+_)
res58: Int = 30

================================================================================
下面是字串的聚合
scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[79] at parallelize at <console>:24

scala> def fun2(index:Int,iter:Iterator[(String)]):Iterator[String] = {
     | iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
     | }
fun2: (index: Int, iter: Iterator[String])Iterator[String]
//通過下面的結果可以知道:"a","b","c"在partID:0中,"d","e","f"在partID:1中
scala> rdd2.mapPartitionsWithIndex(fun2).collect
res63: Array[String] = Array([partID:0,val:a], [partID:0,val:b], [partID:0,val:c], [partID:1,val:d], [partID:1,val:e], [partID:1,val:f])
//下面的執行順序是:
//一、"""a"相加得"a","a""b"相加得"ab","ab""c"相加得"abc",第一個分割槽得到的結果是:"abc"
//一、"""d"相加得"d","d""e"相加得"de","ed""f"相加得"def",第一個分割槽得到的結果是:"def"
//三、由於是並行的計算,所以可能是第一個分割槽先執行完,此時的結果是:"" + "abc" + "def" ===》"abcdef";若是第二個分割槽先執行完,此時的結果是:"" + "def" + "abc" ===》"defabc"
scala> rdd2.aggregate("")(_+_,_+_)
res64: String = abcdef
scala> rdd2.aggregate("")(_+_,_+_)
res65: String = defabc

//下面的執行順序是:
//一、"=""a"相加得"=a","=a""b"相加得"=ab","=ab""c"相加得"=abc",第一個分割槽得到的結果是:"=abc"
//一、"=""d"相加得"=d","=d""e"相加得"=de","=ed""f"相加得"=def",第一個分割槽得到的結果是:"=def"
//三、由於是並行的計算,所以可能是第一個分割槽先執行完,此時的結果是:"=" + "=abc" + "=def" ===》"==abc=def";若是第二個分割槽先執行完,此時的結果是:"="+"=def" + "=abc" ===》"==def=abc"
//下面的結果中分別是:res68: String = ==def=abc 和 res69: String = ==abc=def,和上面的推算結果一致
scala> rdd2.aggregate("=")(_ + _, _ + _)
res68: String = ==def=abc
scala> rdd2.aggregate("=")(_ + _, _ + _)
res69: String = ==abc=def



val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
//通過下面可以知道有兩個分割槽,並且每個分割槽中有不同的值
scala> rdd3.mapPartitionsWithIndex(fun2).collect
res70: Array[String] = Array([partID:0,val:12], [partID:0,val:23], [partID:1,val:345], [partID:1,val:4567])
//下面的執行步驟是(scala> "".length結果是res72: Int = 0),(scala>"12".length結果是res73:Int=2):
//一:"".length"12".length求出最大值2,得到字串是"2";"2".length和"23".length求出最大值2,得到的字串是2;所以第一個分割槽計算出的結果是:"2"
//二:"".length"345".length求出最大值3,得到字串是"3";"3".length和"4567".length求出最大值4,得到的字串是4;所以第一個分割槽計算出的結果是:"4"
//三:得到的結果最後執行x+y,由於是平行計算所以可能是"24"或者"42"
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res75: String = 24
scala> rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res76: String = 42


//下面求最小值:
scala> val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[84] at parallelize at <console>:24
scala> rdd4.mapPartitionsWithIndex(fun2).collect
res79: Array[String] = Array([partID:0,val:12], [partID:0,val:23], [partID:1,val:345], [partID:1,val:])
//執行過程是:
//一:"".length"12".length求出最小值0,得到字串是"0";"0".length和"23".length求出最小值1,得到的字串是0;所以第一個分割槽計算出的結果是:"0"
//二:"".length"345".length求出最小值0,得到字串是"0";"0".length和"".length求出最小值0,得到的字串是0;所以第一個分割槽計算出的結果是:"0"
//三:得到的結果最後執行x+y,由於是平行計算所以可能是"01""10"
scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res85: String = 10

scala> rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res86: String = 01


val rdd5 = sc.parallelize(List("12","23","","345"),2)
//執行過程是:
//一:"".length"12".length求出最小值0,得到字串是"0";"0".length和"23".length求出最小值1,得到的字串是0;所以第一個分割槽計算出的結果是:"0"
//二:"".length"".length求出最小值0,得到字串是"0";"0".length和"345".length求出最小值1,得到的字串是1;所以第一個分割槽計算出的結果是:"1"
//三:得到的結果最後執行x+y,由於是平行計算所以可能是"1"或
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)


再如案例:
scala> def seqOP(a:Int, b:Int) : Int = {
     | println("seqOp: " + a + "\t" + b)
     | math.min(a,b)
     | }
seqOP: (a: Int, b: Int)Int

scala> def combOp(a:Int, b:Int): Int = {
     | println("combOp: " + a + "\t" + b)
     | a + b
     | }
combOp: (a: Int, b: Int)Int

scala> val z = sc. parallelize ( List (1 ,2 ,3 ,4 ,5 ,6) , 2)
//這裡要注意的是上面的z是Int型別的,所以下面要用於集合迭代的型別也是Int型別的。
scala> def fun2(index:Int,iter:Iterator[(Int)]):Iterator[String] = {
     | iter.toList.map(x => "[partID:" + index + ",val:" + x + "]").iterator
     | }
fun2: (index: Int, iter: Iterator[Int])Iterator[String]
//通過下面的方式顯示出每個值所在的分割槽
scala> z.mapPartitionsWithIndex(fun2).collect
res94: Array[String] = Array([partID:0,val:1], [partID:0,val:2], [partID:0,val:3], [partID:1,val:4], [partID:1,val:5], [partID:1,val:6])
//下面的含義是:兩個分割槽每個裡面先單獨執行seqOP,兩個都執行完成之後,再執行comOp邏輯,所以下面的執行過程是:
//一、31執行seqOP的最小值是112執行seqOP間的最小值是113執行seqOP的最小值是1,第一個分割槽得到的結果是1
//二、34執行seqOP的最小值是335執行seqOP間的最小值是336執行seqOP的最小值是3,第一個分割槽得到的結果是3
//三、接著執行comOp邏輯,(3)和分割槽一種的1執行combOp得到的結果是:3+1=44接著和分割槽二中的3執行combOp得到的結果是4+3=7,所以最後的結果是:7
scala> z.aggregate(3)(seqOP, combOp)
combOp:3    1
combOp:4    3
res95: Int = 7

//再次驗證:
//一、21執行seqOP的最小值是112執行seqOP間的最小值是113執行seqOP的最小值是1,第一個分割槽得到的結果是1
//二、24執行seqOP的最小值是225執行seqOP間的最小值是226執行seqOP的最小值是2,第一個分割槽得到的結果是2
//三、接著執行comOp邏輯,(2)和分割槽一種的1執行combOp得到的結果是:2+1=33接著和分割槽二中的2執行combOp得到的結果是3 + 2=5,所以最後的結果是:5
scala> z.aggregate(2)(seqOP, combOp)
[Stage 105:>                                                        (0 + 0) / 2]combOp:2    1
combOp:3    2
res96: Int = 5 

//下面的同樣:
scala> def seqOp(a:String, b:String) : String = {
     | println("seqOp: " + a + "\t" + b)
     | math.min(a.length , b.length ).toString
     | }
seqOp: (a: String, b: String)String

scala> def combOp(a:String, b:String) : String = {
     |  println("combOp: " + a + "\t" + b)
     | a + b
     | }
combOp: (a: String, b: String)String

scala> val z = sc. parallelize ( List ("12" ,"23" ,"345" ,"4567") ,2)
scala> z. aggregate ("")(seqOp, combOp)
seqOp:  345
seqOp:  12
seqOp: 0    4567
seqOp: 0    23
combOp:     1
combOp: 1   1

res25: String = 11

練習10:
aggregateByKey

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def func2(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
scala> pairRDD.mapPartitionsWithIndex(func2).collect
res99: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])
//執行過程是:
//1、每個分割槽中的內容都按照key先進行分組,
//第一個分割槽分組後的結果是:(cat,(2,5))、(mouse,(4))
//第二個分割槽分組後的結果是:(cat,(12))、(dog,(12))、(mouse,(2))
//2、接著0,分別和每組中的結果比對,
//對於分割槽一:0和cat中的2比較,得到最大值2;2和cat中的5比較,得到的最大結果是5。同樣mouse執行相同操作。所以最終得到的是:(cat,(5)),(mouse,(4))
//對於分割槽二:0和cat中的12比較,得到的最大值12。依次類推,最終得到的結果是:(cat,(12))、(dog,(12))、(mouse,(2))
//3、接著0和分割槽一和分割槽二中每個最大值相加,最終得到的結果是:
// (cat,(5)) + (cat,(12)) ⇒ (cat,(5 + 12)) ==> (cat,(17))
//(mouse,(4)) + (mouse,(2)) ⇒ (mouse,(4 + 2)) ==> (mouse,(6))
//(dog,(12)) ⇒ (dog,(12)) ==> (dog,(12))
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect

//執行過程是:
//1、每個分割槽中的內容都按照key先進行分組,
//第一個分割槽分組後的結果是:(cat,(2,5))、(mouse,(4))
//第二個分割槽分組後的結果是:(cat,(12))、(dog,(12))、(mouse,(2))
//2、接著100,分別和每組中的結果比對,
//對於分割槽一:100和cat中的2比較,得到最大值100;100和cat中的5比較,得到的最大結果是100。同樣mouse執行相同操作。所以最終得到的是:(cat,(100)),(mouse,(100))
//對於分割槽二:100和cat中的12比較,得到的最大值100。依次類推,最終得到的結果是:(cat,(100))、(dog,(100))、(mouse,(100))
//3、接著100和分割槽一和分割槽二中每個最大值相加,最終得到的結果是:
//(cat,(100)) + (cat,(100)) ⇒ (cat,(100 + 100)) ==> (cat,(200))
//(mouse,(100)) + (mouse,(100)) ⇒ (mouse,(100 + 100)) ==> (mouse,(200))
//(dog,(100)) + (dog,(100)) ⇒ (dog,(100)) ==> (dog,(100))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect

練習11:
checkpoint (知識點可以檢視:http://blog.csdn.net/tototuzuoquan/article/details/74838936
為當前RDD設定檢查點。該函式將會建立一個二進位制的檔案,並存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設定的。在checkpoint的過程中,該RDD的所有依賴於父RDD中的資訊將全部被移出。對RDD進行checkpoint操作並不會馬上被執行,必須執行Action操作才能觸發。
函式原型:
def checkpoint()
例項:

scala> val data = sc.parallelize(1 to 100000,15)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[94] at parallelize at <console>:24
scala> sc.setCheckpointDir("/iteblog")
17/07/07 19:17:22 WARN spark.SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory '/iteblog' appears to be on the local filesystem.
scala> data.checkpoint
scala> data.count
res105: Long = 100000

[[email protected] hadoop-2.8.0]# hdfs dfs -ls /iteblog
Found 1 items
drwxr-xr-x   - root supergroup          0 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c
[[email protected] hadoop-2.8.0]# hdfs dfs -ls /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c
Found 1 items
drwxr-xr-x   - root supergroup          0 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94
[[email protected] hadoop-2.8.0]# hdfs dfs -ls /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94
Found 15 items
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00000
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00001
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00002
-rw-r--r--   3 root supergroup      71219 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00003
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00004
-rw-r--r--   3 root supergroup      71229 2017-07-07 19:17 /iteblog/0ca2df38-2dc6-451b-94d8-fbc442c1611c/rdd-94/part-00005
-rw-r--r--   3 root supergroup      71219 2017-07-07