阿新 • Published: 2018-11-17
How Spark's Lazy Evaluation Works
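- Transformation operators: transformations are lazy. An operation that derives one RDD from another is not executed when it is called; the computation only actually runs once an Action is invoked.
- Action operators: these make Spark submit a job and output the data.
- How is the computation logic stored until then?
- How is that logic distributed?
- How is the computation logic restored and executed?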
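The deferred behaviour of Transformations can be illustrated without Spark at all. The following toy sketch uses plain Scala iterators, which are also lazy. `map` and `filter` only describe work, and nothing runs until `toList` forces the iterator, playing the role of an Action. `LazyDemo` and its members are hypothetical names used only for this illustration.

```scala
// Toy illustration (not Spark code): Scala's Iterator is lazy, so chained
// map/filter calls only describe work, much like RDD transformations.
object LazyDemo {
  // Counts how many times the mapping function has actually run.
  var evaluations = 0

  // "Transformations": building this pipeline runs nothing yet.
  def pipeline(): Iterator[Int] =
    Iterator(1, 2, 3)
      .map { x => evaluations += 1; x * 10 }
      .filter(_ > 10)

  // "Action": forcing the iterator is what finally runs the mapping function.
  def collect(it: Iterator[Int]): List[Int] = it.toList
}
```

Right after `pipeline()` returns, `evaluations` is still 0. It only reaches 3 once `collect` drains the iterator.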
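Take the classic word count as an example:

```scala
val file = sc.textFile("...")
// Each call below only builds a new RDD; nothing is read or computed yet.
val wordCounts = file
  .flatMap(line => line.split(","))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
```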
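For reference, here is what the pipeline computes, sketched on a local Scala collection rather than an RDD. It is an eager, single-machine analogue that assumes each line is a comma-separated list of words. `LocalWordCount` is a hypothetical helper, and its `groupBy`/`sum` step stands in for `reduceByKey(_ + _)`.

```scala
// Eager, local analogue of the word-count pipeline (plain Scala, no Spark).
object LocalWordCount {
  def wordCounts(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(line => line.split(","))   // like RDD.flatMap
      .map(word => (word, 1))             // like RDD.map
      .groupBy(_._1)                      // group pairs by word...
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // ...and sum, like reduceByKey(_ + _)
}
```

Unlike the RDD version, every step here runs immediately on one machine.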
The textFile method does not read the file right away. It only constructs a HadoopRDD (an RDD subclass) and maps it to a MapPartitionsRDD that keeps the text of each line. When HadoopRDD extends RDD, it passes Nil as its dependencies. This is expected, because HadoopRDD is the input-reading step and has no parent RDD.

```scala
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  ...
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}

class HadoopRDD[K, V](...) extends RDD[(K, V)](sc, Nil) with Logging
```
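flatMap likewise returns an RDD subclass, MapPartitionsRDD. When it creates the MapPartitionsRDD, it passes the current RDD (this) to the constructor's prev parameter. Again, no computation happens here. The only thing stored is one more link in the RDD lineage chain. Passing prev to RDD[U](prev) goes through RDD's single-parent auxiliary constructor, which records a OneToOneDependency on prev.

```scala
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev)
```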
map follows the same logic as flatMap and also returns a MapPartitionsRDD.

```scala
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
```
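Both `flatMap` and `map` reduce to the same stored shape: a function that turns a parent partition's iterator into a child iterator. The sketch below illustrates that idea using a simplified signature without `TaskContext`. `PartitionFunctions` and `PartitionFn` are hypothetical names, not Spark APIs. Composing two such functions builds a longer pipeline without running either one.

```scala
// Toy model (not Spark's API): a narrow transformation stored as a function
// from (partition index, parent iterator) to child iterator, i.e. the shape
// of MapPartitionsRDD's `f` with TaskContext left out.
object PartitionFunctions {
  type PartitionFn[T, U] = (Int, Iterator[T]) => Iterator[U]

  def mapFn[T, U](f: T => U): PartitionFn[T, U] =
    (_, iter) => iter.map(f)

  def flatMapFn[T, U](f: T => Iterator[U]): PartitionFn[T, U] =
    (_, iter) => iter.flatMap(f)

  // Chains two stored functions; nothing is evaluated until the result is consumed.
  def andThen[A, B, C](first: PartitionFn[A, B], second: PartitionFn[B, C]): PartitionFn[A, C] =
    (pid, iter) => second(pid, first(pid, iter))
}
```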
reduceByKey, too, only returns an RDD subclass. It returns a MapPartitionsRDD when the parent is already partitioned by the target partitioner, and a ShuffledRDD otherwise. Note that ShuffledRDD also passes Nil to the RDD constructor. Unlike HadoopRDD, however, it does have a parent: it overrides getDependencies to return a ShuffleDependency on prev, and this dependency is where a stage boundary later comes from.

```scala
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  ...
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}

class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil)
```
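The contract that `reduceByKey` hands to `combineByKeyWithClassTag` can be sketched locally. In this toy version (plain Scala, hypothetical `LocalCombineByKey` object), each input `Seq` is assumed to stand for one partition:

- `createCombiner` and `mergeValue` build one map per partition, which corresponds to the map-side combine.
- `mergeCombiners` then merges those maps, which is the work Spark performs after the shuffle.

```scala
// Local sketch (not Spark code) of the combineByKey contract.
object LocalCombineByKey {
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Map side: build one combiner map per input partition.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))
          case None    => acc.updated(k, createCombiner(v))
        }
      }
    }
    // Reduce side: merge the combiners of the same key across partitions.
    perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (k, c)) =>
        a.updated(k, a.get(k).map(mergeCombiners(_, c)).getOrElse(c))
      }
    }
  }

  // reduceByKey is the special case where the combiner is the value itself.
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]], func: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](partitions, v => v, func, func)
}
```

Passing `(v: V) => v` as `createCombiner` and `func` for both merge steps mirrors the call in the `reduceByKey` source above.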
An Action such as saveAsTextFile, by contrast, ends up calling SparkContext.runJob, which triggers the actual computation.

```scala
def saveAsTextFile(path: String): Unit = withScope {
  ...
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

// The single-argument saveAsHadoopFile used above delegates to this overload.
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  ...
  saveAsHadoopDataset(hadoopConf)
}

def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  ...
  self.context.runJob(self, writeToFile)
  writer.commitJob()
}
```
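The lineage idea can be reduced to a few lines. The sketch below is a hypothetical mini-RDD, not Spark's class hierarchy. It has three parts:

- A source node with no parents, like HadoopRDD's `Nil`.
- A narrow node that stores `prev` plus a per-partition function, like MapPartitionsRDD.
- A `collect` action that walks every partition, like an Action calling `runJob`.

The `reads` counter shows that building the chain reads nothing.

```scala
// Hypothetical mini-RDD (illustrative names, not Spark's classes).
object MiniLineage {
  abstract class MiniRDD[T] {
    def numPartitions: Int
    def parents: List[MiniRDD[_]]
    def compute(split: Int): Iterator[T]

    // Transformations: return a new node that points back at `this`.
    def map[U](f: T => U): MiniRDD[U] =
      new MapPartitionsNode[U, T](this, (_, it) => it.map(f))
    def flatMap[U](f: T => Iterator[U]): MiniRDD[U] =
      new MapPartitionsNode[U, T](this, (_, it) => it.flatMap(f))

    // Action: evaluate every partition, like runJob followed by collecting results.
    def collect(): List[T] =
      (0 until numPartitions).toList.flatMap(p => compute(p).toList)
  }

  // Source node: no parents (like HadoopRDD's Nil); reads its own data.
  final class SourceNode[T](data: Vector[Vector[T]]) extends MiniRDD[T] {
    var reads = 0
    def numPartitions: Int = data.length
    def parents: List[MiniRDD[_]] = Nil
    def compute(split: Int): Iterator[T] = { reads += 1; data(split).iterator }
  }

  // Narrow node: stores `prev` and `f`, pulls the parent's partition on demand.
  final class MapPartitionsNode[U, T](val prev: MiniRDD[T], f: (Int, Iterator[T]) => Iterator[U])
      extends MiniRDD[U] {
    def numPartitions: Int = prev.numPartitions
    def parents: List[MiniRDD[_]] = List(prev)
    def compute(split: Int): Iterator[U] = f(split, prev.compute(split))
  }
}
```

Building `src.flatMap(...).map(...)` leaves `src.reads` at 0. Only `collect()` makes the source read its partitions.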
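- Looking through the Transformation and Action operators, every Transformation returns an object of some RDD subclass, and only Actions call runJob and thereby create real tasks. An RDD in Spark is therefore really a chain of dependencies. Each RDD holds a reference to its upstream RDD, and the computation logic is stored in the form of this lineage.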
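- SparkContext.runJob first calls DAGScheduler.runJob, which calls submitJob. submitJob posts a JobSubmitted event, and the event loop hands it to DAGScheduler.handleJobSubmitted. That method first splits the job into stages, creating the final ResultStage from the last RDD, and then submits that final stage. If the stage has parent stages that have not been computed yet, submitStage recursively submits those parents first and puts the current stage into the waitingStages set.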
```scala
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  ...
  submitStage(finalStage)
}

/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
```
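The recursion in `submitStage` can be modelled with a toy scheduler. In this sketch, `Stage`, `Scheduler` and `finishedStages` are hypothetical. A stage's parents count as missing unless their ids are in `finishedStages`, which stands in for `getMissingParentStages`. Recording a stage id in `submittedOrder` stands in for `submitMissingTasks`.

```scala
// Toy scheduler (hypothetical names, not Spark's DAGScheduler) mirroring submitStage.
object ToyStageSubmission {
  final case class Stage(id: Int, parents: List[Stage])

  final class Scheduler(finishedStages: Set[Int]) {
    val waitingStages = scala.collection.mutable.LinkedHashSet[Stage]()
    val runningStages = scala.collection.mutable.LinkedHashSet[Stage]()
    val submittedOrder = scala.collection.mutable.ArrayBuffer[Int]()

    // Parents whose output is not available yet (cf. getMissingParentStages).
    private def missingParents(stage: Stage): List[Stage] =
      stage.parents.filterNot(p => finishedStages(p.id)).sortBy(_.id)

    def submitStage(stage: Stage): Unit =
      if (!waitingStages(stage) && !runningStages(stage)) {
        val missing = missingParents(stage)
        if (missing.isEmpty) {
          // Stands in for submitMissingTasks(stage, jobId).
          runningStages += stage
          submittedOrder += stage.id
        } else {
          // Submit parents first, then park this stage until they finish.
          missing.foreach(submitStage)
          waitingStages += stage
        }
      }
  }
}
```

Suppose stage 2 depends on stages 0 and 1, and neither is finished. Then stages 0 and 1 are submitted first, and stage 2 waits in `waitingStages`.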
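- Next, DAGScheduler.submitMissingTasks submits the tasks of the stage. It first finds the partitions that still need to be computed. Then, depending on the stage type, it serializes the task binary and wraps it in a Broadcast so that every Executor can obtain it. There are two kinds of stage: ShuffleMapStage and ResultStage.
  - Each job has exactly one ResultStage, the final stage that outputs the data. For it, Spark serializes the stage's RDD together with the output function func.
  - Stages are split at shuffle boundaries, so every other stage is a ShuffleMapStage. Its output is shuffle data that the next stage reads. For it, Spark serializes the RDD together with its ShuffleDependency, which describes how that output is partitioned and written.

  Finally, for each partition in partitionsToCompute, it creates a ShuffleMapTask or a ResultTask. The resulting tasks are wrapped in a TaskSet and submitted through TaskSchedulerImpl.submitTasks.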
```scala
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  // First figure out the indexes of partition ids to compute.
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  ...
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        JavaUtils.bufferToArray(
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
      case stage: ResultStage =>
        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
    }

    taskBinary = sc.broadcast(taskBinaryBytes)
  } ...

  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId)
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
        }
    }
  } ...

  if (tasks.size > 0) {
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)
    submitWaitingChildStages(stage)
  }
}
```
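The per-partition task creation can be sketched with simplified, hypothetical types. This toy `createTaskSet` describes a stage only by four things: its id, its type, its partition count, and the set of partitions whose output already exists. Real Spark tasks also carry locality preferences, properties and metrics.

```scala
// Toy model (hypothetical types, not Spark's) of building one task per missing partition.
object ToyTaskCreation {
  sealed trait ToyTask { def stageId: Int; def partition: Int }
  final case class ShuffleMapTask(stageId: Int, partition: Int, binaryId: Long) extends ToyTask
  final case class ResultTask(stageId: Int, partition: Int, binaryId: Long) extends ToyTask
  final case class TaskSet(tasks: Vector[ToyTask], stageId: Int, attempt: Int, jobId: Int)

  def createTaskSet(
      stageId: Int,
      isShuffleMap: Boolean,
      numPartitions: Int,
      finishedPartitions: Set[Int],
      binaryId: Long,
      jobId: Int): Option[TaskSet] = {
    // Partitions whose output is still missing (cf. stage.findMissingPartitions()).
    val partitionsToCompute = (0 until numPartitions).filterNot(finishedPartitions)
    // One task per missing partition; all tasks share the same broadcast binary id.
    val tasks: Vector[ToyTask] = partitionsToCompute.map { p =>
      if (isShuffleMap) ShuffleMapTask(stageId, p, binaryId): ToyTask
      else ResultTask(stageId, p, binaryId)
    }.toVector
    // With no tasks to run, the real scheduler marks the stage finished instead.
    if (tasks.isEmpty) None
    else Some(TaskSet(tasks = tasks, stageId = stageId, attempt = 0, jobId = jobId))
  }
}
```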
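- Finally, the packaged tasks are sent to the corresponding Executors for execution. TaskSchedulerImpl.submitTasks first creates a TaskSetManager to wrap the TaskSet and adds it to the scheduling pool (Pool). submitTasks does not launch anything itself. Its closing call to backend.reviveOffers() asks the SchedulerBackend to make resource offers. The tasks the scheduler assigns to those offers are then launched on the matching Executors.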
```scala
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
```
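How an enqueued TaskSet turns into running tasks can be pictured with a toy FIFO pool. This is a deliberately simplified sketch with hypothetical classes; the `TaskSetManager` here is not Spark's class. It ignores data locality, fair scheduling and retries. It simply hands the next pending task to each free core in an offer.

```scala
// Toy FIFO scheduling pool (hypothetical, far simpler than TaskSchedulerImpl).
object ToyTaskScheduler {
  final case class Offer(executorId: String, freeCores: Int)
  final case class Launch(executorId: String, taskId: String)

  // Holds the pending tasks of one TaskSet.
  final class TaskSetManager(val name: String, taskIds: Seq[String]) {
    private val pending = scala.collection.mutable.Queue.from(taskIds)
    def hasPending: Boolean = pending.nonEmpty
    def next(): String = pending.dequeue()
  }

  final class Scheduler {
    // FIFO pool: TaskSets are served in submission order.
    private val pool = scala.collection.mutable.Queue[TaskSetManager]()

    // Like submitTasks: only registers the TaskSet, launches nothing.
    def submitTasks(manager: TaskSetManager): Unit = pool.enqueue(manager)

    // One round of resource offers: each free core gets the next pending task
    // from the first TaskSet in the pool that still has work.
    def resourceOffers(offers: Seq[Offer]): List[Launch] = {
      val launched = scala.collection.mutable.ArrayBuffer[Launch]()
      for (offer <- offers; _ <- 1 to offer.freeCores) {
        pool.find(_.hasPending).foreach { manager =>
          launched += Launch(offer.executorId, manager.next())
        }
      }
      launched.toList
    }
  }
}
```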
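- When an Executor receives a task (in cluster mode, CoarseGrainedExecutorBackend handles the LaunchTask message), Executor.launchTask wraps the task in a TaskRunner and hands it to a thread pool for execution.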
```scala
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
```
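The same launch pattern can be reproduced with `java.util.concurrent`. In this sketch, `ToyExecutor`, its `TaskRunner` and the `() => Unit` task body are hypothetical stand-ins. The body replaces Spark's deserialized task. The runner removes itself from `runningTasks` when it finishes, as the real TaskRunner does in its `finally` block.

```scala
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors, TimeUnit}

// Toy executor (hypothetical) following the launchTask shape: register, then execute.
object ToyExecutor {
  final class Executor(threads: Int) {
    val runningTasks = new ConcurrentHashMap[Long, TaskRunner]()
    private val threadPool: ExecutorService = Executors.newFixedThreadPool(threads)

    final class TaskRunner(val taskId: Long, body: () => Unit) extends Runnable {
      override def run(): Unit =
        try body()
        finally runningTasks.remove(taskId) // deregister even if the body throws
    }

    def launchTask(taskId: Long, body: () => Unit): Unit = {
      val tr = new TaskRunner(taskId, body)
      runningTasks.put(taskId, tr) // registered before it can start running
      threadPool.execute(tr)
    }

    // Stops accepting tasks and waits (up to 10 s) for the running ones to finish.
    def shutdown(): Unit = {
      threadPool.shutdown()
      threadPool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```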
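- The TaskRunner then deserializes the task, restores its settings (such as its local properties), and calls task.run. The task is either a ShuffleMapTask or a ResultTask. When it finishes, the runner serializes the result value. Finally, it calls statusUpdate on the ExecutorBackend (CoarseGrainedExecutorBackend in cluster mode) to report the result to the Driver.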
```scala
override def run(): Unit = {
  ...
  val ser = env.closureSerializer.newInstance()
  try {
    // Must be set before updateDependencies() is called, in case fetching dependencies
    // requires access to properties contained within (e.g. for access control).
    Executor.taskDeserializationProps.set(taskDescription.properties)

    updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
    task = ser.deserialize[Task[Any]](
      taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
    task.localProperties = taskDescription.properties
    task.setTaskMemoryManager(taskMemoryManager)
    env.mapOutputTracker.updateEpoch(task.epoch)

    val value = try {
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = taskDescription.attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    } ...

    val resultSer = env.serializer.newInstance()
    val valueBytes = resultSer.serialize(value)
    // TODO: do not serialize value twice
    val directResult = new DirectTaskResult(valueBytes, accumUpdates)
    val serializedDirectResult = ser.serialize(directResult)
    val resultSize = serializedDirectResult.limit

    // directSend = sending directly back to the driver
    val serializedResult: ByteBuffer = {
      ...
      serializedDirectResult
    }

    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  } finally {
    runningTasks.remove(taskId)
  }
}
```
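The result path boils down to two steps: the executor turns a value into bytes, and the driver turns them back. The sketch below assumes plain Java serialization for both steps. Spark itself uses its configured serializers (`env.serializer` and `env.closureSerializer`). `ToyResultSerialization` is a hypothetical helper.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Toy result round trip using Java serialization (a stand-in for Spark's serializers).
object ToyResultSerialization {
  // Executor side: value -> bytes wrapped in a ByteBuffer.
  def serialize(value: AnyRef): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  // Driver side: bytes -> value. A duplicate is read so the caller's buffer position is untouched.
  def deserialize[T](buffer: ByteBuffer): T = {
    val bytes = new Array[Byte](buffer.remaining())
    buffer.duplicate().get(bytes)
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```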
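- Task.run then calls the runTask method of ShuffleMapTask or ResultTask. runTask calls iterator on the last RDD of the stage to obtain the records of one partition. A ShuffleMapTask writes that partition's data out through a ShuffleWriter, while a ResultTask passes it to the output function. The key to Spark's lazy evaluation is that iterator calls the RDD's compute method to obtain the partition's data. In other words, it derives the current partition's data from the RDD lineage. If the RDD has been persisted, iterator goes through getOrCompute so that cached blocks can be reused. If it has been checkpointed, the data is read from the checkpoint instead.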
```scala
// Task.scala
final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem): T = {
  ...
  runTask(context)
  ...
}

// ShuffleMapTask.scala
override def runTask(context: TaskContext): MapStatus = {
  ...
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } ...
}

// ResultTask.scala
override def runTask(context: TaskContext): U = {
  ...
  func(context, rdd.iterator(partition, context))
}

// RDD.scala
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

def compute(split: Partition, context: TaskContext): Iterator[T]
```
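The branch in `iterator` can be illustrated with a toy node that has an optional cache. In this hypothetical sketch, a `Map` stands in for the BlockManager behind `getOrCompute`, and checkpointing is left out. When `persisted` is set, a partition is computed once and then served from the cache.

```scala
// Toy version (hypothetical) of iterator choosing between a cache and compute.
object ToyIterator {
  final class Node(computeFn: Int => List[Int]) {
    var persisted = false
    var computeCalls = 0
    private val cache = scala.collection.mutable.Map[Int, List[Int]]()

    // Stands in for RDD.compute: derives the partition's data.
    private def compute(split: Int): Iterator[Int] = {
      computeCalls += 1
      computeFn(split).iterator
    }

    // cf. RDD.iterator: the cached path computes a partition at most once.
    def iterator(split: Int): Iterator[Int] =
      if (persisted) cache.getOrElseUpdate(split, compute(split).toList).iterator
      else compute(split)
  }
}
```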
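- The last step is computing the partition data itself. The abstract RDD class requires every subclass to implement compute. Its parameters include a Partition object, and its job is to compute the data in that partition. In other words, compute holds the logic that turns the parent RDD's partition data into the child RDD's partition data. Each subclass implements it differently:
  - HadoopRDD's compute reads file records.
  - MapPartitionsRDD's compute applies the Transformation's function f to the partition data returned by the parent RDD's iterator.
  - ShuffledRDD's compute uses the shuffleHandle to read the previous stage's shuffle output.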
```scala
// HadoopRDD.scala
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
  val iter = new NextIterator[(K, V)] {
    ...
    private var reader: RecordReader[K, V] = null
    ...
    reader =
      try {
        inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
      } ...
    ...
  }
  new InterruptibleIterator[(K, V)](context, iter)
}

// MapPartitionsRDD.scala
override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

// ShuffledRDD.scala
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}
```
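Each `compute` pulls from its parent's `iterator`, so the narrow steps inside one stage are pipelined record by record. The following toy trace uses plain Scala iterators to make that order visible; the `PipelineTrace` object is hypothetical. Each record is read and mapped before the next one is read, and no intermediate collection is built between the two steps.

```scala
// Toy trace (hypothetical) of record-at-a-time pipelining through chained iterators.
object PipelineTrace {
  def run(): (List[String], List[String]) = {
    val trace = scala.collection.mutable.ArrayBuffer[String]()
    // Source partition, like HadoopRDD.compute producing records.
    val source: Iterator[String] = Iterator("x", "y").map { r => trace += s"read $r"; r }
    // Narrow step, like MapPartitionsRDD.compute applying f to the parent iterator.
    val mapped: Iterator[String] = source.map { r => trace += s"map $r"; r.toUpperCase }
    // The "action": draining the outermost iterator drives the whole chain.
    val out = mapped.toList
    (out, trace.toList)
  }
}
```

Running it yields the trace `read x, map x, read y, map y`, not `read x, read y, map x, map y`.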
4 Summary