spark原始碼分析, 任務反序列化及執行
阿新 • • 發佈:2020-08-24
1 ==> 接受訊息,org.apache.spark.executor.CoarseGrainedExecutorBackend#receive
case LaunchTask(data) => if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") } else { val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskDesc) }
2. ==>org.apache.spark.executor.Executor#launchTask
// Maintains the list of running tasks. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit= { val tr = new TaskRunner(context, taskDescription) runningTasks.put(taskDescription.taskId, tr) threadPool.execute(tr) }
3. ==>org.apache.spark.executor.Executor.TaskRunner#run
override def run(): Unit = { threadId = Thread.currentThread.getId Thread.currentThread.setName(threadName) val threadMXBean= ManagementFactory.getThreadMXBean val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId) //下載依賴 updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
//反序列化得到真正的 task task = ser.deserialize[Task[Any]](taskDescription.serializedTask, Thread.currentThread.getContextClassLoader) task.localProperties = taskDescription.properties task.setTaskMemoryManager(taskMemoryManager) val value = Utils.tryWithSafeFinally { val res = task.run( taskAttemptId = taskId, attemptNumber = taskDescription.attemptNumber, metricsSystem = env.metricsSystem) threwException = false res } { val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId) val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory() } //處理執行結果 val resultSer = env.serializer.newInstance() val beforeSerialization = System.currentTimeMillis() val valueBytes = resultSer.serialize(value) val afterSerialization = System.currentTimeMillis() // Note: accumulator updates must be collected after TaskMetrics is updated val accumUpdates = task.collectAccumulatorUpdates() // TODO: do not serialize value twice val directResult = new DirectTaskResult(valueBytes, accumUpdates) val serializedDirectResult = ser.serialize(directResult) val resultSize = serializedDirectResult.limit() // directSend = sending directly back to the driver val serializedResult: ByteBuffer = { if (maxResultSize > 0 && resultSize > maxResultSize) { logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " + s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " + s"dropping it.") ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize)) } else if (resultSize > maxDirectResultSize) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( blockId, new ChunkedByteBuffer(serializedDirectResult.duplicate()), StorageLevel.MEMORY_AND_DISK_SER) logInfo( s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)") ser.serialize(new IndirectTaskResult[Any](blockId, resultSize)) } else { logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver") serializedDirectResult } } setTaskFinishedAndClearInterruptStatus() execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) }
==>org.apache.spark.executor.Executor#updateDependencies
/** * Download any missing dependencies if we receive a new set of files and JARs from the * SparkContext. Also adds any new JARs we fetched to the class loader. */ private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]) { lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) synchronized { // Fetch missing dependencies for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) { logInfo("Fetching " + name + " with timestamp " + timestamp) // Fetch file with useCache mode, close cache for local mode. Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, hadoopConf, timestamp, useCache = !isLocal) currentFiles(name) = timestamp } for ((name, timestamp) <- newJars) { val localName = new URI(name).getPath.split("/").last val currentTimeStamp = currentJars.get(name) .orElse(currentJars.get(localName)) .getOrElse(-1L) if (currentTimeStamp < timestamp) { logInfo("Fetching " + name + " with timestamp " + timestamp) // Fetch file with useCache mode, close cache for local mode. Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, hadoopConf, timestamp, useCache = !isLocal) currentJars(name) = timestamp // Add it to our class loader val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL if (!urlClassLoader.getURLs().contains(url)) { logInfo("Adding " + url + " to class loader") urlClassLoader.addURL(url) } } } } }
==>org.apache.spark.scheduler.Task#run
final def run( taskAttemptId: Long, attemptNumber: Int, metricsSystem: MetricsSystem): T = { SparkEnv.get.blockManager.registerTask(taskAttemptId) val taskContext = new TaskContextImpl( stageId, stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal partitionId, taskAttemptId, attemptNumber, taskMemoryManager, localProperties, metricsSystem, metrics) context = if (isBarrier) { new BarrierTaskContext(taskContext) } else { taskContext } TaskContext.setTaskContext(context) taskThread = Thread.currentThread() if (_reasonIfKilled != null) { kill(interruptThread = false, _reasonIfKilled) } new CallerContext( "TASK", SparkEnv.get.conf.get(APP_CALLER_CONTEXT), appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId), Option(taskAttemptId), Option(attemptNumber)).setCurrentContext() try { //分為ResultTask, ShuffleMapTask 兩種型別 runTask(context) } }