Spark 延時計算原理
阿新 • • 發佈:2018-11-17
Spark 延時計算原理
Spark運算元主要分為兩類:
- Transformation變換/轉換運算元:這種變換是延遲計算的,也就是說從一個RDD轉換生成另一個RDD的轉換操作不是馬上執行,需要等到有Action操作的時候才會真正觸發運算。
- Action行動運算元:這類運算元會觸發Spark提交作業(Job),並將資料輸出。
Spark是延時計算的,只有Action運算元才會觸發任務的正式執行,那麼Spark是如何實現延時計算的,要理解延時計算原理,需要搞懂以下三個問題:
- 如何暫存計算邏輯?
- 如何進行邏輯分發?
- 如何還原計算邏輯?
一、如何暫存計算邏輯?
以WordCount
程式碼為例:
val file = sc.textFile("...")
val wordCounts = file
.flatMap(line => line.split(","))
.map(word => (word, 1))
.reduceByKey(_ + _)
wordCounts.saveAsTextFile("...")
sc.textFile()
方法並沒有立即進行檔案的讀取,而只是返回了一個RDD的子類HadoopRDD。HadoopRDD在繼承RDD類時,前置依賴是Nil,因為這是讀取輸入的任務,沒有父任務依賴很正常。def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) } def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions) .map(pair => pair._2.toString).setName(path) } class HadoopRDD[K, V](...) extends RDD[(K, V)](sc, Nil) with Logging
flatMap
返回的也是RDD的子類MapPartitionsRDD,並且在建立MapPartitionsRDD時將當前RDD的引用this傳入給了建構函式中的prev變數,可以發現在這裡也沒有進行計算邏輯,而是儲存了一個RDD的血緣依賴鏈。def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) } private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false) extends RDD[U](prev)
map
和flapMap
的邏輯一致,也是返回MapPartitionsRDD。def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) }
reduceByKey
也只是返回RDD的子類MapPartitionsRDD或者ShuffledRDD。另外,也可以注意到ShuffledRDD的前置依賴也傳入的是Nil值。def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) } def combineByKeyWithClassTag[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope { ... if (self.partitioner == Some(partitioner)) { self.mapPartitions(iter => { val context = TaskContext.get() new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context)) }, preservesPartitioning = true) } else { new ShuffledRDD[K, V, C](self, partitioner) .setSerializer(serializer) .setAggregator(aggregator) .setMapSideCombine(mapSideCombine) } } def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning) } class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag]( @transient var prev: RDD[_ <: Product2[K, V]], part: Partitioner) extends RDD[(K, C)](prev.context, Nil)
saveAsTextFile
會呼叫SparkContext.runJob方法,從而觸發實際的任務計算。def saveAsTextFile(path: String): Unit = withScope { ... RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) } def saveAsHadoopFile( path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope { ... saveAsHadoopDataset(hadoopConf) } def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope { ... self.context.runJob(self, writeToFile) writer.commitJob()
- 檢視各Transformation和Action運算元,可以發現所有的Transformation都是返回RDD的子類物件,而只有Action運算元才會觸發runJob方法從而生成實際的任務。所以Spark中的RDD其實是一個RDD的依賴序列,每個RDD都儲存有上游依賴RDD的引用,計算邏輯是以RDD血緣依賴的方式暫存。
二、如何進行邏輯分發?
Spark進行分散式計算的一個策略是移動計算而不是移動資料,如果要移動計算就需要能將計算邏輯序列化與反序列化後進行網路傳輸。前面分析得知,Spark是將計算邏輯以RDD血緣序列的方式暫存,而RDD是以partition為單位將資料分佈在不同的節點,對每一個Transformation運算元來說其引數都是一個函式,因此需要能將RDD的依賴及Transformation運算元對應的函式進行分發。。
Spark正是利用Scala的閉包進行邏輯分發的。所謂閉包,就是能夠捕獲處於函式外部而又在被建立作用域之內變數的函式。在Scala中,在函式內引用外部函式或類的變數或方法,將形成一個閉包。當我們傳遞innerScope函式的時候變數factor也將與innerScope作為一個整體被傳遞,這個整體就是所謂的閉包。事實上Scala編譯器會為每個閉包(函式)生成一個可序列化類。
Action運算元會呼叫SparkContext.runJob方法進行任務提交,我們來看一下任務提交的過程也即邏輯分發的過程是怎麼樣的:
- 首先會呼叫DAGScheduler.runJob和submitJob方法,然後交由DAGScheduler.handleJobSubmitted方法進行處理。該方法會先完成Stage的劃分,然後進行最後一個Stage的提交操作。如果有父Stage未提交,submitStage方法以遞迴的方式先提交父Stage,並將當前Stage放入waitingStages佇列。
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { var finalStage: ResultStage = null try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } ... submitStage(finalStage) } /** Submits stage, but first recursively submits any missing parents. */ private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) }
- 然後呼叫DAGScheduler.submitMissingTasks方法提交stage中的task任務。首先會找到需要計算的分割槽,然後根據stage的型別建立儲存RDD的Broadcast,將該資訊廣播到所有的Executor上。stage分為ShuffleMapStage和ResultStage,一個job中有且僅有一個ResultTask用於資料輸出,所以它只需要序列化RDD分割槽資料資訊和相應的輸出函式。另一方面,我們知道stage是按資料shuffle進行劃分的,所有對於非輸出的stage,實際就讀取上一個stage的shuffle輸入,所以需要序列化RDD分割槽資訊以及各個分割槽的依賴上游RDD的分割槽資訊。接著,依據需要計算的RDD每一個分割槽partitionsToCompute建立ShuffleMapTask或ResultTask任務陣列,並利用TaskSchedulerImpl.submitTasks進行任務提交。
/** Called when stage's parents are available and we can now do its task. */ private def submitMissingTasks(stage: Stage, jobId: Int) { // First figure out the indexes of partition ids to compute. val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() ... var taskBinary: Broadcast[Array[Byte]] = null try { // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // For ResultTask, serialize and broadcast (rdd, func). val taskBinaryBytes: Array[Byte] = stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) } taskBinary = sc.broadcast(taskBinaryBytes) } ... val tasks: Seq[Task[_]] = try { val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { case stage: ShuffleMapStage => stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } } } ... if (tasks.size > 0) { taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) } else { // Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run markStageAsFinished(stage, None) submitWaitingChildStages(stage) } }
- 最後將封裝好的Task任務序列傳送到相應的Executor去執行。TaskSchedulerImpl.submitTasks首先建立TaskSetManager物件來封裝TaskSet,然後將TaskSetManager物件放入排程池(Pool),將任務傳送給相應的Executor進行排程執行。
override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { this.cancel() } } }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true } backend.reviveOffers() }
三、如何還原計算邏輯?
當Task任務分發到Executor後,各個Executor是如何進行資料計算的,即如何還原計算邏輯的:
- 首先Executor收到要執行的任務後,呼叫Executor.launchTask將任務放入執行緒池等待執行。
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { val tr = new TaskRunner(context, taskDescription) runningTasks.put(taskDescription.taskId, tr) threadPool.execute(tr) }
- 接著TaskRunner將任務反序列化後,恢復task相關的引數設定,並呼叫task.run方法執行。task為ShuffleMapTask或ResultTask,當task執行完成後,將序列化結果值value,最後呼叫CoarseGrainedExecutorBackend.statusUpdate將結果告知Driver。
override def run(): Unit = { ... val ser = env.closureSerializer.newInstance() try { // Must be set before updateDependencies() is called, in case fetching dependencies // requires access to properties contained within (e.g. for access control). Executor.taskDeserializationProps.set(taskDescription.properties) updateDependencies(taskDescription.addedFiles, taskDescription.addedJars) task = ser.deserialize[Task[Any]]( taskDescription.serializedTask, Thread.currentThread.getContextClassLoader) task.localProperties = taskDescription.properties task.setTaskMemoryManager(taskMemoryManager) env.mapOutputTracker.updateEpoch(task.epoch) val value = try { val res = task.run( taskAttemptId = taskId, attemptNumber = taskDescription.attemptNumber, metricsSystem = env.metricsSystem) threwException = false res } ... val resultSer = env.serializer.newInstance() val valueBytes = resultSer.serialize(value) // TODO: do not serialize value twice val directResult = new DirectTaskResult(valueBytes, accumUpdates) val serializedDirectResult = ser.serialize(directResult) val resultSize = serializedDirectResult.limit // directSend = sending directly back to the driver val serializedResult: ByteBuffer = { ... serializedDirectResult } execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) } finally { runningTasks.remove(taskId) } }
- 然後Task.run會呼叫ShuffleMapTask或ResultTask的runTask方法,從而呼叫該階段末RDD的iterator方法,獲取該RDD某個分割槽內的資料記錄。對於ShuffleMapTask其是將該RDD某個分割槽的資料利用ShuffleWriter輸出,而對於ResultTask則是利用相應的輸出函式進行輸出。其中iterator方法呼叫呼叫RDD類的compute方法獲取相應分割槽的資料是Spark延時計算的關鍵,即如何根據RDD的血緣依賴得到當前分割槽的資料。
final def run( taskAttemptId: Long, attemptNumber: Int, metricsSystem: MetricsSystem): T = { ... runTask(context) ... } // ShuffleMapTask.scala override def runTask(context: TaskContext): MapStatus = { ... var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } ... } // ResultTask.scala override def runTask(context: TaskContext): U = { ... func(context, rdd.iterator(partition, context)) } final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { getOrCompute(split, context) } else { computeOrReadCheckpoint(split, context) } } private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = { if (isCheckpointedAndMaterialized) { firstParent[T].iterator(split, context) } else { compute(split, context) } } def compute(split: Partition, context: TaskContext): Iterator[T]
- 最後是RDD分割槽資料的計算獲取。RDD抽象類要求其所有子類都必須實現compute方法,該方法的引數之一是一個Partition物件,目的是計算該分割槽中的資料。換句話說,compute函式負責的是父RDD分割槽資料到子RDD分割槽資料的變換邏輯。可以發現,HadoopRDD中實現的邏輯是讀取檔案記錄,MapPartitionsRDD中是將父RDD.iterator返回的分割槽資料作用於Transformation運算元定義的處理函式f上,ShuffledRDD則是利用shuffleHandle讀取上一stage的shuffle輸出資料。
// HadoopRDD.scala override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new NextIterator[(K, V)] { ... private var reader: RecordReader[K, V] = null reader = try { inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) } ... new InterruptibleIterator[(K, V)](context, iter) } // MapPartitionsRDD.scala override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context)) // ShuffledRDD.scala override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) .read() .asInstanceOf[Iterator[(K, C)]] }
四 小結
Spark利用RDD血緣依賴序列的方式來暫存計算邏輯,通過閉包及序列化的方式實現邏輯的分發,通過RDD子類對compute方法的不同實現來讀取依賴資料並進行處理,從而實現Spark的延時分散式計算。