1. 程式人生 > >RDD操作詳解

RDD操作詳解

1.1.RDD操作詳解
啟動spark-shell
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell --master spark://node1.itcast.cn:7077
或者
採用離線測試
基本轉換
map
map是對RDD中的每個元素都執行一個指定的函式來產生一個新的RDD。 任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。
舉例:
scala> val a = sc.parallelize(1 to 9, 3)
scala> val b = a.map(x => x*2)
scala> a.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
上述例子中把原RDD中每個元素都乘以2來產生一個新的RDD。
filter
filter 是對RDD中的每個元素都執行一個指定的函式來過濾產生一個新的RDD。 任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。

val rdd = sc.parallelize(List(1,2,3,4,5,6))
val filterRdd = rdd.filter(_ > 5)
filterRdd.collect() //返回所有大於5的資料的一個Array, Array(6,8,10,12)
flatMap
與map類似,區別是原RDD中的元素經map處理後只能生成一個元素,而原RDD中的元素經flatmap處理後可生成多個元素來構建新RDD。 舉例:對原RDD中的每個元素x產生y個元素(從1到y,y為元素x的值)
scala> val a = sc.parallelize(1 to 4, 2)
scala> val b = a.flatMap(x => 1 to x)
scala> b.collect
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

mapPartitions
mapPartitions是map的一個變種。map的輸入函式是應用於RDD中每個元素,而mapPartitions的輸入函式是應用於每個分割槽,也就是把每個分割槽中的內容作為整體來處理的。 它的函式定義為:
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
f即為輸入函式,它處理每個分割槽裡面的內容。每個分割槽中的內容將以Iterator[T]傳遞給輸入函式f,f的輸出結果是Iterator[U]。最終的RDD由所有分割槽經過輸入函式處理後的結果合併起來的。
舉例:
scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List(T, T)


var pre = iter.next
while (iter.hasNext) {
val cur = iter.next
res.::=(pre, cur)
pre = cur }
res.iterator
}
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
上述例子中的函式myfunc是把分割槽中一個元素和它的下一個元素組成一個Tuple。因為分割槽中最後一個元素沒有下一個元素了,所以(3,4)和(6,7)不在結果中。 mapPartitions還有些變種,比如mapPartitionsWithContext,它能把處理過程中的一些狀態資訊傳遞給使用者指定的輸入函式。還有mapPartitionsWithIndex,它能把分割槽的index傳遞給使用者指定的輸入函式。
mapPartitionsWithIndex
def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
函式作用同mapPartitions,不過提供了兩個引數,第一個引數為分割槽的索引。

var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有兩個分割槽
var rdd2 = rdd1.mapPartitionsWithIndex{
(x,iter) => {
var result = ListString
var i = 0
while(iter.hasNext){
i += iter.next()
}
result.::(x + “|” + i).iterator

    }
  }

//rdd2將rdd1中每個分割槽的數字累加,並在每個分割槽的累加結果前面加了分割槽索引
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)

mapWith
mapWith是map的另外一個變種,map只需要一個輸入函式,而mapWith有兩個輸入函式。它的定義如下:
def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]
第一個函式constructA是把RDD的partition index(index從0開始)作為輸入,輸出為新型別A;
第二個函式f是把二元組(T, A)作為輸入(其中T為原RDD中的元素,A為第一個函式的輸出),輸出型別為U。
舉例:把partition index 乘以10加2,作為新的RDD的元素。
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)
x.mapWith(a => a * 10)((b, a) => (b,a + 2)).collect
結果:
(1,2)
(2,2)
(3,2)
(4,12)
(5,12)
(6,12)
(7,22)
(8,22)
(9,22)
(10,22)
flatMapWith
flatMapWith與mapWith很類似,都是接收兩個函式,一個函式把partitionIndex作為輸入,輸出是一個新型別A;另外一個函式是以二元組(T,A)作為輸入,輸出為一個序列,這些序列裡面的元素組成了新的RDD。它的定義如下:
def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]
舉例:
scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,
8, 2, 9)

coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]

該函式用於將RDD進行重分割槽,使用HashPartitioner。

第一個引數為重分割槽的數目,第二個為是否進行shuffle,預設為false;

以下面的例子來看:

scala> var data = sc.parallelize(1 to 12, 3)
scala> data.collect
scala> data.partitions.size

scala> var rdd1 = data.coalesce(1)
scala> rdd1.partitions.size

scala> var rdd1 = data.coalesce(4)
scala> rdd1.partitions.size
res2: Int = 1 //如果重分割槽的數目大於原來的分割槽數,那麼必須指定shuffle引數為true,//否則,分割槽數不便

scala> var rdd1 = data.coalesce(4,true)
scala> rdd1.partitions.size
res3: Int = 4

repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

該函式其實就是coalesce函式第二個引數為true的實現
scala> var data = sc.parallelize(1 to 12, 3)
scala> data.collect
scala> data.partitions.size

scala> var rdd1 = data. repartition(1)
scala> rdd1.partitions.size

scala> var rdd1 = data. repartition(4)
scala> rdd1.partitions.size
res3: Int = 4
randomSplit

def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

該函式根據weights權重,將一個RDD切分成多個RDD。

該權重引數為一個Double陣列

第二個引數為random的種子,基本可忽略。

scala> var rdd = sc.makeRDD(1 to 12,12)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21

scala> rdd.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> var splitRDD = rdd.randomSplit(Array(0.5, 0.1, 0.2, 0.2))
splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] at randomSplit at :23,
MapPartitionsRDD[18] at randomSplit at :23,
MapPartitionsRDD[19] at randomSplit at :23,
MapPartitionsRDD[20] at randomSplit at :23)

//這裡注意:randomSplit的結果是一個RDD陣列
scala> splitRDD.size
res8: Int = 4
//由於randomSplit的第一個引數weights中傳入的值有4個,因此,就會切分成4個RDD,
//把原來的rdd按照權重0.5, 0.1, 0.2, 0.2,隨機劃分到這4個RDD中,權重高的RDD,劃分到//的機率就大一些。
//注意,權重的總和加起來為1,否則會不正常

scala> splitRDD(0).collect
res10: Array[Int] = Array(1, 4)

scala> splitRDD(1).collect
res11: Array[Int] = Array(3)

scala> splitRDD(2).collect
res12: Array[Int] = Array(5, 9)

scala> splitRDD(3).collect
res13: Array[Int] = Array(2, 6, 7, 8, 10)

glom

def glom(): RDD[Array[T]]

該函式是將RDD中每一個分割槽中型別為T的元素轉換成Array[T],這樣每一個分割槽就只有一個數組元素。

scala> var rdd = sc.makeRDD(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at makeRDD at :21
scala> rdd.partitions.size
res33: Int = 3 //該RDD有3個分割槽
scala> rdd.glom().collect
res35: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
//glom將每個分割槽中的元素放到一個數組中,這樣,結果就變成了3個數組

union並集

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求並集
val rdd3 = rdd1.union(rdd2)
rdd3.collect
distinct
去重
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求並集
val rdd3 = rdd1.union(rdd2)
//去重輸出
rdd3.distinct.collect

intersection交集
val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求交集
val rdd4 = rdd1.intersection(rdd2)
rdd4.collect

subtract
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
該函式返回在RDD中出現,並且不在otherRDD中出現的元素,不去重。

val rdd1 = sc.parallelize(List(5, 6, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
//求差集
val rdd4 = rdd1.subtract(rdd2)
rdd4.collect
subtractByKey

def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]
def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]
def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]

subtractByKey和基本轉換操作中的subtract類似,只不過這裡是針對K的,返回在主RDD中出現,並且不在otherRDD中出現的元素。
引數numPartitions用於指定結果的分割槽數
引數partitioner用於指定分割槽函式

var rdd1 = sc.makeRDD(Array((“A”,“1”),(“B”,“2”),(“C”,“3”)),2)
var rdd2 = sc.makeRDD(Array((“A”,“a”),(“C”,“c”),(“D”,“d”)),2)
scala> rdd1.subtractByKey(rdd2).collect
res13: Array[(String, String)] = Array((B,2))

groupbyKey
val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))
//求並集
val rdd4 = rdd1 union rdd2
//按key進行分組
val rdd5 = rdd4.groupByKey
rdd5.collect
reduceByKey
顧名思義,reduceByKey就是對元素為KV對的RDD中Key相同的元素的Value進行reduce,因此,Key相同的多個元素的值被reduce為一個值,然後與原RDD中的Key組成一個新的KV對。
舉例:
val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))
//求並集
val rdd4 = rdd1 union rdd2
//按key進行分組
val rdd6 = rdd4.reduceByKey(_ + _)
rdd6.collect()

sortByKey
將List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1))和List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5))做wordcount,並按名稱排序

val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5)))
val rdd3 = rdd1.union(rdd2)
//按key進行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
//false降序
val rdd5 = rdd4.sortByKey(false)
rdd5.collect
sortBy
將List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1))和List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5))做wordcount,並按數值排序

val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5)))
val rdd3 = rdd1.union(rdd2)
//按key進行聚合
val rdd4 = rdd3.reduceByKey(_ + )
//false降序
val rdd5 = rdd4.sortBy(
._2, false)
rdd5.collect

zip

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

zip函式用於將兩個RDD組合成Key/Value形式的RDD,這裡預設兩個RDD的partition數量以及元素數量都相同,否則會丟擲異常。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21

scala> var rdd2 = sc.makeRDD(Seq(“A”,“B”,“C”,“D”,“E”),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21

scala> rdd1.zip(rdd2).collect
res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))

scala> rdd2.zip(rdd1).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))

scala> var rdd3 = sc.makeRDD(Seq(“A”,“B”,“C”,“D”,“E”),3)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21

scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can’t zip RDDs with unequal numbers of partitions
//如果兩個RDD分割槽數不同,則丟擲異常

zipPartitions

zipPartitions函式將多個RDD按照partition組合成為新的RDD,該函式需要組合的RDD具有相同的分割槽數,但對於每個分割槽內的元素數量沒有要求。

該函式有好幾種實現,可分為三類:

引數是一個RDD
def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

這兩個區別就是引數preservesPartitioning,是否保留父RDD的partitioner分割槽資訊

對映方法f引數為兩個RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21

scala> var rdd2 = sc.makeRDD(Seq(“A”,“B”,“C”,“D”,“E”),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21

//rdd1兩個分割槽中元素分佈:
scala> rdd1.mapPartitionsWithIndex{
| (x,iter) => {
| var result = ListString
| while(iter.hasNext){
| result ::= (“part_” + x + “|” + iter.next())
| }
| result.iterator
|
| }
| }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)

//rdd2兩個分割槽中元素分佈
scala> rdd2.mapPartitionsWithIndex{
| (x,iter) => {
| var result = ListString
| while(iter.hasNext){
| result ::= (“part_” + x + “|” + iter.next())
| }
| result.iterator
|
| }
| }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)

//rdd1和rdd2做zipPartition
scala> rdd1.zipPartitions(rdd2){
| (rdd1Iter,rdd2Iter) => {
| var result = ListString
| while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
| result::=(rdd1Iter.next() + “_” + rdd2Iter.next())
| }
| result.iterator
| }
| }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)

引數是兩個RDD
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

用法同上面,只不過該函式引數為兩個RDD,對映方法f輸入引數為兩個RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21

scala> var rdd2 = sc.makeRDD(Seq(“A”,“B”,“C”,“D”,“E”),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21

scala> var rdd3 = sc.makeRDD(Seq(“a”,“b”,“c”,“d”,“e”),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21

//rdd3中個分割槽元素分佈
scala> rdd3.mapPartitionsWithIndex{
| (x,iter) => {
| var result = ListString
| while(iter.hasNext){
| result ::= (“part_” + x + “|” + iter.next())
| }
| result.iterator
|
| }
| }.collect
res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)

//三個RDD做zipPartitions
scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){
| (rdd1Iter,rdd2Iter,rdd3Iter) => {
| var result = ListString
| while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {
| result::=(rdd1Iter.next() + “" + rdd2Iter.next() + "” + rdd3Iter.next())
| }
| result.iterator
| }
| }
rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27

scala> rdd4.collect
res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)

引數是三個RDD
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

用法同上面,只不過這裡又多了個一個RDD而已。

zipWithIndex

def zipWithIndex(): RDD[(T, Long)]

該函式將RDD中的元素和這個元素在RDD中的ID(索引號)組合成鍵/值對。

scala> var rdd2 = sc.makeRDD(Seq(“A”,“B”,“R”,“D”,“F”),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21

scala> rdd2.zipWithIndex().collect
res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))

zipWithUniqueId

def zipWithUniqueId(): RDD[(T, Long)]

該函式將RDD中元素和一個唯一ID組合成鍵/值對,該唯一ID生成演算法如下:
每個分割槽中第一個元素的唯一ID值為:該分割槽索引號,
每個分割槽中第N個元素的唯一ID值為:(前一個元素的唯一ID值) + (該RDD總的分割槽數)

看下面的例子:

scala> var rdd1 = sc.makeRDD(Seq(“A”,“B”,“C”,“D”,“E”,“F”),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21
//rdd1有兩個分割槽,
scala> rdd1.zipWithUniqueId().collect
res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
//總分割槽數為2
//第一個分割槽第一個元素ID為0,第二個分割槽第一個元素ID為1
//第一個分割槽第二個元素ID為0+2=2,第一個分割槽第三個元素ID為2+2=4
//第二個分割槽第二個元素ID為1+2=3,第二個分割槽第三個元素ID為3+2=5

鍵值轉換
partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

該函式根據partitioner函式生成新的ShuffleRDD,將原RDD重新分割槽。

scala> var rdd1 = sc.makeRDD(Array((1,“A”),(2,“B”),(3,“C”),(4,“D”)),2)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at :21

scala> rdd1.partitions.size
res20: Int = 2

//檢視rdd1中每個分割槽的元素
scala> rdd1.mapPartitionsWithIndex{
| (partIdx,iter) => {
| var part_map = scala.collection.mutable.MapString,List[(Int,String)]
| while(iter.hasNext){
| var part_name = “part_” + partIdx;
| var elem = iter.next()
| if(part_map.contains(part_name)) {
| var elems = part_map(part_name)
| elems ::= elem
| part_map(part_name) = elems
| } else {
| part_map(part_name) = List[(Int,String)]{elem}
| }
| }
| part_map.iterator
|
| }
| }.collect
res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))
//(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中

//使用partitionBy重分割槽
scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at :23

scala> rdd2.partitions.size
res23: Int = 2

//檢視rdd2中每個分割槽的元素
scala> rdd2.mapPartitionsWithIndex{
| (partIdx,iter) => {
| var part_map = scala.collection.mutable.MapString,List[(Int,String)]
| while(iter.hasNext){
| var part_name = “part_” + partIdx;
| var elem = iter.next()
| if(part_map.contains(part_name)) {
| var elems = part_map(part_name)
| elems ::= elem
| part_map(part_name) = elems
| } else {
| part_map(part_name) = List[(Int,String)]{elem}
| }
| }
| part_map.iterator
| }
| }.collect
res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))
//(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中
mapValues
mapValues顧名思義就是輸入函式應用於RDD中Kev-Value的Value,原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素。因此,該函式只適用於元素為KV對的RDD。
舉例:
scala> val a = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, " eagle"), 2)
scala> val b = a.map(x => (x.length, x))
scala> b.mapValues(“x” + _ + “x”).collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))

flatMapValues
flatMapValues類似於mapValues,不同的在於flatMapValues應用於元素為KV對的RDD中Value。每個一元素的Value被輸入函式對映為一系列的值,然後這些值再與原RDD中的Key組成一系列新的KV對。
舉例
val a = sc.parallelize(List((1, 2), (3, 4), (5, 6)))
val b = a.flatMapValues(x => 1.to(x))
b.collect.foreach(println)
combineByKey

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
該函式用於將RDD[K,V]轉換成RDD[K,C],這裡的V型別和C型別可以相同也可以不同。
其中的引數:
createCombiner:組合器函式,用於將V型別轉換成C型別,輸入引數為RDD[K,V]中的V,輸出為C ,分割槽內相同的key做一次
mergeValue:合併值函式,將一個C型別和一個V型別值合併成一個C型別,輸入引數為(C,V),輸出為C,分割槽內相同的key迴圈做
mergeCombiners:分割槽合併組合器函式,用於將兩個C型別值合併成一個C型別,輸入引數為(C,C),輸出為C,分割槽之間迴圈做
numPartitions:結果RDD分割槽數,預設保持原有的分割槽數
partitioner:分割槽函式,預設為HashPartitioner
mapSideCombine:是否需要在Map端進行combine操作,類似於MapReduce中的combine,預設為true

看下面例子:
scala> var rdd1 = sc.makeRDD(Array((“A”,1),(“A”,2),(“B”,1),(“B”,2),(“C”,1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21
scala> rdd1.combineByKey(
| (v : Int) => v + "",
| (c : String, v : Int) => c + “@” + v,
| (c1 : String, c2 : String) => c1 + “$” + c2
| ).collect
res60: Array[(String, String)] = Array((A,2
$1_), (B,1_$2_), (C,1_))

其中三個對映函式分別為:
createCombiner: (V) => C
(v : Int) => v + “” //在每一個V值後面加上字元,返回C型別(String)
mergeValue: (C, V) => C
(c : String, v : Int) => c + “@” + v //合併C型別和V型別,中間加字元@,返回C(String)
mergeCombiners: (C, C) => C
(c1 : String, c2 : String) => c1 + “ + c 2 / / C C ” + c2 //合併C型別和C型別,中間加 ,返回C(String)
其他引數為預設值。
最終,將RDD[String,Int]轉換為RDD[String,String]。

再看例子:

rdd1.combineByKey(
(v : Int) => List(v),
(c : List[Int], v : Int) => v :: c,
(c1 : List[Int], c2 : List[Int]) => c1 ::: c2
).collect
res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))
最終將RDD[String,Int]轉換為RDD[String,List[Int]]。

foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

該函式用於RDD[K,V]根據K將V做摺疊、合併處理,其中的引數zeroValue表示先根據對映函式將zeroValue應用於V,進行初始化V,再將對映函式應用於初始化後的V.

例子:

scala> var rdd1 = sc.makeRDD(Array((“A”,0),(“A”,2),(“B”,1),(“B”,2),(“C”,1)))
scala> rdd1.foldByKey(0)(+).collect
res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1))
//將rdd1中每個key對應的V進行累加,注意zeroValue=0,需要先初始化V,對映函式為+操
//作,比如(“A”,0), (“A”,2),先將zeroValue應用於每個V,得到:(“A”,0+0), (“A”,2+0),即:
//(“A”,0), (“A”,2),再將對映函式應用於初始化後的V,最後得到(A,0+2),即(A,2)

再看:

scala> rdd1.foldByKey(2)(+).collect
res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
//先將zeroValue=2應用於每個V,得到:(“A”,0+2), (“A”,2+2),即:(“A”,2), (“A”,4),再將對映函
//數應用於初始化後的V,最後得到:(A,2+4),即:(A,6)

再看乘法操作:

scala> rdd1.foldByKey(0)(*).collect
res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
//先將zeroValue=0應用於每個V,注意,這次對映函式為乘法,得到:(“A”,00), (“A”,20),
//即:(“A”,0), (“A”,0),再將對映函//數應用於初始化後的V,最後得到:(A,0*0),即:(A,0)
//其他K也一樣,最終都得到了V=0

scala> rdd1.foldByKey(1)(*).collect
res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
//對映函式為乘法時,需要將zeroValue設為1,才能得到我們想要的結果。

在使用foldByKey運算元時候,要特別注意對映函式及zeroValue的取值。
reduceByKeyLocally

def reduceByKeyLocally(func: (V, V) => V): Map[K, V]

該函式將RDD[K,V]中每個K對應的V值根據對映函式來運算,運算結果對映到一個Map[K,V]中,而不是RDD[K,V]。

scala> var rdd1 = sc.makeRDD(Array((“A”,0),(“A”,2),(“B”,1),(“B”,2),(“C”,1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[91] at makeRDD at :21

scala> rdd1.reduceByKeyLocally((x,y) => x + y)
res90: scala.collection.Map[String,Int] = Map(B -> 3, A -> 2, C -> 1)

cogroup和groupByKey的區別

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//cogroup
val rdd3 = rdd1.cogroup(rdd2)
//groupbykey
val rdd4 = rdd1.union(rdd2).groupByKey
//注意cogroup與groupByKey的區別
rdd3.foreach(println)
rdd4.foreach(println)

join

val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))
//求jion
val rdd3 = rdd1.join(rdd2)
rdd3.collect

leftOuterJoin

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

leftOuterJoin類似於SQL中的左外關聯left outer join,返回結果以前面的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。
引數numPartitions用於指定結果的分割槽數
引數partitioner用於指定分割槽函式

var rdd1 = sc.makeRDD(Array((“A”,“1”),(“B”,“2”),(“C”,“3”)),2)
var rdd2 = sc.makeRDD(Array((“A”,“a”),(“C”,“c”),(“D”,“d”)),2)

scala> rdd1.leftOuterJoin(rdd2).collect
res11: Array[(String, (String, Option[String]))] = Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some©)))

rightOuterJoin

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]

rightOuterJoin類似於SQL中的有外關聯right outer join,返回結果以引數中的RDD為主,關聯不上的記錄為空。只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。
引數numPartitions用於指定結果的分割槽數
引數partitioner用於指定分割槽函式

var rdd1 = sc.makeRDD(Array((“A”,“1”),(“B”,“2”),(“C”,“3”)),2)
var rdd2 = sc.makeRDD(Array((“A”,“a”),(“C”,“c”),(“D”,“d”)),2)
scala> rdd1.rightOuterJoin(rdd2).collect
res12: Array[(String, (Option[String], String))] = Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))

Action操作
first

def first(): T

first返回RDD中的第一個元素,不排序。

scala> var rdd1 = sc.makeRDD(Array((“A”,“1”),(“B”,“2”),(“C”,“3”)),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21

scala> rdd1.first
res14: (String, String) = (A,1)

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21

scala> rdd1.first
res8: Int = 10

count

def count(): Long

count返回RDD中的元素數量。

scala> var rdd1 = sc.makeRDD(Array((“A”,“1”),(“B”,“2”),(“C”,“3”)),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21

scala> rdd1.count
res15: Long = 3

reduce

def reduce(f: (T, T) ⇒ T): T

根據對映函式f,對RDD中的元素進行二元計算,返回計算結果。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21

scala> rdd1.reduce(_ + _)
res18: Int = 55

scala> var rdd2 = sc.makeRDD(Array((“A”,0),(“A”,2),(“B”,1),(“B”,2),(“C”,1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21

scala> rdd2.reduce((x,y) => {
| (x._1 + y._1,x._2 + y._2)
| })
res21: (String, Int) = (CBBAA,6)

collect

def collect(): Array[T]

collect用於將一個RDD轉換成陣列。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21

scala> rdd1.collect
res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
take

def take(num: Int): Array[T]

take用於獲取RDD中從0到num-1下標的元素,不排序。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21

scala> rdd1.take(1)
res0: Array[Int] = Array(10)

scala> rdd1.take(2)
res1: Array[Int] = Array(10, 4)

top

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

top函式用於從RDD中,按照預設(降序)或者指定的排序規則,返回前num個元素。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21

scala> rdd1.top(1)
res2: Array[Int] = Array(12)

scala> rdd1.top(2)
res3: Array[Int] = Array(12, 10)

//指定排序規則
scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
myOrd: scala.math.Ordering[Int] = [email protected]

scala> rdd1.top(1)
res4: Array[Int] = Array(2)

scala> rdd1.top(2)
res5: Array[Int] = Array(2, 3)

takeOrdered

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

takeOrdered和top類似,只不過以和top相反的順序返回元素。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21

scala> rdd1.top(1)
res4: Array[Int] = Array(2)

scala> rdd1.top(2)
res5: Array[Int] = Array(2, 3)

scala> rdd1.takeOrdered(1)
res6: Array[Int] = Array(12)

scala> rdd1.takeOrdered(2)
res7: Array[Int] = Array(12, 10)

aggregate

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

aggregate使用者聚合RDD中的元素,先使用seqOp將RDD中每個分割槽中的T型別元素聚合成U型別,再使用combOp將之前每個分割槽聚合後的U型別聚合成U型別,特別注意seqOp和combOp都會使用zeroValue的值,zeroValue的型別為U。

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.mapPartitionsWithIndex{
(partIdx,iter) => {
var part_map = scala.collection.mutable.MapString,List[Int]
while(iter.hasNext){
var part_name = “part_” + partIdx;
var elem = iter.next()
if(part_map.contains(part_name)) {
var elems = part_map(part_name)
elems ::= elem
part_map(part_name) = elems
} else {
part_map(part_name) = List[Int]{elem}
}
}
part_map.iterator

    }
  }.collect

res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))

##第一個分割槽中包含5,4,3,2,1

##第二個分割槽中包含10,9,8,7,6

scala> rdd1.aggregate(1)(
| {(x : Int,y : Int) => x + y},
| {(a : Int,b : Int) => a + b}
| )
res17: Int = 58

結果為什麼是58,看下面的計算過程:

##先在每個分割槽中迭代執行 (x : Int,y : Int) => x + y 並且使用zeroValue的值1

##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16

part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41

##再將兩個分割槽的結果合併(a : Int,b : Int) => a + b ,並且使用zeroValue的值1

##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58

再比如:

scala> rdd1.aggregate(2)(
| {(x : Int,y : Int) => x + y},
| {(a : Int,b : Int) => a * b}
| )
res18: Int = 1428

##這次zeroValue=2

##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17

##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42

##最後:zeroValuepart_0part_1 = 2 * 17 * 42 = 1428

因此,zeroValue即確定了U的型別,也會對結果產生至關重要的影響,使用時候要特別注意。

fold

def fold(zeroValue: T)(op: (T, T) ⇒ T): T

fold是aggregate的簡化,將aggregate中的seqOp和combOp使用同一個函式op。
var rdd1 = sc.makeRDD(1 to 10, 2)
scala> rdd1.fold(1)(
| (x,y) => x + y
| )
res19: Int = 58

##結果同上面使用aggregate的第一個例子一樣,即:
scala> rdd1.aggregate(1)(
| {(x,y) => x + y},
| {(a,b) => a + b}
| )
res20: Int = 58

lookup

def lookup(key: K): Seq[V]

lookup用於(K,V)型別的RDD,指定K值,返回RDD中該K對應的所有V值。

scala> var rdd1 = sc.makeRDD(Array((“A”,0),(“A”,2),(“B”,1),(“B”,2),(“C”,1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21

scala> rdd1.lookup(“A”)
res0: Seq[Int] = WrappedArray(0, 2)

scala> rdd1.lookup(“B”)
res1: Seq[Int] = WrappedArray(1, 2)
countByKey

def countByKey(): Map[K, Long]

countByKey用於統計RDD[K,V]中每個K的數量。

scala> var rdd1 = sc.makeRDD(Array((“A”,0),(“A”,2),(“B”,1),(“B”,2),(“B”,3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21

scala> rdd1.countByKey
res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)

foreach

def foreach(f: (T) ⇒ Unit): Unit

foreach用於遍歷RDD,將函式f應用於每一個元素。
但要注意,如果對RDD執行foreach,只會在Executor端有效,而並不是Driver端。
比如:rdd.foreach(println),只會在Executor的stdout中打印出來,Driver端是看不到的。
我在Spark1.4中是這樣,不知道是否真如此。
這時候,使用accumulator共享變數與foreach結合,倒是個不錯的選擇。

scala> var cnt = sc.accumulator(0)
cnt: org.apache.spark.Accumulator[Int] = 0

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21

scala> rdd1.foreach(x => cnt += x)

scala> cnt.value
res51: Int = 55

scala> rdd1.collect.foreach(println)
foreachPartition

def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit

foreachPartition和foreach類似,只不過是對每一個分割槽使用f。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21

scala> var allsize = sc.accumulator(0)
size: org.apache.spark.Accumulator[Int] = 0

scala> rdd1.foreachPartition { x => {
| allsize += x.size
| }}

scala> println(allsize.value)
10

sortBy

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

sortBy根據給定的排序k函式將RDD中的元素進行排序。

scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)

scala> rdd1.sortBy(x => x).collect
res1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //預設升序

scala> rdd1.sortBy(x => x,false).collect
res2: Array[Int] = Array(7, 6, 3, 2, 1, 0) //降序

//RDD[K,V]型別
scala>var rdd1 = sc.makeRDD(Array((“A”,2),(“A”,1),(“B”,6),(“B”,3),(“B”,7)))

scala> rdd1.sortBy(x => x).collect
res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))

//按照V進行降序排序
scala> rdd1.sortBy(x => x._2,false).collect
res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))
saveAsTextFile

def saveAsTextFile(path: String): Unit

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

saveAsTextFile用於將RDD以文字檔案的格式儲存到檔案系統中。

codec引數可以指定壓縮的類名。

var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile(“hdfs://cdh5/tmp/lxw1234.com/”) //儲存到HDFS
hadoop fs -ls /tmp/lxw1234.com
Found 2 items
-rw-r–r-- 2 lxw1234 supergroup 0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS
-rw-r–r-- 2 lxw1234 supergroup 21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000

hadoop fs -cat /tmp/lxw1234.com/part-00000

注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)將檔案儲存到本地檔案系統,那麼只會儲存在Executor所在機器的本地目錄。

//指定壓縮格式儲存

rdd1.saveAsTextFile(“hdfs://cdh5/tmp/lxw1234.com/”,classOf[com.hadoop.compression.lzo.LzopCodec])

hadoop fs -ls /tmp/lxw1234.com
-rw-r–r-- 2 lxw1234 supergroup 0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS
-rw-r–r-- 2 lxw1234 supergroup 71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzo

hadoop fs -text /tmp/lxw1234.com/part-00000.lzo

saveAsSequenceFile

saveAsSequenceFile用於將RDD以SequenceFile的檔案格式儲存到HDFS上。

用法同saveAsTextFile。

saveAsObjectFile

def saveAsObjectFile(path: String): Unit

saveAsObjectFile用於將RDD中的元素序列化成物件,儲存到檔案中。

對於HDFS,預設採用SequenceFile儲存。

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.saveAsObjectFile(“hdfs://cdh5/tmp/lxw1234.com/”)

hadoop fs -cat /tmp/lxw1234.com/part-00000
SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT

saveAsHadoopFile

def saveAsHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: OutputFormat[_, ]], codec: Class[ <: CompressionCodec]): Unit

def saveAsHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: OutputFormat[_, ]], conf: JobConf = …, codec: Option[Class[ <: CompressionCodec]] = None): Unit

saveAsHadoopFile是將RDD儲存在HDFS上的檔案中,支援老版本Hadoop API。

可以指定outputKeyClass、outputValueClass以及壓縮格式。

每個分割槽輸出一個檔案。

var rdd1 = sc.makeRDD(Array((“A”,2),(“A”,1),(“B”,6),(“B”,3),(“B”,7)))

import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])

rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],
classOf[com.hadoop.compression.lzo.LzopCodec])

saveAsHadoopDataset

def saveAsHadoopDataset(conf: JobConf): Unit

saveAsHadoopDataset用於將RDD儲存到除了HDFS的其他儲存中,比如HBase。

在JobConf中,通常需要關注或者設定五個引數:

檔案的儲存路徑、key值的class型別、value值的class型別、RDD的輸出格式(OutputFormat)、以及壓縮相關的引數。

##使用saveAsHadoopDataset將RDD儲存到HDFS中

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf

var rdd1 = sc.makeRDD(Array((“A”,2),(“A”,1),(“B”,6),(“B”,3),(“B”,7)))
var jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.set(“mapred.output.dir”,"/tmp/lxw1234/")
rdd1.saveAsHadoopDataset(jobConf)

結果:
hadoop fs -cat /tmp/lxw1234/part-00000
A 2
A 1
hadoop fs -cat /tmp/lxw1234/part-00001
B 6
B 3
B 7

##儲存資料到HBASE

HBase建表:

create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

var conf = HBaseConfiguration.create()
var jobConf = new JobConf(conf)
jobConf.set(“hbase.zookeeper.quorum”,“zkNode1,zkNode2,zkNode3”)
jobConf.set(“zookeeper.znode.parent”,"/hbase")
jobConf.set(TableOutputFormat.OUTPUT_TABLE,“lxw1234”)
jobConf.setOutputFormat(classOf[TableOutputFormat])

var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
rdd1.map(x => 
  {
    var put = new Put(Bytes.toBytes(x._1))
    put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
    (new ImmutableBytesWritable,put)
  }
).saveAsHadoopDataset(jobConf)

##結果:
hbase(main):005:0> scan ‘lxw1234’
ROW COLUMN+CELL
A column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02
B column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06
C column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07
3 row(s) in 0.0550 seconds

注意:儲存到HBase,執行時候需要在SPARK_CLASSPATH中加入HBase相關的jar包。

saveAsNewAPIHadoopFile

def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit

def saveAsNewAPIHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit

saveAsNewAPIHadoopFile用於將RDD資料儲存到HDFS上,使用新版本Hadoop API。

用法基本同saveAsHadoopFile。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

var rdd1 = sc.makeRDD(Array((“A”,2),(“A”,1),(“B”,6),(“B”,3),(“B”,7)))
rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])

saveAsNewAPIHadoopDataset

def saveAsNewAPIHadoopDataset(conf: Configuration): Unit

作用同saveAsHadoopDataset,只不過採用新版本Hadoop API。

以寫入HBase為例:

HBase建表:

create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}

完整的Spark應用程式:

package com.lxw1234.test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put

object Test {
def main(args : Array[String]) {
val sparkConf = new SparkConf().setMaster(“spark://lxw1234.com:7077”).setAppName(“lxw1234.com”)
val sc = new SparkContext(sparkConf);
var rdd1 = sc.makeRDD(Array((“A”,2),(“B”,6),(“C”,7)))

sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3")
sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")
var job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

rdd1.map(
  x => {
    var put = new Put(Bytes.toBytes(x._1))
    put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
    (new ImmutableBytesWritable,put)
  }    
).saveAsNewAPIHadoopDataset(job.getConfiguration)

sc.stop()   

}
}

注意:儲存到HBase,執行時候需要在SPARK_CLASSPATH中加入HBase相關的jar包。