The difference between Spark's coalesce and repartition
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[(K, V)] = null): RDD[(K, V)]
Return a new RDD that is reduced into numPartitions partitions. This results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle; instead each of the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions is requested, it will stay at the current number of partitions.
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
Translation:
Returns a new RDD reduced to numPartitions partitions. This results in a narrow dependency: for example, going from 1000 partitions to 100 partitions will not trigger a shuffle; instead, each of the 100 new partitions claims 10 of the current partitions. However, if you coalesce drastically, e.g. down to a single partition, the computation may end up running on only a few cluster nodes (in other words, parallelism suffers). To avoid this, you can pass true for the second parameter, shuffle. This adds a shuffle step during repartitioning, which means the upstream partitions can still be executed in parallel.
Note:
With the second parameter shuffle = true, you can end up with more partitions than before. For example, if you have a small number of partitions, say 100, calling coalesce(1000, shuffle = true) will use a HashPartitioner to produce 1000 partitions distributed across the cluster nodes. This is very useful for increasing parallelism.
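The point above can be sketched in a spark-shell session (a minimal sketch; it assumes a running SparkContext bound to sc, as in spark-shell):

```scala
// Assumes a spark-shell session where sc is the SparkContext.
val rdd = sc.parallelize(1 to 10000, 100)   // start with 100 partitions

// Without shuffle, coalesce cannot grow the partition count:
val same = rdd.coalesce(1000)               // shuffle defaults to false
println(same.partitions.length)             // stays at 100

// With shuffle = true, data is redistributed via a HashPartitioner:
val grown = rdd.coalesce(1000, shuffle = true)
println(grown.partitions.length)            // now 1000
```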
def repartition(numPartitions: Int)(implicit ord: Ordering[(K, V)] = null): RDD[(K, V)]
Return a new RDD that has exactly numPartitions partitions.
Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data.
If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.
TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.
Translation:
Returns a new RDD with exactly numPartitions partitions. This can increase or decrease the level of parallelism of the RDD. Internally, it uses a shuffle to redistribute the data. If you are decreasing the number of partitions, consider using coalesce instead, which can avoid performing a shuffle.
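In fact, in Spark's RDD source (RDD.scala), repartition is nothing more than coalesce with the shuffle flag forced on (paraphrased here from the Spark source):

```scala
// Paraphrased from Spark's RDD.scala: repartition simply delegates
// to coalesce with shuffle = true, which is why it always shuffles.
def repartition(numPartitions: Int)(implicit ord: Ordering[(K, V)] = null): RDD[(K, V)] = withScope {
  coalesce(numPartitions, shuffle = true)
}
```

This is why the two behave identically when increasing partitions, and why coalesce is preferred when merely decreasing them: only coalesce lets you skip the shuffle.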
coalesce vs. repartition:
(*) Both repartition an RDD.
(*) Difference: coalesce does not shuffle by default (shuffle = false), while repartition always shuffles.
(*) Example:
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
Check the partition count: rdd1.partitions.length
Repartition:
val rdd2 = rdd1.repartition(3)
val rdd3 = rdd1.coalesce(3, false)  ---> partition count: 2
val rdd4 = rdd1.coalesce(3, true)   ---> partition count: 3