1. 程式人生 > spark job提交6

spark job提交6

driver端呼叫launchTasks來向worker節點中的executor傳送啟動任務命令
spark-master\core\src\main\scala\org\apache\spark\scheduler\cluster\CoarseGrainedSchedulerBackend.scala
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
          // Send the LaunchTask command to the executor endpoint, carrying the
          // serialized task bytes. (Rest of the method body elided in this excerpt.)
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        
    }

./core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
  override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        // Registration with the driver succeeded: create the Executor that will run tasks.
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

    case LaunchTask(data) =>
      // Handle the LaunchTask command sent by the driver.
      // If the executor was never created, exit immediately.
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        // Decode the serialized task description.
        val taskDesc = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        // Start executing the task.
        executor.launchTask(this, taskDesc)
      }
}


  def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    // Wrap the task in a new TaskRunner, record it as running,
    // then submit the runner to the executor's thread pool.
    val runner = new TaskRunner(context, taskDescription)
    runningTasks.put(taskDescription.taskId, runner)
    threadPool.execute(runner)
  }


override def run(): Unit = {
      try {

        // Run the actual task and measure its runtime.
        taskStartTime = System.currentTimeMillis()
        // Capture the thread's starting CPU time when the JVM supports per-thread CPU timing.
        taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        var threwException = true
        val value = try {
          // Invoke Task.run, which dispatches to the concrete runTask implementation.
          val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = taskDescription.attemptNumber,
            metricsSystem = env.metricsSystem)
          threwException = false
        } }

spark-master\core\src\main\scala\org\apache\spark\scheduler\Task.scala

  */
  final def run(
      taskAttemptId: Long,
      attemptNumber: Int,
      metricsSystem: MetricsSystem): T = {
    SparkEnv.get.blockManager.registerTask(taskAttemptId)
    // TODO SPARK-24874 Allow create BarrierTaskContext based on partitions, instead of whether
    // the stage is barrier.
   
    // NOTE(review): `context` is created by code elided from this excerpt.
    TaskContext.setTaskContext(context)
    taskThread = Thread.currentThread()

    // If a kill was already requested before the task started, honor it now.
    if (_reasonIfKilled != null) {
      kill(interruptThread = false, _reasonIfKilled)
    }

    try {
      // Call runTask; each concrete Task subclass provides its own implementation.
      runTask(context)
    } catch
這裡的 task 分為兩類:一類是 ResultTask,另一類是 ShuffleMapTask。
spark-master\core\src\main\scala\org\apache\spark\scheduler\ResultTask.scala
這裡以ResultTask為例
override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the result function from the broadcast task binary,
    // timing the deserialization in both wall-clock and per-thread CPU time.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val cpuTimeSupported = threadMXBean.isCurrentThreadCpuTimeSupported
    val deserializeStart = System.currentTimeMillis()
    val deserializeStartCpu = if (cpuTimeSupported) threadMXBean.getCurrentThreadCpuTime else 0L

    val serializer = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = serializer.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    _executorDeserializeTime = System.currentTimeMillis() - deserializeStart
    _executorDeserializeCpuTime =
      if (cpuTimeSupported) threadMXBean.getCurrentThreadCpuTime - deserializeStartCpu else 0L

    // Compute this partition via rdd.iterator and apply the result function to it.
    func(context, rdd.iterator(partition, context))
  }

再來看看 ShuffleMapTask
spark-master\core\src\main\scala\org\apache\spark\scheduler\ShuffleMapTask.scala
 override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    // Deserialize the RDD and its shuffle dependency from the broadcast task binary.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
   
    var writer: ShuffleWriter[Any, Any] = null
    try {
      // Obtain a ShuffleWriter from the shuffleManager, then compute this
      // partition via rdd.iterator and write the records out through the writer.
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      // stop(success = true) returns the MapStatus for this map output.
      writer.stop(success = true).get
    } catch 
}