Spark RDD
Spark RDD的簡介
什麼是RDD?
RDD是整個Spark的基石,是一個彈性分散式的資料集,為使用者遮蔽了底層複雜的計算和對映操作。
RDD的特點:
- RDD 是不可變的,如果對一個RDD進行轉換操作會生成一個新的RDD。
- RDD 是分割槽的,RDD 裡面的具體資料是分佈在多臺機器上的 Excutor裡面的。
- RDD 是彈性的。
RDD 的彈性特徵:
- 彈性:Spark會根據使用者的配置或者當前Spark的應用執行情況去自動將RDD的資料快取到記憶體或者磁碟。它是一個對使用者不可見的封裝的功能。
- 容錯:當你的RDD資料被刪除或者丟失的時候,可以通過血統或者檢查點機制恢復資料,這個對使用者來說也是透明的。
- 計算:RDD的計算是分層的,有 應用->job->Stage->TaskSet->Task ,每一層都有相對應的計算的保障與重複機制。保障你的計算不會由於一些突然的因素而發生終止。
- 分片:你可以根據業務需求或者一些運算元來重新調整RDD的資料分佈情況。
如何建立RDD
從外部檔案建立
從外部檔案建立RDD可以使用textFile()
方法。
** textFile()方法原始碼:**
/** * Read a text file from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI, and return it as an RDD of Strings. * @param path path to the text file on a supported file system * @param minPartitions suggested minimum number of partitions for the resulting RDD * @return RDD of lines of the text file */ def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }
textFile(path: String,minPartitions: Int = defaultMinPartitions)有兩個引數:
-
第一個引數是檔案的路徑
-
第二個引數是構建RDD後的分割槽數量,可以跟你傳入的引數將資料劃分成多個分割槽。
如果你不指定分割槽的數量,首先會獲取你的Task數量即defaultParallelism,然後再和2比較。如果你的Task數量大於或等於2,那麼預設會將你的資料集分成兩個區。如果你的Task數量是1,那麼預設會將你的資料分成一個區。
defaultMinPartitions方法,比較你的Task數量和2的大小:
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
defaultParallelism方法,獲取你的Task數量:
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism
}
示例:
從本地外部檔案建立RDD:
scala> sc.textFile("file:///usr/hdp/3.1.0.0-78/spark2/README.md")
res0: org.apache.spark.rdd.RDD[String] = file:///usr/hdp/3.1.0.0-78/spark2/README.md MapPartitionsRDD[1] at textFile at <console>:25
從hdfs檔案系統中讀取檔案建立RDD:
scala> sc.textFile("hdfs:///README.md")
res2: org.apache.spark.rdd.RDD[String] = hdfs:///README.md MapPartitionsRDD[3] at textFile at <console>:25
讀取本地檔案時,需要在檔案路徑前面加上file://
。從hdfs上讀取檔案的時候,需要在檔案路徑上加上hdfs://
。如果你不在檔案路徑前面加上檔案系統協議,會預設到hdfs檔案系統中讀取檔案。
從scala變數轉換
從scala變數轉換成RDD有兩個方法,一個是parallelize()
,另一個是makeRDD()
parallelize()
parallelize() 原始碼:
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism)同樣有兩個引數:
-
第一個引數是scala的變數。這個變數是可以是Seq下面的所有子類。
-
第二個引數是指定建立後RDD的分割槽數量,如果不指定也是預設在Task數量和2中選一個最小的值作為分割槽數量。
不指定分割槽示例:
//從Seq建立RDD
scala> sc.parallelize(Seq(1 to 5))
res3: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[4] at parallelize at <console>:25
scala> res3.collect
res4: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1, 2, 3, 4, 5))
//從Array建立RDD
scala> sc.parallelize(Array(1 to 5))
res5: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[5] at parallelize at <console>:25
scala> res5.collect
res6: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1, 2, 3, 4, 5))
//從List建立RDD
scala> sc.parallelize(List(1 to 5))
res7: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at parallelize at <console>:25
scala> res7.collect
res8: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1, 2, 3, 4, 5))
指定分割槽示例:
//我的Task是1,所以預設的分割槽數量是1
scala> sc.parallelize(Seq(1 to 5))
res9: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[7] at parallelize at <console>:25
scala> res9.partitions.size
res10: Int = 1
//指定資料存放到10個分割槽
scala> sc.parallelize(Seq(1 to 5),10)
res11: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[8] at parallelize at <console>:25
scala> res11.partitions.size
res12: Int = 10
makeRDD()
makeRDD()有兩個過載的方法:
** 第一個過載方法原始碼:**
/** Distribute a local Scala collection to form an RDD.
*
* This method is identical to `parallelize`.
* @param seq Scala collection to distribute
* @param numSlices number of partitions to divide the collection into
* @return RDD representing distributed collection
*/
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
第一個過載方法 makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism)
實際和parallelize()方法引數一樣,在方法體裡面實際上也是呼叫parallelize(),這裡就不展開講了。
** 第二個過載方法原始碼: **
/**
* Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item.
* @param seq list of tuples of data and location preferences (hostnames of Spark nodes)
* @return RDD representing data partitioned according to location preferences
*/
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
assertNotStopped()
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
}
第二個過載方法makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])])
可以指定RDD的資料存放在哪個分割槽上,傳入的引數是一個Seq。
Seq裡面的每個一元素都是一個元組,元組的第一個元素就是分割槽號,第二個元素是每個分割槽裡面的資料。
示例:
//建立3個分割槽的資料
scala> val mrdd=sc.makeRDD(List((1,List("a","b","c")),(2,List("b","c","d")),(3,List("c","d","e"))))
mrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
//檢視分割槽的數量
scala> mrdd.partitions.size
res1: Int = 3
//獲取第一個分割槽的資料
scala> mrdd.preferredLocations(mrdd.partitions(0))
res2: Seq[String] = List(a, b, c)
從其他RDD轉換
RDD的轉換操作到時候會單獨寫一篇,敬請期待~