1. 程式人生 > 實用技巧 >Spark RDD

Spark RDD

Spark RDD的簡介

什麼是RDD?

RDD是整個Spark的基石,是一個彈性分散式的資料集,為使用者遮蔽了底層複雜的計算和對映操作。

RDD的特點:

  • RDD 是不可變的,如果對一個RDD進行轉換操作會生成一個新的RDD。
  • RDD 是分割槽的,RDD 裡面的具體資料是分佈在多臺機器上的 Excutor裡面的。
  • RDD 是彈性的。

RDD 的彈性特徵:

  • 彈性:Spark會根據使用者的配置或者當前Spark的應用執行情況去自動將RDD的資料快取到記憶體或者磁碟。它是一個對使用者不可見的封裝的功能。
  • 容錯:當你的RDD資料被刪除或者丟失的時候,可以通過血統或者檢查點機制恢復資料,這個對使用者來說也是透明的。
  • 計算:RDD的計算是分層的,有 應用->job->Stage->TaskSet->Task ,每一層都有相對應的計算的保障與重複機制。保障你的計算不會由於一些突然的因素而發生終止。
  • 分片:你可以根據業務需求或者一些運算元來重新調整RDD的資料分佈情況。

如何建立RDD

從外部檔案建立

從外部檔案建立RDD可以使用textFile()方法。

** textFile()方法原始碼:**

  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   * @param path path to the text file on a supported file system
   * @param minPartitions suggested minimum number of partitions for the resulting RDD
   * @return RDD of lines of the text file
   */
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

textFile(path: String,minPartitions: Int = defaultMinPartitions)有兩個引數:

  • 第一個引數是檔案的路徑

  • 第二個引數是構建RDD後的分割槽數量,可以跟你傳入的引數將資料劃分成多個分割槽。

如果你不指定分割槽的數量,首先會獲取你的Task數量即defaultParallelism,然後再和2比較。如果你的Task數量大於或等於2,那麼預設會將你的資料集分成兩個區。如果你的Task數量是1,那麼預設會將你的資料分成一個區。

defaultMinPartitions方法,比較你的Task數量和2的大小:

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

defaultParallelism方法,獲取你的Task數量:

  /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
  def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
  }

示例:

從本地外部檔案建立RDD:

scala> sc.textFile("file:///usr/hdp/3.1.0.0-78/spark2/README.md")
res0: org.apache.spark.rdd.RDD[String] = file:///usr/hdp/3.1.0.0-78/spark2/README.md MapPartitionsRDD[1] at textFile at <console>:25

從hdfs檔案系統中讀取檔案建立RDD:

scala> sc.textFile("hdfs:///README.md")
res2: org.apache.spark.rdd.RDD[String] = hdfs:///README.md MapPartitionsRDD[3] at textFile at <console>:25

讀取本地檔案時,需要在檔案路徑前面加上file://。從hdfs上讀取檔案的時候,需要在檔案路徑上加上hdfs://。如果你不在檔案路徑前面加上檔案系統協議,會預設到hdfs檔案系統中讀取檔案。

從scala變數轉換

從scala變數轉換成RDD有兩個方法,一個是parallelize(),另一個是makeRDD()

parallelize()

parallelize() 原始碼:

def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism)同樣有兩個引數:

  • 第一個引數是scala的變數。這個變數是可以是Seq下面的所有子類。

  • 第二個引數是指定建立後RDD的分割槽數量,如果不指定也是預設在Task數量和2中選一個最小的值作為分割槽數量。

不指定分割槽示例:

//從Seq建立RDD
scala> sc.parallelize(Seq(1 to 5))
res3: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[4] at parallelize at <console>:25

scala> res3.collect
res4: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1, 2, 3, 4, 5))

//從Array建立RDD
scala> sc.parallelize(Array(1 to 5))
res5: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[5] at parallelize at <console>:25

scala> res5.collect
res6: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1, 2, 3, 4, 5))

//從List建立RDD
scala> sc.parallelize(List(1 to 5))
res7: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at parallelize at <console>:25

scala> res7.collect
res8: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1, 2, 3, 4, 5))

指定分割槽示例:

//我的Task是1,所以預設的分割槽數量是1        
scala> sc.parallelize(Seq(1 to 5))
res9: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[7] at parallelize at <console>:25
                                                                                                       
scala> res9.partitions.size
res10: Int = 1

//指定資料存放到10個分割槽
scala> sc.parallelize(Seq(1 to 5),10)
res11: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[8] at parallelize at <console>:25

scala> res11.partitions.size
res12: Int = 10

makeRDD()

makeRDD()有兩個過載的方法:

** 第一個過載方法原始碼:**

  /** Distribute a local Scala collection to form an RDD.
   *
   * This method is identical to `parallelize`.
   * @param seq Scala collection to distribute
   * @param numSlices number of partitions to divide the collection into
   * @return RDD representing distributed collection
   */
  def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }

第一個過載方法 makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism)

實際和parallelize()方法引數一樣,在方法體裡面實際上也是呼叫parallelize(),這裡就不展開講了。

** 第二個過載方法原始碼: **

  /**
   * Distribute a local Scala collection to form an RDD, with one or more
   * location preferences (hostnames of Spark nodes) for each object.
   * Create a new partition for each collection item.
   * @param seq list of tuples of data and location preferences (hostnames of Spark nodes)
   * @return RDD representing data partitioned according to location preferences
   */
  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
    assertNotStopped()
    val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
  }

第二個過載方法makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])])

可以指定RDD的資料存放在哪個分割槽上,傳入的引數是一個Seq。

Seq裡面的每個一元素都是一個元組,元組的第一個元素就是分割槽號,第二個元素是每個分割槽裡面的資料。

示例:

//建立3個分割槽的資料
scala> val mrdd=sc.makeRDD(List((1,List("a","b","c")),(2,List("b","c","d")),(3,List("c","d","e"))))
mrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24

//檢視分割槽的數量
scala> mrdd.partitions.size
res1: Int = 3

//獲取第一個分割槽的資料
scala> mrdd.preferredLocations(mrdd.partitions(0))
res2: Seq[String] = List(a, b, c)

從其他RDD轉換

RDD的轉換操作到時候會單獨寫一篇,敬請期待~