1. 程式人生 > >spark中makerdd和parallelize的區別

spark中makerdd和parallelize的區別

我們知道,在Spark中建立RDD的建立方式大概可以分為三種:(1)、從集合中建立RDD;(2)、從外部儲存建立RDD;(3)、從其他RDD建立。

  而從集合中建立RDD,Spark主要提供了兩中函式:parallelize和makeRDD。我們可以先看看這兩個函式的宣告:

Spark提供了兩種建立RDD的方式:讀取外部資料集,以及在驅動器程式中對一個集合進行並行化。

在驅動器程式中對一個集合進行並行化的方式有兩種:parallelize()和makeRDD()。

1、parallelize()

def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

2、makeRDD(),有兩種重構方法,如下:

2.1、方法一:

/** Distribute a local Scala collection to form an RDD.
   *
   * This method is identical to `parallelize`.
   */
  def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }

 可以發現,該重構方法的實現就是呼叫parallelize()方法。

2.2、方法二:

/**
   * Distribute a local Scala collection to form an RDD, with one or more
   * location preferences (hostnames of Spark nodes) for each object.
   * Create a new partition for each collection item.
   */
  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
    assertNotStopped()
    val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
  }

註釋的意思為:分配一個本地Scala集合形成一個RDD,為每個集合物件建立一個最佳分割槽。

給出如下例子,可以更清晰的看到它們之間的區別:

首先定義集合物件:

val seq = List(("American Person", List("Tom", "Jim")), ("China Person", List("LiLei", "HanMeiMei")), ("Color Type", List("Red", "Blue")))

使用parallelize()建立RDD:

val rdd1 = sc.parallelize(seq)

查詢rdd1的分割槽數:

rdd1.partitions.size  // 2

使用makeRDD()建立RDD

val rdd2 = sc.makeRDD(seq)

檢視rdd2的分割槽數

rdd2.partitions.size  // 3

總之:

第一種makerdd與parallerize兩者完全一致,傳遞的都是集合的形式;其實第一種makerdd實現是依賴了parallelize函式

第二種makerdd還提供了計算位置。