
Spark Functions Explained: coalesce

Re-merges the partitions of an RDD into a given number of partitions.

Function signature

def coalesce(numPartitions: Int, shuffle: Boolean = false)
    (implicit ord: Ordering[T] = null): RDD[T]
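Returns a new RDD with numPartitions partitions. When shuffle is false (the default), coalesce only merges existing partitions, so it can reduce the partition count but not increase it: if numPartitions is larger than the current number of partitions, the RDD keeps its current count. When shuffle is set to true, the data is redistributed through a shuffle, which also makes it possible to increase the number of partitions.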
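To make the no-shuffle case concrete, here is a minimal plain-Scala model that does not use Spark at all. It assumes the partitions have no locality preferences (as with a ParallelCollectionRDD), in which case merging contiguous ranges of parent partitions roughly mirrors what Spark does; Spark's real partition coalescer also takes data locality into account. Partitions are modeled as List values, and the names NarrowCoalesceSketch, sliceEvenly and coalesceNoShuffle are hypothetical. sliceEvenly only approximates how parallelize splits a local collection into slices.

```scala
// Plain-Scala model of coalesce(numPartitions, shuffle = false); no Spark needed.
// Assumption: no locality preferences, so contiguous parent ranges are merged.
object NarrowCoalesceSketch {
  // Hypothetical helper approximating how parallelize splits a local Seq
  // into numSlices contiguous (possibly empty) slices.
  def sliceEvenly[T](xs: Seq[T], numSlices: Int): Vector[List[T]] = {
    require(numSlices > 0, "numSlices must be positive")
    val n = xs.length.toLong
    (0 until numSlices).map { i =>
      val start = (i * n / numSlices).toInt
      val end = ((i + 1) * n / numSlices).toInt
      xs.slice(start, end).toList
    }.toVector
  }

  // Merge parent partitions without moving data through a shuffle.
  def coalesceNoShuffle[T](parent: Vector[List[T]], numPartitions: Int): Vector[List[T]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    // Without a shuffle the partition count can only stay the same or shrink.
    val target = math.min(numPartitions, parent.length)
    (0 until target).map { i =>
      // Output partition i takes parent partitions [start, end).
      val start = (i.toLong * parent.length / target).toInt
      val end = ((i.toLong + 1) * parent.length / target).toInt
      parent.slice(start, end).flatten.toList
    }.toVector
  }
}
```

With the four elements of the example spread over 30 mostly empty partitions, coalesceNoShuffle(parts, 2) returns two partitions holding 1, 2 and 3, 4, while asking for 40 partitions still yields 30.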

Example

scala> val data = sc.parallelize(List(1,2,3,4))
data: org.apache.spark.rdd.RDD[Int] = 
    ParallelCollectionRDD[45] at parallelize at <console>:12

scala> data.partitions.length
res68: Int = 30

scala> val result = data.coalesce(2, false)
result: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[57] at coalesce at <console>:14

scala> result.partitions.length
res77: Int = 2

scala> result.toDebugString
res75: String = 
(2) CoalescedRDD[57] at coalesce at <console>:14 []
 |  ParallelCollectionRDD[45] at parallelize at <console>:12 []

scala> val result1 = data.coalesce(2, true)
result1: org.apache.spark.rdd.RDD[Int] = MappedRDD[61] at coalesce at <console>:14

scala> result1.toDebugString
res76: String = 
(2) MappedRDD[61] at coalesce at <console>:14 []
 |  CoalescedRDD[60] at coalesce at <console>:14 []
 |  ShuffledRDD[59] at coalesce at <console>:14 []
 +-(30) MapPartitionsRDD[58] at coalesce at <console>:14 []
    |   ParallelCollectionRDD[45] at parallelize at <console>:12 []
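The lineages above show that with shuffle = false no shuffle takes place: the CoalescedRDD depends directly on the ParallelCollectionRDD. With shuffle = true, a shuffle is inserted (the ShuffledRDD; the +- marker in toDebugString marks the shuffle boundary between the 30 input partitions and the 2 output partitions). RDD.partitions.length returns the number of partitions of an RDD.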
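The shuffle = true path can be modeled in the same plain-Scala style, again without Spark. The sketch tags each element with a target partition in round-robin order and then groups by that target, which is why the output can have more partitions than the input and ends up evenly filled. Assumptions: each parent partition simply starts its round-robin at its index modulo numPartitions, whereas Spark picks a pseudo-random start per partition and routes the tagged records through a HashPartitioner; ShuffleCoalesceSketch and coalesceWithShuffle are hypothetical names.

```scala
// Plain-Scala model of coalesce(numPartitions, shuffle = true); no Spark needed.
// Assumption: parent partition idx starts its round-robin at idx % numPartitions
// (Spark uses a pseudo-random start plus a HashPartitioner instead).
object ShuffleCoalesceSketch {
  def coalesceWithShuffle[T](parent: Vector[List[T]], numPartitions: Int): Vector[List[T]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    // "Map side": tag every element with the id of its target partition.
    val tagged: Vector[(Int, T)] = parent.zipWithIndex.flatMap { case (part, idx) =>
      part.zipWithIndex.map { case (x, k) => ((idx + k) % numPartitions, x) }
    }
    // "Reduce side": gather each target's elements, keeping their relative order.
    (0 until numPartitions).map { p =>
      tagged.filter(_._1 == p).map(_._2).toList
    }.toVector
  }
}
```

For a single parent partition List(1, 2, 3, 4), coalesceWithShuffle(..., 2) gives List(1, 3) and List(2, 4), and asking for 4 partitions gives one element each, which the no-shuffle path cannot do.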