Spark1.6-----原始碼解讀之TaskScheduler啟動
阿新 • • 發佈:2018-12-16
必須啟動TaskScheduler才能讓他發揮作用
SparkContext 530行:
_taskScheduler.start()
實際去調TaskSchedulerImpl 143行:
override def start() { backend.start() if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") speculationScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() } }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS) } }
而有去調本例就以Localbackend為例的123行的start方法:
override def start() { val rpcEnv = SparkEnv.get.rpcEnv //建立LoaclEndpoint val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores) localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint) listenerBus.post(SparkListenerExecutorAdded( System.currentTimeMillis, executorEndpoint.localExecutorId, new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty))) launcherBackend.setAppId(appId) launcherBackend.setState(SparkAppHandle.State.RUNNING) }
LoaclEndPoint在LocalBackend的45行:
/** * Calls to LocalBackend are all serialized through LocalEndpoint. Using an RpcEndpoint makes the * calls on LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend * and the TaskSchedulerImpl. */ private[spark] class LocalEndpoint( override val rpcEnv: RpcEnv, userClassPath: Seq[URL], scheduler: TaskSchedulerImpl, executorBackend: LocalBackend, private val totalCores: Int) extends ThreadSafeRpcEndpoint with Logging {
該類中第58行建立Executor在Driver端:
private val executor = new Executor(
localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
Executor的主要構建過程如下:
建立Executor執行Task的執行緒池。
建立並註冊ExecutorSource,用於測量系統。
獲取SparkEnv的資訊。
urlClassLoader建立,用於載入任務傳過來的jar包。
註冊並且建立heartbeatReceiverEndPoint,並且獲得引用。
啟動Executor心跳執行緒。用於向Driver傳送心跳。
// Start worker thread pool
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
private val executorSource = new ExecutorSource(threadPool, executorId)
if (!isLocal) {
env.metricsSystem.registerSource(executorSource)
env.blockManager.initialize(conf.getAppId)
}
// Whether to load classes in user jars before those in Spark jars
private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
// Create our ClassLoader
// do this after SparkEnv creation so can access the SecurityManager
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
// Set the classloader for serializer
env.serializer.setDefaultClassLoader(replClassLoader)
// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
// Limit of bytes for total size of results (default is 1GB)
private val maxResultSize = Utils.getMaxResultSize(conf)
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
// Executor for the heartbeat task.
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
// must be initialized before running startDriverHeartbeat()
private val heartbeatReceiverRef =
RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
startDriverHeartbeater()
著重看一下心跳執行緒:
private def startDriverHeartbeater(): Unit = {
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
val heartbeatTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
}
heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
上述程式碼最主要的實現在在reportHeartBeat函式中。該函式作用有兩個:
更新正在執行的測量資訊。
通知BlockManagerMaster,這個Executor上的BlockManager還活著。
greportHeartBeat具體實現:
/** Reports heartbeat and metrics for active tasks to the driver. */
private def reportHeartBeat(): Unit = {
// list of (task id, metrics) to send back to the driver
val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
val curGCTime = computeTotalGcTime()
//該迴圈只是為了獲取Job的執行資訊
for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
taskRunner.task.metrics.foreach { metrics =>
metrics.updateShuffleReadMetrics()
metrics.updateInputMetrics()
metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
metrics.updateAccumulators()
if (isLocal) {
// JobProgressListener will hold an reference of it during
// onExecutorMetricsUpdate(), then JobProgressListener can not see
// the changes of metrics any more, so make a deep copy of it
val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
tasksMetrics += ((taskRunner.taskId, copiedMetrics))
} else {
// It will be copied by serialization
tasksMetrics += ((taskRunner.taskId, metrics))
}
}
}
}
//將資訊封裝起來
val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
try {
//傳送給heartbeatReceiverRef
//在TaskSchedulerImpl中有一個executorHeartbeatReceived方法會接受到傳送的訊息
//並將其轉發給DAGScheduler,DAGScheduler再將其轉發給BlockManagerMaster
//告訴BlockManager該Executo上的BlockManager還活著
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
if (response.reregisterBlockManager) {
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
}
} catch {
case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e)
}
}