
The difference between Spark's coalesce and repartition

Source package: org.apache.spark.rdd

def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

Return a new RDD that is reduced into numPartitions partitions.

This results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions is requested, it will stay at the current number of partitions.

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

Translation:

Returns a new RDD reduced to numPartitions partitions. This results in a narrow dependency: for example, if you go from 1000 partitions to 100, no shuffle occurs; instead, each of the 100 new partitions claims 10 of the current partitions. If a larger number of partitions is requested, the RDD stays at its current partition count. However, if you are doing a drastic coalesce, e.g. down to a single partition, your computation may run on fewer nodes than you would like (with numPartitions = 1, on just one node), meaning the parallelism is insufficient. To avoid this, pass true for the second parameter, shuffle. This adds a shuffle step to the repartitioning, which means the upstream partitions can still be executed in parallel (per whatever the current partitioning is).
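The behavior described above can be sketched in a spark-shell session (a minimal sketch; `sc` is the SparkContext the shell provides, and the partition counts shown follow from the scaladoc above):

```scala
// Sketch of coalesce's narrow dependency, for spark-shell.
val rdd = sc.parallelize(1 to 1000, 1000)   // an RDD with 1000 partitions

// Narrow dependency: each of the 100 new partitions claims 10 parent
// partitions, so no shuffle is performed.
val narrowed = rdd.coalesce(100)
narrowed.partitions.length                   // 100

// Requesting MORE partitions without shuffle does not change the count:
rdd.coalesce(2000).partitions.length         // stays at 1000

// With shuffle = true, a shuffle step is added, and the upstream
// partitions are still processed in parallel.
val reshuffled = rdd.coalesce(100, shuffle = true)
```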

Note:

With the second parameter set to shuffle = true, you can produce more partitions than you started with. For example, if you have a small number of partitions, say 100, calling coalesce(1000, shuffle = true) will use a HashPartitioner to distribute the data into 1000 partitions across the cluster nodes. This is very useful for increasing parallelism.
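The note above plays out like this in spark-shell (a sketch; `sc` is the shell's SparkContext):

```scala
val small = sc.parallelize(1 to 10000, 100)   // 100 partitions

// Without shuffle, coalesce cannot grow the partition count:
small.coalesce(1000).partitions.length        // still 100

// With shuffle = true, data is redistributed with a HashPartitioner
// into 1000 partitions across the cluster:
small.coalesce(1000, shuffle = true).partitions.length   // 1000
```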

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

Return a new RDD that has exactly numPartitions partitions.

Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data.

If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.

TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.

Translation:

Returns a new RDD that has exactly numPartitions partitions. This can increase or decrease the level of parallelism of the RDD. Internally, it uses a shuffle to redistribute the data. If you are decreasing the number of partitions, consider using coalesce instead, which can avoid performing a shuffle.
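In Spark's source, repartition simply delegates to coalesce with shuffling enabled, so the two are equivalent when shuffle = true; a spark-shell sketch of the relationship:

```scala
val rdd = sc.parallelize(1 to 100, 4)   // 4 partitions

// repartition(n) is implemented as coalesce(n, shuffle = true),
// so both always shuffle and can grow or shrink the partition count.
rdd.repartition(8).partitions.length               // 8
rdd.coalesce(8, shuffle = true).partitions.length  // 8

// When only shrinking, prefer coalesce without shuffle:
// narrow dependency, no shuffle performed.
rdd.coalesce(2).partitions.length                  // 2
```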

coalesce vs. repartition: both are repartitioning operations.

(*) Both repartition the RDD.
(*) Difference: coalesce does not shuffle by default (shuffle = false); repartition always shuffles.
(*) Example:

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    Check the partition count: rdd1.partitions.length
    Repartition: val rdd2 = rdd1.repartition(3)
    val rdd3 = rdd1.coalesce(3, false)  --->  partitions: 2
    val rdd4 = rdd1.coalesce(3, true)   --->  partitions: 3