1. 程式人生 > >Spark 算子

Spark 算子

off 數據 toa 內部實現 vbs literal 目錄 part 行動

RDD算子分類,大致可以分為兩類,即:

  1. Transformation:轉換算子,這類轉換並不觸發提交作業,完成作業中間過程處理。

  2. Action:行動算子,這類算子會觸發SparkContext提交Job作業。

一:Transformation:轉換算子

1.map
map是對RDD中的每個元素都執行一個指定的函數來產生一個新的RDD。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。

舉例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> val b = a.map(x => x*2)
scala> a.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

上述例子中把原RDD中每個元素都乘以2來產生一個新的RDD。

2.mapPartitions
mapPartitions是map的一個變種。map的輸入函數是應用於RDD中每個元素,而mapPartitions的輸入函數是應用於每個分區,也就是把每個分區中的內容作為整體來處理的。
它的函數定義為:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
f即為輸入函數,它處理每個分區裏面的內容。每個分區中的內容將以Iterator[T]傳遞給輸入函數f,f的輸出結果是Iterator[U]。最終的RDD由所有分區經過輸入函數處理後的結果合並起來的。

舉例:

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at :27

scala> a.collect
res11: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)                            

scala> var c = a.mapPartitions( a=>a.filter(_>=7) )
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[11] at mapPartitions at :29

scala> c.collect
res12: Array[Int] = Array(7, 8, 9) 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

上述例子是通過函數filter對分區中所有數據進行過濾
mapPartitions還有些變種,比如mapPartitionsWithContext,它能把處理過程中的一些狀態信息傳遞給用戶指定的輸入函數。還有mapPartitionsWithIndex,它能把分區的index傳遞給用戶指定的輸入函數。

3.mapValues
mapValues顧名思義就是輸入函數應用於RDD中Kev-Value的Value,原RDD中的Key保持不變,與新的Value一起組成新的RDD中的元素。因此,該函數只適用於元素為KV對的RDD。

舉例:

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
scala> val b = a.map(x => (x.length, x))
scala> b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))
  • 1
  • 2
  • 3
  • 4

4.mapWith
mapWith是map的另外一個變種,map只需要一個輸入函數,而mapWith有兩個輸入函數。它的定義如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]
第一個函數constructA是把RDD的partition index(index從0開始)作為輸入,輸出為新類型A;
第二個函數f是把二元組(T, A)作為輸入(其中T為原RDD中的元素,A為第一個函數的輸出),輸出類型為U。
舉例:把partition index 乘以10,然後加上2作為新的RDD的元素。

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) 
x.mapWith(a => a * 10)((a, b) => (b + 2)).collect 
res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)
  • 1
  • 2
  • 3

5.flatMap
與map類似,區別是原RDD中的元素經map處理後只能生成一個元素,而原RDD中的元素經flatmap處理後可生成多個元素來構建新RDD。
舉例:對原RDD中的每個元素x產生y個元素(從1到y,y為元素x的值)

scala> val a = sc.parallelize(1 to 4, 2)
scala> val b = a.flatMap(x => 1 to x)
scala> b.collect
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)
  • 1
  • 2
  • 3
  • 4

6.flatMapWith
flatMapWith與mapWith很類似,都是接收兩個函數,一個函數把partitionIndex作為輸入,輸出是一個新類型A;另外一個函數是以二元組(T,A)作為輸入,輸出為一個序列,這些序列裏面的元素組成了新的RDD。它的定義如下:

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]
舉例:

scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,
8, 2, 9)
  • 1
  • 2
  • 3
  • 4

7.flatMapValues
flatMapValues類似於mapValues,不同的在於flatMapValues應用於元素為KV對的RDD中Value。每個一元素的Value被輸入函數映射為一系列的值,然後這些值再與原RDD中的Key組成一系列新的KV對。

舉例

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val b = a.flatMapValues(x=>x.to(5))
scala> b.collect
res3: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))
  • 1
  • 2
  • 3
  • 4

上述例子中原RDD中每個元素的值被轉換為一個序列(從其當前值到5),比如第一個KV對(1,2), 其值2被轉換為2,3,4,5。然後其再與原KV對中Key組成一系列新的KV對(1,2),(1,3),(1,4),(1,5)。

8.reduce
reduce將RDD中元素兩兩傳遞給輸入函數,同時產生一個新的值,新產生的值與RDD中下一個元素再被傳遞給輸入函數直到最後只有一個值為止。

舉例

scala> val c = sc.parallelize(1 to 10)
scala> c.reduce((x, y) => x + y)
res4: Int = 55
  • 1
  • 2
  • 3

上述例子對RDD中的元素求和。

9.reduceByKey
顧名思義,reduceByKey就是對元素為KV對的RDD中Key相同的元素的Value進行reduce,因此,Key相同的多個元素的值被reduce為一個值,然後與原RDD中的Key組成一個新的KV對。

舉例:

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> a.reduceByKey((x,y) => x + y).collect
res7: Array[(Int, Int)] = Array((1,2), (3,10))
  • 1
  • 2
  • 3

上述例子中,對Key相同的元素的值求和,因此Key為3的兩個元素被轉為了(3,10)。

10.cartesian:

對兩個RDD內的所有元素進行笛卡爾積操作(耗內存),內部實現返回CartesianRDD。

scala> val a = sc.parallelize(List(1,2,3))

scala> val b = sc.parallelize(List(4,5,6))

scala> val c = a.cartesian(b)

scala> c.collect
res15: Array[(Int, Int)] = Array((1,4), (1,5), (1,6), (2,4), (3,4), (2,5), (2,6), (3,5), (3,6))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

11 Sample:
sample將RDD這個集合內的元素進行采樣,獲取所有元素的子集。用戶可以設定是否有有放回的抽樣,百分比,隨機種子,進而決定采樣方式。

內部實現: SampledRDD(withReplacement,fraction,seed)。
函數參數設置:
withReplacement=true,表示有放回的抽樣。
withReplacement=false,表示無放回的抽樣。

根據fraction指定的比例,對數據進行采樣,可以選擇是否用隨機數進行替換,seed用於指定隨機數生成器種子。

隨機函數產生的是一種偽隨機數,它實際是一種序列發生器,有固定的算法,只有當種子不同時,序列才不同,所以不應該把種子固定在程序中,應該用隨機產生的數做種子,如程序運行時的時間等。
以c++為例,應先用srand()設置不同種子,否則每次調用rand()得到的值是一樣的。

scala> val a = sc.parallelize(1 to 100,3)

scala> a.sample(false,0.1,0).count
res16: Long = 12

scala> a.sample(false,0.1,0).collect
res17: Array[Int] = Array(10, 47, 55, 73, 76, 84, 87, 88, 91, 92, 95, 98)

scala> a.sample(true,0.7,scala.util.Random.nextInt(10000)).count
res19: Long = 75

scala> a.sample(true,0.7,scala.util.Random.nextInt(10000)).collect
res20: Array[Int] = Array(1, 3, 3, 3, 5, 6, 9, 9, 9, 9, 10, 10, 15, 17, 20, 23, 23, 27, 28, 31, 32, 32, 34, 35, 36, 36, 36, 36, 38, 39, 41, 42, 42, 43, 45, 47, 49, 49, 50, 50, 51, 51, 54, 55, 55, 57, 57, 57, 57, 57, 59, 59, 61, 61, 63, 67, 72, 74, 76, 76, 80, 80, 81, 81, 81, 82, 83, 85, 87, 88, 90, 93, 95, 96, 97, 97, 99, 100)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

12 union:

使用union函數時需要保證兩個RDD元素的數據類型相同,返回的RDD數據類型和被合並的RDD元素數據類型相同。並不進行去重操作,保存所有的元素,如果想去重,可以使用distinct()。同時,spark還提供更為簡潔的使用union的API,即通過++符號相當於union函數操作。

eg: a 與 b 的聯合

scala> val a = sc.parallelize(List(("A",1),("B",2),("c",3),("A",4),("C",5) ))

scala> val b = sc.parallelize(List(("A",5),("B",6),("A",4),("C",9) ))

scala> a.union(b).collect
res22: Array[(String, Int)] = Array((A,1), (B,2), (c,3), (A,4), (C,5), (A,5), (B,6), (A,4), (C,9))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

去重復:

scala> val d = sc.parallelize(List(("A",5),("B",6),("A",5) ))

scala> d.distinct.collect
res25: Array[(String, Int)] = Array((B,6), (A,5))
  • 1
  • 2
  • 3
  • 4

13 groupByKey:

將元素通過函數生成相應的Key,數據就轉化為Key-Value格式,之後將Key相同的元素分為一組。

eg:根據數據集中的每個元素的K值對數據分組

scala> val a = sc.parallelize(List(("A",1),("B",2),("c",3),("A",4),("C",5) ))

scala> a.groupByKey().collect
res21: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(2)), (A,CompactBuffer(1, 4)), (C,CompactBuffer(5)), (c,CompactBuffer(3)))
  • 1
  • 2
  • 3
  • 4

14 join:

join對兩個需要連接的RDD進行cogroup函數操作,將相同key的數據能偶放到一個分區,在cgroup操作之後形成新RDD對每個key下的元素進行笛卡爾積的操作,返回的結果在展平,對應key下的所有元組形成一個集合。最後返回 RDD[(K, (V, W))]。

eg:a與b兩個數據連接,相當於表的關聯

scala> val a = sc.parallelize(List(("A",1),("B",2),("c",3),("A",4),("C",5) ))

scala> val b = sc.parallelize(List(("A",5),("B",6),("A",4),("C",9) ))

scala> a.join(b).collect
res23: Array[(String, (Int, Int))] = Array((B,(2,6)), (A,(1,5)), (A,(1,4)), (A,(4,5)), (A,(4,4)), (C,(5,9)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

15 cache:

cache將RDD元素從磁盤緩存到內存。相當於 persist(MEMORY_ONLY) 函數的
功能。

16 persist:

persist函數對RDD進行緩存操作,數據緩存在哪裏,由StorageLevel這個枚舉類型進行確定。DISK 代表磁盤,MEMORY 代表內存, SER 代表數據是否進行序列化存儲。

函數定義: persist(newLevel:StorageLevel)
StorageLevel 是枚舉類型,代表存儲模式。

MEMORY_AND_DISK_SER 代表數據可以存儲在內存和磁盤,並且以序列化的方式存儲,其他同理。


二:Action:行動算子

1.foreach:
foreach對RDD中的每個元素都應用f函數操作,不返回 RDD 和 Array, 而是返回Uint。

2.saveAsTextFile:

函數將數據輸出,存儲到 HDFS 的指定目錄。
函數的內 部實現,其內部通過調用 saveAsHadoopFile 進行實現:

this.map(x => (NullWritable.get(), new Text(x.toString))) .saveAsHadoopFileTextOutputFormat[NullWritable, Text]

將 RDD 中的每個元素映射轉變為 (null, x.toString),然後再將其寫入 HDFS。

3 collect:

collect相當於toArray,不過已經過時不推薦使用,collect將分布式的RDD返回為一個單機的scala Array數據,在這個數組上運用 scala 的函數式操作。

4.count:

count返回整個RDD的元素個數。

scala> val a = sc.parallelize(1 to 10 )

scala> a.collect
res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)                         

scala> a.count
res10: Long = 10

Spark 算子