1. 程式人生 > >通過例子學習spark rdd--Transformations函式

通過例子學習spark rdd--Transformations函式

通過例子學習spark rdd

Transformations函式

所有的Transformations函式完成後會返回一個新的RDD。
在講解例子的時候測試的資料如下:

$ hadoop fs -cat /user/zxh/pdata/pdata
3350,province_name,上海,5.0
3349,province_name,四川,4.0
3348,province_name,湖南,11.0
3348,province_name,河北,11.0

map

  • 功能
    在RDD的每個item上使用transformation函式,結果返回一個新的RDD。

  • 函式原型

    def
map[U:
ClassTag](f: T => U): RDD[U]
  • 使用例子
// 構建一個rdd
scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"))
// 通過map計算rdd每個成員的長度
scala> val b = a.map(_.length)
// 列印rdd
scala> b.collect().foreach(println)

在spark-shell中執行以上程式,得到的結果如下:

3
6
6
3
8

檢視一下得到的rdd的型別

scala> b
res5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:26

可以看出該rdd的型別是MapPartitionsRDD,該RDD是通過在父RDD上通過map運算而得到的。

注意:collect()函式會把所有的資料匯聚到本地,然後打印出來,若資料量太大,最好避免使用該函式。

floatMap

  • 功能
    類似於map,與map相比flatMap有兩個很大的區別:
    (1) flatMap允許在map函式的基礎上擴充套件成多個成員。
    (2) flatMap會將一個長度為N的RDD轉換成一個N個元素的集合,然後再把這N個元素合成到一個單個RDD的結果集。

  • 函式原型

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 
  • 使用例子
scala> val a = sc.parallelize(1 to 10)
// 擴充套件a的元素,把a的每個元素擴充套件成flatMap中的元素
scala> val b = a.flatMap(1 to _)
// 得到最後的c的結果,可以看到把a的每個元素都擴充套件成1 到 該元素 的多個值。
scala> val c = b.collect()
c: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

flatMap 和 map

  1. flatMap會把N個元素的集合合成一個單一的RDD結果集

通過例子來檢視這個兩個的區別把:

  • flatMap的例子
val lines = sc.textFile("/user/zxh/pdata/pdata")
val rs1 = lines.flatMap(line=>line.split(","))
rs1.collect()

flatMap的輸出如下:

res5: Array[String] = Array(3350, province_name, 上海, 5.0, 3349, province_name, 四川, 4.0, 3348, province_name, 湖南, 11.0, 3348, province_name, 河北, 11.0)
  • map的輸出
val lines = sc.textFile("/user/zxh/pdata/pdata")
val rs2 = lines.map(line => line.split(","))
rs2.collect()

map的輸出如下:

res6: Array[Array[String]] = Array(Array(3350, province_name, 上海, 5.0), Array(3349, province_name, 四川, 4.0), Array(3348, province_name, 湖南, 11.0), Array(3348, province_name, 河北, 11.0))
  • 小結
    可以看到flatMap把最後的結果都合併到一個RDD的集合中了,而map是在每個item上輸出是什麼就保留什麼元素,不合合併到一個集合中。

mapPartitions

  • 功能
    這是一個特殊的map,它在每個分割槽中只調用一次。通過輸入引數(Iterarator [T]),各個分割槽的全部內容都可以作為值的順序流。
    自定義函式必須返回另一個Iterator [U]。組合的結果迭代器會自動轉換為新的RDD。
    請思考,結果的元組中,為什麼沒有:(3,4)和(6,7)

  • 函式原型

def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]
  • 使用舉例
scala> val a = sc.parallelize(1 to 10, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
     |   var res = List[(T, T)]()
     |   var pre = iter.next
     |   while (iter.hasNext)
     |   {
     |     val cur = iter.next;
     |     res .::= (pre, cur)
     |     pre = cur;
     |   }
     |   res.iterator
     | }
myfunc: [T](iter: Iterator[T])Iterator[(T, T)]

scala> a.mapPartitions(myfunc).collect
res19: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (9,10), (8,9), (7,8))
  • 程式說明
    請注意最後的結果,為什麼沒有(3,4)和(6,7),我們需要進行以下探索和思考:
    思考:由於mapPartitions是在每個分割槽中執行的,所以我們先看看每個分割槽的資料:
// 通過glom函式把每個分割槽的資料聚合起來,方便進行檢視
scala> a.glom().collect()
res27: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

得到每個分割槽的資料後,在對每個分割槽的資料進行myfunc函式的處理。可以看出,3和4不在同一個分割槽,6和7也不在同一個分割槽。
所以,在每個分割槽的處理時,不可能組合在一起,這就是為什麼不會有(3,4)和(6,7)的原因。

filter

  • 函式原型
def filter(f: T => Boolean): RDD[T]
  • 功能
    在RDD物件物件上使用filter函式,並返回滿足條件的新的RDD。

  • 使用例子

scala> val a = sc.parallelize(1 to 10)
scala> val b = a.filter(_>2)
scala> b.collect()
    res3: Array[Int] = Array(3, 4, 5, 6, 7, 8, 9, 10)
  • 例子2
val lines = sc.textFile("/user/zxh/pdata/pdata")
val r = lines.flatMap(line=>line.split(",")).filter(_.length>5)
scala> r.collect()
res9: Array[String] = Array(province_name, province_name, province_name, province_name)

// 按元素長度進行過濾
scala> val r = lines.flatMap(line=>line.split(",")).filter(_.length>3)
r: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[33] at filter at <console>:26
scala> r.collect()
res11: Array[String] = Array(3350, province_name, 3349, province_name, 3348, province_name, 11.0, 3348, province_name, 11.0)

// 在每個元素中使用函式contains進行過濾
scala> val r = lines.flatMap(line=>line.split(",")).filter(_.contains("33"))
r: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[39] at filter at <console>:26
scala> r.collect()
res14: Array[String] = Array(3350, 3349, 3348, 3348)

distinct

  • 函式原型
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

注意:distinct也可以指定分割槽數,若沒有指定,使用原來的分割槽數。

  • 功能
    對RDD的元素去重。
    返回一個包含每個唯一值一次的新RDD。

  • 使用例子

scala> val r = lines.flatMap(line=>line.split(",")).filter(_.length>3)
r: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[59] at filter at <console>:26

// 這裡輸出的是有重複的元素
scala> r.collect()
res20: Array[String] = Array(3350, province_name, 3349, province_name, 3348, province_name, 11.0, 3348, province_name, 11.0)

// 呼叫distinct()後這輸出的是去重後的元素
scala> r.distinct().collect()
res21: Array[String] = Array(province_name, 3348, 11.0, 3350, 3349)

repartition

  • 函式原型
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
  • 說明

    • 返回一個有確定分割槽數:numPartitions的RDD。在該RDD中可能會增加或減少並行度的水平。
    • 在實現內部,該函式會開啟一個shuffle過程來重新分配資料,若你減少RDD的分割槽,可以通過coalesce函式來避免進行shuffle。
    • repartition(numPartitions)只是coalesce(numPartitions,shuffle = true)的縮寫。
  • 使用例子

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect
res6: Array[String] = Array(Dog, Gnu, Cat, Rat)

val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
a.distinct(2).partitions.length
res16: Int = 2

// 注意distinct也可以指定分割槽
a.distinct(3).partitions.length
res17: Int = 3

val c2 = a.repartition(3)
c2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[73] at repartition at <console>:26
c2.partitions.length
res27: Int = 3

sample

  • 函式原型
  def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T]
* withReplacement:是否要替換
* fraction:原來的RDD元素大小的百分比
* seed:隨機數產生器的seed
  • 功能
    隨機選擇RDD專案的一部分資料,並將其返回到新的RDD中。

  • 使用例子

val a = sc.parallelize(1 to 10000, 3)
scala> a.sample(false,0.1,0).count()
res4: Long = 1032

scala> a.sample(false,0.3,0).count()
res5: Long = 2997

scala> a.sample(false,0.2,0).count()
res6: Long = 2018

union,++

  • 函式原型
def union(other: RDD[T]): RDD[T] 
  • 功能
    執行標準集合操作:A聯合B。
    若元素有重複,會保留重複的元素。

  • 使用例子

scala> val a = sc.parallelize(1 to 5)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> val b = sc.parallelize(6 to 9)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> a.union(b).collect()
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> (a++b).collect()
res8: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> (a ++ b).collect()
res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

sortBy

  • 功能說明
    該函式對輸入RDD的資料進行排序並將其儲存在新的RDD中。
    第一個引數: 需要您指定一個將輸入資料對映到要sortBy的鍵的函式。
    第二個引數:(可選)指定是否要按升序或降序對資料進行排序。
    返回通過給定key函式生成的key。

  • 函式原型

def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 
  • 使用例子
scala> val a = sc.parallelize(Array(5, 7, 1, 3, 2, 1))
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

// 按值倒序排序
scala> a.sortBy(c=>c, false).collect()
res1: Array[Int] = Array(7, 5, 3, 2, 1, 1)

// 按值遞增排序(預設值)
scala> a.sortBy(c=>c, true).collect()
res5: Array[Int] = Array(1, 1, 2, 3, 5, 7)

// 按每個值除以4,得到的結果排序
scala> a.sortBy(c=>c/4, true).collect()
res4: Array[Int] = Array(2, 1, 1, 3, 5, 7)
  • 例子2
// 按RDD中的集合元素的第一個元素排序
scala> val x = sc.parallelize(Array(("abc", 10), ("def", 25), ("acb", 31), ("dfe", 59)))
x: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> x.sortBy(c => c._1, true).collect
res9: Array[(String, Int)] = Array((abc,10), (acb,31), (def,25), (dfe,59))

sortByKey

  • 功能說明
    該函式對輸入RDD的資料進行排序並將其儲存在新的RDD中。
    輸出RDD是一個shuffled RDD,因為它的資料是被shuffled的reducer輸出。
    這個功能的實現其實很聰明。首先,它使用範圍分割槽器對混洗RDD內的範圍內的資料進行分割槽。然後使用標準的排序機制,使用mapPartition單獨對這些範圍進行排序。

  • 函式原型

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]
  • 使用例子
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)
val c = a.zip(b)

c.sortByKey(true).collect
res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))

c.sortByKey(false).collect
res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))

val a = sc.parallelize(1 to 100, 5)
val b = a.cartesian(a)
val c = sc.parallelize(b.takeSample(true, 5, 13), 2)
val d = c.sortByKey(false)
res56: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4))

intersection

  • 功能說明
    返回兩個RDD的交集,輸出的元素將會去重。
    注意:該函式在內部會進行shuffle的過程。

  • 函式原型

def intersection(other: RDD[T]): RDD[T] 
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
  • 例子
scala> a.collect()
res1: Array[Int] = Array(5, 7, 1, 3, 2, 1)

scala> val b = sc.parallelize(Array(6,7,8,9,2,1))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> val c = a.intersection(b)
c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at intersection at <console>:28

scala> c.collect()
res4: Array[Int] = Array(1, 2, 7)

glom

  • 功能
    建立一個新的RDD,該RDD把會將各個分割槽的所有元素合併到同一個陣列中,若有多個分割槽,就會得到一個有多個數組的集合。

  • 函式原型

def glom(): RDD[Array[T]]
  • 程式碼實現
    該函式的程式碼實現很簡單,就是建立了一個MapPartitionsRDD,把每個分割槽的元素分別放到同一個陣列中。
 /**
   * Return an RDD created by coalescing all elements within each partition into an array.
   */
  def glom(): RDD[Array[T]] = withScope {
    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
  }
  • 使用例子
    如何理解?可以通過例子來說明。
scala> val a = sc.parallelize(1 to 10, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> a.collect()
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// 檢視一下分割槽的個數
scala> a.partitions.length
res2: Int = 3

// 呼叫了glom合併分割槽的資料
scala> val b = a.glom()
b: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[3] at glom at <console>:26

// 每個分割槽的資料組成了一個array
scala> b.collect()
res4: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

zip

  • 功能說明
    將兩個分割槽中的第n個分割槽相互組合,從而連線兩個RDD。 生成的RDD將由兩部分元組組成,這些元組被解釋為鍵-值(key-value)對。
    注意:使用該函式時,兩個RDD的分割槽和元素個數必須一樣,否則將會報錯。見例子2。

  • 函式原型

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  • 使用例子
scala> val a = sc.parallelize(1 to 10, 3)
scala> val b = sc.parallelize(11 to 20, 3)
scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(Int, Int)] = ZippedPartitionsRDD2[9] at zip at <console>:28
scala> c.collect()
res8: Array[(Int, Int)] = Array((1,11), (2,12), (3,13), (4,14), (5,15), (6,16), (7,17), (8,18), (9,19), (10,20))
  • 例子2
scala> val a = sc.parallelize(1 to 10, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val b = sc.parallelize(1 to 12, 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(Int, Int)] = ZippedPartitionsRDD2[12] at zip at <console>:28

scala> c.collect()
17/11/26 10:36:54 WARN TaskSetManager: Lost task 1.0 in stage 5.0 (TID 16, sz-pg-entuo-dev-024.tendcloud.com, executor 2): org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
...

zipParititions

  • 功能
    和zip的功能相似,但可以提供更多的控制。

  • 函式原型

def zipPartitions[B: ClassTag, V: ClassTag]
      (rdd2: RDD[B], preservesPartitioning: Boolean)
      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]

def zipPartitions[B: ClassTag, V: ClassTag]
      (rdd2: RDD[B])
      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] 

def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
      (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]

def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
      (rdd2: RDD[B], rdd3: RDD[C])
      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] 

def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]

def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]
  • 使用例子
val a = sc.parallelize(0 to 9, 3)
val b = sc.parallelize(10 to 19, 3)
val c = sc.parallelize(100 to 109, 3)
def myfunc(aiter: Iterator[Int], biter: Iterator[Int], citer: Iterator[Int]): Iterator[String] =
{
  var res = List[String]()
  while (aiter.hasNext && biter.hasNext && citer.hasNext)
  {
    val x = aiter.next + " " + biter.next + " " + citer.next
    res ::= x
  }
  res.iterator
}
a.zipPartitions(b, c)(myfunc).collect
res50: Array[String] = Array(2 12 102, 1 11 101, 0 10 100, 5 15 105, 4 14 104, 3 13 103, 9 19 109, 8 18 108, 7 17 107, 6 16 106)

zipWithIndex

  • 功能:
    使用元素索引來壓縮RDD的元素。索引從0開始。如果RDD分佈在多個分割槽上,則啟動一個Spark作業來執行此操作。

  • 函式原型

def zipWithIndex(): RDD[(T, Long)]
  • 使用例子
// 字串的例子
scala> val r1 = sc.parallelize(Array("a", "b", "c", "d"))
r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[13] at parallelize at <console>:24

scala> val r2 = r1.zipWithIndex
r2: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[14] at zipWithIndex at <console>:26

scala> r2.collect()
res10: Array[(String, Long)] = Array((a,0), (b,1), (c,2), (d,3))


// 整數型別的例子
scala> val z = sc.parallelize(1 to 10, 5)
z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> val r2 = z.zipWithIndex
r2: org.apache.spark.rdd.RDD[(Int, Long)] = ZippedWithIndexRDD[18] at zipWithIndex at <console>:26

scala> r2.collect()
res11: Array[(Int, Long)] = Array((1,0), (2,1), (3,2), (4,3), (5,4), (6,5), (7,6), (8,7), (9,8), (10,9))

scala> r2.partitions.length
res12: Int = 5