1. 程式人生 > >Spark運算元講解

Spark運算元講解

1:Zip運算元

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

將兩個RDD做zip操作,如果當兩個RDD分割槽數目不一樣的話或每一個分割槽數目不一樣的話則會異常。

例如:

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6),2)
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6),3)
rdd.zip(rdd1).collect

異常資訊:

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 3)
  at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$
2.apply(RDD.scala:252) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

例如:

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6),2)
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7),2)
rdd.zip(rdd1).collect

異常資訊:

Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition

2:zipPartitions

以分割槽為單位進行zip操作,要求分割槽數目相等。否則異常。

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6),2)
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7),2)
val func = (x:Iterator[Int], y:Iterator[Int])=>x.toSeq.++(y.toSeq).toIterator
rdd1.zipPartitions(rdd2)(func).collect

3:zipWithIndex

給RDD中的每一個元素新增上索引號,組成二元組。索引號從0開始並且索引號型別是Long,當RDD分割槽大於1個時候需要出發一個Spark Job。

4:zipWithUniqueId

複製程式碼
var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
//rdd1有兩個分割槽,
rdd1.zipWithUniqueId().collect
 Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
//總分割槽數為2
//第一個分割槽第一個元素ID為0,第二個分割槽第一個元素ID為1
//第一個分割槽第二個元素ID為0+2=2,第一個分割槽第三個元素ID為2+2=4
//第二個分割槽第二個元素ID為1+2=3,第二個分割槽第三個元素ID為3+2=5
複製程式碼

其實就是按照每一個的分割槽的每一個元素的順序進行編號。這個運算元不需要出發作業到叢集執行。

5:union

RDD求並集操作,不會自動去重。

複製程式碼
res31: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd2.collect
collect   collectAsync

scala> rdd2.collect
res32: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7)

scala> rdd1.union(rdd2).collect
collect   collectAsync

scala> rdd1.union(rdd2).collect
res34: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7)//不去重
複製程式碼

6:distinct

scala> unionRDD.collect
res38: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7)
scala> unionRDD.distinct.collect
res39: Array[Int] = Array(4, 1, 5, 6, 2, 3, 7)

實現去重。

7:treeReduce

treeReduce有點類似於reduce函式,也不需要傳入初始值,只不過這個運算元使用一個多層樹的形式來進行reduce操作。

scala> rdd1.collect
res42: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd1.treeReduce((x,y)=>x+y)
res43: Int = 21

8:aggregate

複製程式碼
scala> rdd1.collect
res53: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd1.partitions.length
res54: Int = 2

scala> rdd1.aggregate(1)((x,y)=>x+y,(x,y)=>x+y)
res56: Int = 24

scala> rdd1.repartition(3).aggregate(1)((x,y)=>x+y,(x,y)=>x+y)
res57: Int = 25
複製程式碼

我們設定的聚集函式的ZeroValue值是1,這個值會每一個分割槽聚集時候使用一次,最後在聚集所有分割槽時候在使用一次。

我們這裡面分區內部元素計算函式是:

(x,y)=>x+y

分割槽之間的聚集函式:

(x,y)=>x+y

由於rdd1預設是2個分割槽,所以在計算兩個分割槽時候使用兩次,相當於+1,最後合併兩個分割槽時候有使用一次,相當於再加1.所以一共加3,,即:

1+2+3+4+5+6=21,21+3 =24.另一個只因為多一個分割槽,所以多累加1.

9:treeAggregate

和8中聚集運算元效果一樣,只不過使用的是樹的層次結構聚集。

10:top

返回前面n個最大元素,可以定義排序規則

11:takeSample

def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]

隨機取樣,抽取num個樣例。可以指定是否重複抽取,隨機數種子是一個生成隨機數的初始條件,可以使用系統時間戳作為種子值。

當不允許重複抽取時候,num數目大於rdd元素數目不會報錯,此時只會抽取rdd的所有元素。

12:takeOrdered

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

抽取出num個個最小的元素,唯一和top區別就是top抽取大的,takeOrdered抽取小的。

13:take

def take(num: Int): Array[T]

返回num個數據,一般當資料較大的時候如果collect操作會導致Driver記憶體溢位,所以此時可以使用take攜帶少量資料到Driver。

14:subtract

def subtract(other: RDD[T]): RDD[T]

返回一個在當前RDD中且不在other中的元素所生成的RDD

15:sortBy

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

例如:

複製程式碼
scala> rdd1.collect
res19: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd2 = rdd1.map(x=>(x,scala.util.Random.nextInt(100),scala.util.Random.nextInt(100)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int, Int)] = MapPartitionsRDD[33] at map at <console>:26

scala> rdd2.collect
collect   collectAsync

scala> rdd2.collect
res20: Array[(Int, Int, Int)] = Array((1,87,34), (2,5,62), (3,51,60), (4,72,33), (5,33,23))

scala> res13.sortBy(x=>x._3).collect
res21: Array[(Int, Int, Int)] = Array((3,26,12), (4,45,28), (1,12,37), (2,71,67), (5,80,96))
複製程式碼

16:sample

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

隨機取樣,是否重複取樣,抽取資料的百分比例。

17:repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

重新建立一個只有numPartitions個分割槽的RDD,提高分割槽數或降低分割槽數會改變並行度,內部實現實現需要shuffle。如果需要降低RDD的分割槽數的話儘可能使用coalesce運算元,它會避免shuffle的發生。

18:coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

降低原來RDD的分割槽數目到numPartitions個分割槽。例如由1000個分割槽降到100個分割槽的話,這樣是一個窄依賴,因此不需要shuffle過程。

但是如果RDD原本有2個分割槽的話,當我們呼叫coalesce(5)的話,生成的RDD分割槽還將是2,不會增加,但是如果呼叫coalesce(1)的話,則會生成分割槽個數為1的RDD。(coalesce只會減少分割槽數,不會增加分割槽數)。

拓展:如果我們的RDD分割槽數為1的話,我們可以傳遞shuffle=true,當計算時候會進行shuflle分佈到多個節點進行計算。

19:checkpoint

def checkpoint(): Unit

Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint directory set with SparkContext#setCheckpointDir and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.

20:cartesian

def cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

兩個RDD生成笛卡爾積。

複製程式碼
scala> rdd1.collect
res37: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd2.collect
res38: Array[Int] = Array(6, 7, 8, 9, 10)

scala> rdd1.cartesian(rdd2).collect
res39: Array[(Int, Int)] = Array((1,6), (1,7), (2,6), (2,7), (1,8), (1,9), (1,10), (2,8), (2,9), (2,10), (3,6), (3,7), (4,6), (4,7), (5,6), (5,7), (3,8), (3,9), (3,10), (4,8), (4,9), (4,10), (5,8), (5,9), (5,10))
複製程式碼

21:cache

def cache(): RDD.this.type

將RDD快取,快取級別為:MEMORY_ONLY

22:persist

def persist(): RDD.this.type

將RDD快取,快取級別為:MEMORY_ONLY

23:persist

def persist(newLevel: StorageLevel): RDD.this.type

指定快取級別,在第一次被計算之後進行快取。

24:keyBy

def keyBy[K](f: (T) ⇒ K): RDD[(K, T)]

根據函式f進行選取key,例如:

scala> rdd1.collect
res43: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd1.keyBy(x=>x*x).collect
res44: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5))

25:intersection

def intersection(other: RDD[T]): RDD[T]

求兩個RDD的交集

1:glom

def glom(): RDD[Array[T]]

將原RDD的元素收集到一個數組,建立一個數組型別的RDD

2:getNumPartitions

final def getNumPartitions: Int

求RDD的分割槽書

3:groupBy

def groupBy[K](f: (T) ⇒ K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

根據指定函式進行分組,例如:

scala> rdd1.collect
res61: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd1.groupBy(x=>if(x%2==0) 0 else 1).collect
res62: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(4, 2)), (1,CompactBuffer(1, 3, 5)))

4:randomSplit

def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

將一個RDD根據weights陣列進行劃分多個RDD,返回一個數組。

5:countByValue

返回每一個元素出現的次數,可以更加方便實現wordcount

scala> sc.parallelize(Array(1,2,1,2,1,2,3,4,5)).countByValue
res73: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 3, 2 -> 3, 3 -> 1, 4 -> 1)

6:countByValueApprox

def countByValueApprox(timeout: Long, confidence: Double = 0.95)(implicit ord: Ordering[T] = null): PartialResult[Map[T, BoundedDouble]]

求一個近似的計算結果

7:++

def ++(other: RDD[T]): RDD[T]

求RDD的並集

8:fold

def fold(zeroValue: T)(op: (T, T) ⇒ T): T

例如:

scala> rdd1.collect
res90: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd1.fold(0)(_+_)
res91: Int = 15