Spark Core源代碼分析: RDD基礎
RDD
RDD初始參數:上下文和一組依賴
abstract class RDD[T: ClassTag]( @transient private var sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable
下面須要細致理清:
A list of Partitions
Function to compute split (sub RDD impl)
A list of Dependencies
Partitioner for K-V RDDs (Optional)
Preferred locations to compute each spliton (Optional)
Dependency
Dependency代表了RDD之間的依賴關系。即血緣
RDD中的使用
RDD給子類提供了getDependencies方法來制定怎樣依賴父類RDD
protected def getDependencies: Seq[Dependency[_]] = deps
其實,在獲取first parent的時候,子類常常會使用以下這種方法
protected[spark] def firstParent[U: ClassTag] = { dependencies.head.rdd.asInstanceOf[RDD[U]] }
能夠看到,Seq裏的第一個dependency應該是直接的parent,從而從第一個dependency類裏獲得了rdd。這個rdd就是父RDD。
一般的RDD子類都會這麽實現compute和getPartition方法,以SchemaRDD舉例:
override def compute(split: Partition, context: TaskContext): Iterator[Row] = firstParent[Row].compute(split, context).map(_.copy()) override def getPartitions: Array[Partition] = firstParent[Row].partitions
compute()方法調用了第一個父類的compute,把結果RDD copy返回
getPartitions返回的就是第一個父類的partitions
以下看一下Dependency類及其子類的實現。
寬依賴和窄依賴
abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
Dependency裏傳入的rdd。就是父RDD本身。
繼承結構例如以下:
NarrowDependency代表窄依賴。即父RDD的分區,最多被子RDD的一個分區使用。所以支持並行計算。
子類須要實現方法:
def getParents(partitionId: Int): Seq[Int]
OneToOneDependency表示父RDD和子RDD的分區依賴是一對一的。
RangeDependency表示在一個range範圍內,依賴關系是一對一的,所以初始化的時候會有一個範圍。範圍外的partitionId,傳進去之後返回的是Nil。
以下介紹寬依賴。
class ShuffleDependency[K, V]( @transient rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializer: Serializer = null) extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) { // 上下文增量定義的Id val shuffleId: Int = rdd.context.newShuffleId() // ContextCleaner的作用和實如今SparkContext章節敘述 rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) }
寬依賴針對的RDD是KV形式的。須要一個partitioner指定分區方式(下一節介紹)。須要一個序列化工具類,序列化工具眼下的實現例如以下:
寬依賴和窄依賴對失敗恢復時候的recompute有不同程度的影響,寬依賴可能是要所有計算的。
Partition
Partition詳細表示RDD每一個數據分區。
Partition提供trait類,內含一個index和hashCode()方法,詳細子類實現與RDD子類有關。種類例如以下:
在分析每一個RDD子類的時候再涉及。
Partitioner
Partitioner決定KV形式的RDD怎樣依據key進行partition
abstract class Partitioner extends Serializable { def numPartitions: Int // 總分區數 def getPartition(key: Any): Int }
在ShuffleDependency裏相應一個Partitioner,來完畢寬依賴下。子RDD怎樣獲取父RDD。
默認Partitioner
Partitioner的伴生對象提供defaultPartitioner方法,邏輯為:
傳入的RDD(至少兩個)中,遍歷(順序是partition數目從大到小)RDD,假設已經有Partitioner了,就使用。
假設RDD們都沒有Partitioner,則使用默認的HashPartitioner。
而HashPartitioner的初始化partition數目,取決於是否設置了spark.default.parallelism,假設沒有的話就取RDD中partition數目最大的值。
假設上面這段文字看起來費解。代碼例如以下:
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse for (r <- bySize if r.partitioner.isDefined) { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { new HashPartitioner(rdd.context.defaultParallelism) } else { new HashPartitioner(bySize.head.partitions.size) } }
HashPartitioner
HashPartitioner基於java的Object.hashCode。會有個問題是Java的Array有自己的hashCode,不基於Array裏的內容,所以RDD[Array[_]]或RDD[(Array[_], _)]使用HashPartitioner會有問題。
顧名思義。getPartition方法實現例如以下
def getPartition(key: Any): Int = key match { case null => 0 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) }
RangePartitioner
RangePartitioner處理的KV RDD要求Key是可排序的,即滿足Scala的Ordered[K]類型。所以它的構造例如以下:
class RangePartitioner[K <% Ordered[K]: ClassTag, V]( partitions: Int, @transient rdd: RDD[_ <: Product2[K,V]], private val ascending: Boolean = true) extends Partitioner {
內部會計算一個rangBounds(上界),在getPartition的時候。假設rangBoundssize小於1000,則逐個遍歷獲得;否則二分查找獲得partitionId。
Persist
默認cache()過程是將RDD persist在內存裏,persist()操作能夠為RDD又一次指定StorageLevel。
class StorageLevel private( private var useDisk_ : Boolean, private var useMemory_ : Boolean, private var useOffHeap_ : Boolean, private var deserialized_ : Boolean, private var replication_ : Int = 1)
object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) // Tachyon
RDD的persist()和unpersist()操作,都是由SparkContext運行的(SparkContext的persistRDD和unpersistRDD方法)。
Persist過程是把該RDD存在上下文的TimeStampedWeakValueHashMap裏維護起來。也就是說,事實上persist並非action。並不會觸發不論什麽計算。
Unpersist步驟例如以下。會交給SparkEnv裏的BlockManager處理。
private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) { env.blockManager.master.removeRdd(rddId, blocking) persistentRdds.remove(rddId) listenerBus.post(SparkListenerUnpersistRDD(rddId)) }
Checkpoint
RDD Actions api裏提供了checkpoint()方法,會把本RDD save到SparkContext CheckpointDir
文件夾下。建議該RDD已經persist在內存中,否則須要recomputation。
假設該RDD沒有被checkpoint過,則會生成新的RDDCheckpointData。
RDDCheckpointData類與一個RDD關聯,記錄了checkpoint相關的信息,而且記錄checkpointRDD的一個狀態,
[ Initialized --> marked for checkpointing--> checkpointing in progress --> checkpointed ]
內部有一個doCheckpoint()方法(會被以下調用)。
運行邏輯
真正的checkpoint觸發,在RDD私有方法doCheckpoint()裏。doCheckpoint()會被DAGScheduler調用。且是在此次job裏使用這個RDD完成之後,此時這個RDD就已經被計算或者物化過了。能夠看到。會對RDD的父RDD進行遞歸。
private[spark] def doCheckpoint() { if (!doCheckpointCalled) { doCheckpointCalled = true if (checkpointData.isDefined) { checkpointData.get.doCheckpoint() } else { dependencies.foreach(_.rdd.doCheckpoint()) } } }
RDDCheckpointData的doCheckpoint()方法關鍵代碼例如以下:
// Create the output path for the checkpoint val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id) val fs = path.getFileSystem(rdd.context.hadoopConfiguration) if (!fs.mkdirs(path)) { throw new SparkException("Failed to create checkpoint path " + path) } // Save to file, and reload it as an RDD val broadcastedConf = rdd.context.broadcast( new SerializableWritable(rdd.context.hadoopConfiguration)) // 這次runJob終於調的是dagScheduler的runJob rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _) // 此時rdd已經記錄到磁盤上 val newRDD = new CheckpointRDD[T](rdd.context, path.toString) if (newRDD.partitions.size != rdd.partitions.size) { throw new SparkException("xxx") }
runJob終於調的是dagScheduler的runJob。
做完後。生成一個CheckpointRDD。
詳細CheckpointRDD相關內容能夠參考其它章節。
API
子類須要實現的方法
// 計算某個分區 def compute(split: Partition, context: TaskContext): Iterator[T] protected def getPartitions: Array[Partition] // 依賴的父RDD,默認就是返回整個dependency序列 protected def getDependencies: Seq[Dependency[_]] = deps protected def getPreferredLocations(split: Partition): Seq[String] = Nil
Transformations
略。
Actions
略。
SubRDDs
部分RDD子類的實現分析,包含下面幾個部分:
1) 子類本身構造參數
2) 子類的特殊私有變量
3) 子類的Partitioner實現
4) 子類的父類函數實現
def compute(split: Partition, context: TaskContext): Iterator[T] protected def getPartitions: Array[Partition] protected def getDependencies: Seq[Dependency[_]] = deps protected def getPreferredLocations(split: Partition): Seq[String] = Nil
CheckpointRDD
class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String) extends RDD[T](sc, Nil)
CheckpointRDDPartition繼承自Partition。沒有什麽添加。
有一個被廣播的hadoop conf變量,在compute方法裏使用(readFromFile的時候用)
val broadcastedConf = sc.broadcast( new SerializableWritable(sc.hadoopConfiguration))
getPartitions: Array[Partition]方法:
依據checkpointPath去查看Path下有多少個partitionFile,File個數為partition數目。
getPartitions方法返回的Array[Partition]內容為New CheckpointRDDPartition(i),i為[0, 1, …, partitionNum]
getPreferredLocations(split:Partition): Seq[String]方法:
文件位置信息,借助hadoop core包。獲得block location,把得到的結果依照host打散(flatMap)並過濾掉localhost,返回。
compute(split: Partition, context:TaskContext): Iterator[T]方法:
調用CheckpointRDD.readFromFile(file, broadcastedConf,context)方法,當中file為hadoopfile path,conf為廣播過的hadoop conf。
Hadoop文件讀寫及序列化
伴生對象提供writeToFile方法和readFromFile方法。主要用於讀寫hadoop文件,而且利用env下的serializer進行序列化和反序列化工作。兩個方法詳細實現例如以下:
def writeToFile[T]( path: String, broadcastedConf: Broadcast[SerializableWritable[Configuration]], blockSize: Int = -1 )(ctx: TaskContext, iterator: Iterator[T]) {
創建hadoop文件的時候會若存在會拋異常。把hadoop的outputStream放入serializer的stream裏。serializeStream.writeAll(iterator)寫入。
writeToFile的調用在RDDCheckpointData類的doCheckpoint方法裏。例如以下:
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
def readFromFile[T]( path: Path, broadcastedConf: Broadcast[SerializableWritable[Configuration]], context: TaskContext ): Iterator[T] = {
打開Hadoop的inutStream,讀取的時候使用env下的serializer得到反序列化之後的流。返回的時候,DeserializationStream這個trait提供了asIterator方法,每次next操作能夠進行一次readObject。
在返回之前,調用了TaskContext提供的addOnCompleteCallback回調。用於關閉hadoop的inputStream。
NewHadoopRDD
class NewHadoopRDD[K, V]( sc : SparkContext, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], @transient conf: Configuration) extends RDD[(K, V)](sc, Nil) with SparkHadoopMapReduceUtil
private[spark] class NewHadoopPartition( rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable) extends Partition { val serializableHadoopSplit = new SerializableWritable(rawSplit) override def hashCode(): Int = 41 * (41 + rddId) + index }
getPartitions操作:
依據inputFormatClass和conf,通過hadoop InputFormat實現類的getSplits(JobContext)方法得到InputSplits。(ORCFile在此處的優化)
這樣獲得的split同RDD的partition直接相應。
compute操作:
針對本次split(partition),調用InputFormat的createRecordReader(split)方法,
得到RecordReader<K,V>。這個RecordReader包裝在Iterator[(K,V)]類內,復寫Iterator的next()和hasNext方法,讓compute返回的InterruptibleIterator[(K,V)]可以被叠代獲得RecordReader取到的數據。
getPreferredLocations(split: Partition)操作:
theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
在NewHadoopPartition裏SerializableWritable將split序列化,然後調用InputSplit本身的getLocations接口,得到有數據分布節點的nodes name列表。
WholeTextFileRDD
NewHadoopRDD的子類
private[spark] class WholeTextFileRDD( sc : SparkContext, inputFormatClass: Class[_ <: WholeTextFileInputFormat], keyClass: Class[String], valueClass: Class[String], @transient conf: Configuration, minSplits: Int) extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
復寫了getPartitions方法:
NewHadoopRDD有自己的inputFormat實現類和recordReader實現類。在spark/input package下專門寫了這兩個類的實現。感覺是種參考。
InputFormat
WholeTextFileRDD在spark裏實現了自己的inputFormat。
讀取的File以K,V的結構獲取。K為path。V為整個file的content。
復寫createRecordReader以使用WholeTextFileRecordReader
復寫setMaxSplitSize方法,因為用戶能夠傳入minSplits數目,計算平均大小(splits files總大小除以split數目)的時候就變了。
RecordReader
復寫nextKeyValue方法。會讀出指定path下的file的內容,生成new Text()給value,結果是String。假設文件正在被別的進行打開著,會返回false。否則把file內容讀進value裏。
使用場景
在SparkContext下提供wholeTextFile方法,
def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)]
用於讀取一個路徑下的全部text文件,以K。V的形式返回,K為一個文件的path,V為文件內容。比較適合小文件。
全文完 :)
Spark Core源代碼分析: RDD基礎