Spark: How the Executor Works, with Source Code Analysis
阿新 • Published: 2019-01-11
Executor schematic diagram
Starting the Executor process
The executor that a worker launches for an application is in fact a CoarseGrainedExecutorBackend process.
Source code analysis:
Step 1: The CoarseGrainedExecutorBackend source
Source location: org.apache.spark.executor.CoarseGrainedExecutorBackend.scala
/**
 * The executor that a worker launches for an application is in fact a
 * CoarseGrainedExecutorBackend process.
 */
private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

  private[this] val stopping = new AtomicBoolean(false)
  var executor: Executor = null
  @volatile var driver: Option[RpcEndpointRef] = None

  // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
  // to be changed so that we don't share the serializer instance across threads
  private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()

  /**
   * Initialization hook.
   * Its job is to send RegisterExecutor to the driver.
   */
  override def onStart() {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      // Keep a reference to the driver's RPC endpoint
      driver = Some(ref)
      // Send the RegisterExecutor message to the driver
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        // Always receive `true`. Just ignore it
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

  def extractLogUrls: Map[String, String] = {
    val prefix = "SPARK_LOG_URL_"
    sys.env.filterKeys(_.startsWith(prefix))
      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
  }

  override def receive: PartialFunction[Any, Unit] = {
    /**
     * Once the driver has registered the executor, it replies with RegisteredExecutor.
     * CoarseGrainedExecutorBackend then creates the Executor handle; most of the
     * functionality is implemented by Executor.
     */
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }
    ......
  }
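The registration handshake in the excerpt above can be sketched without any Spark dependency. This is a minimal toy model, not Spark's RPC layer: `ToyDriver`, `ToyBackend` and `ToyExecutor` are hypothetical names, the message classes carry fewer fields than the real ones, and the "driver" is just an in-process object that answers through a `Future`. It only illustrates the order of events: the backend asks to register, the driver notifies it with `RegisteredExecutor`, and only then does the backend create its executor.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object RegistrationSketch {
  // Hypothetical, simplified message types (the real ones carry more fields).
  sealed trait Message
  final case class RegisterExecutor(executorId: String, hostname: String, cores: Int) extends Message
  case object RegisteredExecutor extends Message

  // Stand-in for the Executor handle created after registration.
  final class ToyExecutor(val executorId: String)

  // Toy driver: records the executor, notifies the backend, then answers `true`.
  final class ToyDriver {
    private var registered = Set.empty[String]

    def ask(msg: RegisterExecutor, replyTo: ToyBackend): Future[Boolean] = Future {
      synchronized { registered += msg.executorId }
      replyTo.receive(RegisteredExecutor)
      true
    }

    def isRegistered(id: String): Boolean = synchronized(registered.contains(id))
  }

  // Toy backend: registers on start and creates its executor only when the
  // driver confirms the registration.
  final class ToyBackend(executorId: String, hostname: String, cores: Int) {
    @volatile var executor: Option[ToyExecutor] = None

    def onStart(driver: ToyDriver): Future[Boolean] =
      driver.ask(RegisterExecutor(executorId, hostname, cores), this)

    def receive(msg: Message): Unit = msg match {
      case RegisteredExecutor => executor = Some(new ToyExecutor(executorId))
      case _                  => () // other messages are ignored in this sketch
    }
  }
}
```

Calling `new RegistrationSketch.ToyBackend("exec-1", "localhost", 2).onStart(driver)` and waiting for the returned future leaves `executor` populated, which mirrors why the real backend treats a missing executor as an error when a task arrives.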
Launching a task
Source code analysis
Step 1: Call launchTask() on the executor handle to launch the task
// Launch a task
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    // Deserialize the task description
    val taskDesc = TaskDescription.decode(data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // Call launchTask() on the executor handle to launch the task
    executor.launchTask(this, taskDesc)
  }
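The guard-then-decode pattern in this handler can be tried in isolation. The sketch below is an illustration under stated assumptions: `TaskDesc`, the `"id:name"` payload format and `decode` are invented for the example and are not Spark's `TaskDescription` encoding, and instead of exiting the process the toy backend just records the error message.

```scala
import scala.collection.mutable.ArrayBuffer

object LaunchDispatchSketch {
  // Hypothetical task description and message; the payload format "id:name"
  // is made up for this example.
  final case class TaskDesc(taskId: Long, name: String)
  final case class LaunchTask(data: String)

  // Decode the made-up "id:name" payload into a TaskDesc.
  def decode(data: String): TaskDesc = {
    val sep = data.indexOf(':')
    require(sep > 0, s"malformed task payload: $data")
    TaskDesc(data.substring(0, sep).toLong, data.substring(sep + 1))
  }

  final class ToyBackend {
    // Stands in for `executor != null` in the excerpt.
    var executorReady = false
    val launched = ArrayBuffer.empty[TaskDesc]
    val errors = ArrayBuffer.empty[String]

    // Message handler: refuse to launch before the executor exists,
    // otherwise decode the payload and hand the task over.
    val receive: PartialFunction[Any, Unit] = {
      case LaunchTask(data) =>
        if (!executorReady) {
          errors += "Received LaunchTask command but executor was null"
        } else {
          launched += decode(data)
        }
    }
  }
}
```

Because `receive` is a `PartialFunction`, messages it has no case for are simply not defined at that handler, which is how an endpoint can distinguish known from unknown messages.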
Step 2: The launchTask() method
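def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  // One TaskRunner is created per task; each runner executes on a pool thread
  // TaskRunner implements Java's Runnable interface
  val tr = new TaskRunner(context, taskDescription)
  // Record the TaskRunner in runningTasks, the in-memory map of running tasks
  runningTasks.put(taskDescription.taskId, tr)
  // The task is wrapped in a TaskRunner and handed to the thread pool for execution
  // Whether a runner waits or gets a fresh thread when none is idle depends on the
  // pool type (a fixed-size pool queues; a cached pool starts a new thread)
  threadPool.execute(tr)
}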
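The three steps of launchTask() (wrap the task in a Runnable, record it in a map, submit it to a pool) can be reproduced with plain `java.util.concurrent`. This is a rough sketch, not Spark's Executor: `ToyTaskRunner` and `ToyExecutor` are hypothetical, and the pool here is a fixed-size pool chosen as an assumption so that queueing is easy to observe; the excerpt above does not show how Spark builds its own pool.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object LaunchTaskSketch {
  // A Runnable wrapper around one task; it reports back when it finishes.
  final class ToyTaskRunner(val taskId: Long, body: () => Unit, onFinish: Long => Unit)
      extends Runnable {
    override def run(): Unit =
      try body() finally onFinish(taskId)
  }

  final class ToyExecutor(poolSize: Int) {
    // Tasks that have been submitted and have not finished yet.
    private val runningTasks = new ConcurrentHashMap[Long, ToyTaskRunner]()
    // Assumption: a fixed-size pool, so extra runners wait in the pool's queue.
    private val threadPool = Executors.newFixedThreadPool(poolSize)

    def launchTask(taskId: Long)(body: => Unit): Unit = {
      // 1. Wrap the task in a runner that removes itself from the map when done.
      val tr = new ToyTaskRunner(taskId, () => body, id => { runningTasks.remove(id); () })
      // 2. Track it as running.
      runningTasks.put(taskId, tr)
      // 3. Hand it to the pool; it runs as soon as a thread is available.
      threadPool.execute(tr)
    }

    def runningCount: Int = runningTasks.size()

    // Stop accepting work and wait for submitted tasks to finish.
    def shutdown(): Boolean = {
      threadPool.shutdown()
      threadPool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```

With `new LaunchTaskSketch.ToyExecutor(2)` and five launched tasks, at most two run at once and the rest wait in the pool's queue; after `shutdown()` returns `true`, `runningCount` is back to zero because every runner removed itself in its `finally` block.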