32. How the Executor Works in Spark
Contents:
1. How the Spark Executor works
2. ExecutorBackend registration
3. Executor instantiation
4. The Executor's concrete workflow
I. How the Spark Executor Works
1. Revisiting Executor Registration
(1) The Master sends an instruction to the Worker to launch an Executor.
(2) The Worker receives the instruction and, through ExecutorRunner, starts a separate process in which the Executor will run.
(3) The new process starts CoarseGrainedExecutorBackend.
The WorkerThread newly started by ExecutorRunner uses the Command (together with the downloaded JARs) to locate the CoarseGrainedExecutorBackend class, loads it, and runs its main entry point:
def main(args: Array[String]) {
  // ...
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}

private def run( /* ... */ ) {
  // ...
  // Create the executor-side runtime environment
  val env = SparkEnv.createExecutorEnv(
    driverConf, executorId, hostname, port, cores, isLocal = false)
  // Register the CoarseGrainedExecutorBackend endpoint; the (Netty-based) RpcEnv
  // then delivers the OnStart message to it
  env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
    env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
  workerUrl.foreach { url =>
    env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
  }
}
(4) CoarseGrainedExecutorBackend registers with the Driver by sending RegisterExecutor
Note: when CoarseGrainedExecutorBackend starts, what it actually registers with the Driver is the ExecutorBackend instance; there is no direct relationship to an Executor instance at this point!
// From CoarseGrainedExecutorBackend.scala
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    // Register the Executor with the Driver
    ref.ask[RegisterExecutorResponse](
      RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) => Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
    }
    case Failure(e) => {
      logError(s"Cannot register with driver: $driverUrl", e)
      System.exit(1)
    }
  }(ThreadUtils.sameThread)
}
Notes:
1) CoarseGrainedExecutorBackend is the name of the process in which the Executor runs. The Executor is the object that actually processes Tasks, and internally it performs Task computation through a thread pool.
2) CoarseGrainedExecutorBackend and Executor correspond one to one.
3) CoarseGrainedExecutorBackend is a message-communication endpoint: it implements ThreadSafeRpcEndpoint, so it can both send and receive messages. On startup it sends messages to the Driver, and it receives messages sent back by the Driver, such as the request to launch a Task. A minimal sketch of this endpoint shape follows.
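To make note 3) concrete, here is a minimal, self-contained sketch of the endpoint pattern. ThreadSafeRpcEndpoint is a private[spark] trait, so its shape is modeled here with a hypothetical SketchRpcEndpoint trait and a hypothetical Ping message; only the structure (an onStart hook plus a receive partial function with serial message delivery) mirrors the real class:

trait SketchRpcEndpoint {
  def onStart(): Unit = {}                // called once, before any message arrives
  def receive: PartialFunction[Any, Unit] // handles incoming one-way messages
}

case class Ping(id: Int) // hypothetical message type, for illustration only

class EchoEndpoint extends SketchRpcEndpoint {
  override def onStart(): Unit = {
    // CoarseGrainedExecutorBackend uses this hook to send RegisterExecutor to the Driver
    println("endpoint started; registration would happen here")
  }
  override def receive: PartialFunction[Any, Unit] = {
    // messages are dispatched here one at a time -- the "thread-safe" guarantee
    case Ping(id) => println(s"got ping $id")
  }
}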
(5) The Driver receives and handles the message
The Driver receives the registration message, wraps the ExecutorBackend's information in an ExecutorData object, and registers it in the Driver's in-memory data structure executorDataMap.
First, we need to be clear about the two most important Endpoints (background message-communication endpoints) in the Driver process: ClientEndpoint and DriverEndpoint.
1) ClientEndpoint: mainly responsible for registering the current application with the Master; it is an inner member of AppClient.
2) DriverEndpoint: the driver of the application at runtime; it is a member of CoarseGrainedSchedulerBackend and is created during SparkContext initialization by SparkDeploySchedulerBackend, a subclass of CoarseGrainedSchedulerBackend.
Note: for how DriverEndpoint and ClientEndpoint are created, see the reference documents. The driverUrl that CoarseGrainedExecutorBackend communicates with is the DriverEndpoint, not the ClientEndpoint. A rough sketch of this relationship follows.
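The sketch below condenses how the two sides find each other (abbreviated, not verbatim: the registration happens inside CoarseGrainedSchedulerBackend.start and the resolution inside CoarseGrainedExecutorBackend.onStart; ENDPOINT_NAME is "CoarseGrainedScheduler" in this Spark version):

// Driver side: DriverEndpoint is registered under a well-known name,
// which is what driverUrl ultimately points to.
driverEndpoint = rpcEnv.setupEndpoint(
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME,
  createDriverEndpoint(properties))

// Executor side: CoarseGrainedExecutorBackend resolves that name remotely
// and gets back an RpcEndpointRef to the DriverEndpoint.
rpcEnv.asyncSetupEndpointRefByURI(driverUrl)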
// From CoarseGrainedSchedulerBackend -> class DriverEndpoint ->
// the receiveAndReply method
// DriverEndpoint receives the RegisterExecutor message
case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
// executorDataMap is a HashMap that stores ExecutorData:
// private val executorDataMap = new HashMap[String, ExecutorData]
// (CoarseGrainedSchedulerBackend, line 60)
// The ExecutorData structure is shown after this block.
if (executorDataMap.contains(executorId)) {
context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
val executorAddress = if (executorRef.address != null) {
executorRef.address
} else {
context.senderAddress
}
logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
addressToExecutorId(executorAddress) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized { //1
executorDataMap.put(executorId, data)
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(RegisteredExecutor(executorAddress.host)) //2
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
}
}
/**
* Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
*
* @param executorEndpoint The RpcEndpointRef representing this executor
* @param executorAddress The network address of this executor
* @param executorHost The hostname that this executor is running on
* @param freeCores The current number of cores available for work on the executor
* @param totalCores The total number of cores available to the executor
*/
private[cluster] class ExecutorData(
val executorEndpoint: RpcEndpointRef,
val executorAddress: RpcAddress,
override val executorHost: String,
var freeCores: Int,
override val totalCores: Int,
override val logUrlMap: Map[String, String]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
From the source we can see that the Executor's (ExecutorBackend's) information ultimately ends up in the executorDataMap data structure, which is a member of CoarseGrainedSchedulerBackend; so in substance the Executor (ExecutorBackend) information is registered with CoarseGrainedSchedulerBackend.
Summary: at runtime the DriverEndpoint writes the information into CoarseGrainedSchedulerBackend's in-memory data structure executorDataMap, so the Executor (ExecutorBackend) information is ultimately registered with CoarseGrainedSchedulerBackend. CoarseGrainedSchedulerBackend therefore holds the information about every ExecutorBackend process allocated to the current application (stored in executorDataMap), while inside each ExecutorBackend process an Executor object is responsible for the actual execution of Tasks. At runtime the synchronized keyword guarantees safe concurrent writes to executorDataMap; see marker 1 in the code above, and the condensed locking sketch below.
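Why marker 1 needs the lock can be shown with a condensed, self-contained sketch (not the full Spark code): registration writes executorDataMap and numPendingExecutors, while requests for more executors read them from another thread, and both paths must lock the same backend object to observe a consistent state:

class Backend {
  private val executorDataMap = new scala.collection.mutable.HashMap[String, AnyRef]
  private var numPendingExecutors = 0

  def register(id: String, data: AnyRef): Unit = synchronized {
    executorDataMap.put(id, data)               // write path (marker 1)
    if (numPendingExecutors > 0) numPendingExecutors -= 1
  }

  def pendingAndRegistered: (Int, Int) = synchronized {
    (numPendingExecutors, executorDataMap.size) // read path sees a consistent pair
  }
}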
(6) After the Executor registers successfully, the Driver returns a RegisteredExecutor message to CoarseGrainedExecutorBackend
See marker 2 in the code above.
(7) CoarseGrainedExecutorBackend receives the RegisteredExecutor message
Upon receiving the RegisteredExecutor message from the DriverEndpoint, CoarseGrainedExecutorBackend creates the Executor instance, and it is this Executor instance that is actually responsible for computing Tasks. During instantiation, the Executor creates a thread pool in preparation for Task computation.
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor(hostname) =>
logInfo("Successfully registered with driver")
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
private[spark] class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
userClassPath: Seq[URL] = Nil,
isLocal: Boolean = false)
extends Logging { // some code omitted
// Start worker thread pool
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker") // a daemon thread pool; the computation threads stay resident in memory
private val executorSource = new ExecutorSource(threadPool, executorId)
/**
* Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID
* is a unique, sequentially assigned integer.
*/
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
/**
* Create a thread factory that names threads with a prefix and also sets the threads to
* daemon.
*/
def namedThreadFactory(prefix: String): ThreadFactory = {
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build() // builds the pool's threads via Java's ThreadFactory mechanism
}
/** Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { // creates threads on demand; from Executors.java
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
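As a quick, runnable illustration of this cached-pool behaviour (threads are created on demand, idle threads are reused, and daemon threads do not block JVM shutdown), here is a small standalone Scala example; all names in it are hypothetical demo names:

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object CachedPoolDemo extends App {
  val factory: ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"demo-worker-${counter.getAndIncrement()}")
      t.setDaemon(true) // same effect as namedThreadFactory's setDaemon(true)
      t
    }
  }
  val pool = Executors.newCachedThreadPool(factory)
  (1 to 4).foreach { i =>
    pool.execute(new Runnable {
      override def run(): Unit =
        println(s"${Thread.currentThread().getName} ran task $i")
    })
  }
  pool.shutdown()
  pool.awaitTermination(5, TimeUnit.SECONDS)
}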
With this, all the preparation on the Worker side is done; it now waits for the Driver to send Tasks to execute.
2. The Executor's Concrete Working Mechanism
(1) The Driver prepares to send a Task
First, a Task sent by the Driver is not delivered directly to the Executor. Note that the Executor here is not a message-communication endpoint and cannot receive messages. What actually receives messages is CoarseGrainedExecutorBackend; the source makes clear that CoarseGrainedExecutorBackend is a message-communication endpoint, since it extends ThreadSafeRpcEndpoint (see marker 3). So the Driver sends Tasks to the CoarseGrainedExecutorBackend on the Worker side.
private[spark] class CoarseGrainedExecutorBackend(...) // parameters omitted; marker 3
extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
After CoarseGrainedSchedulerBackend sends RegisteredExecutor to the ExecutorBackend, it runs the makeOffers method to assign concrete Tasks to the Executor in each Worker for execution.
(2) Allocating Executors and sending Tasks
The Driver allocates Executors and sends each Task wrapped in LaunchTask, a case class that carries the serialized Task; its definition is sketched right below.
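The message's shape is, consistent with how it is used in the code that follows, essentially a one-field case class wrapping the serialized bytes (an abridged sketch of CoarseGrainedClusterMessages.scala in this Spark version):

case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage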
// Make fake resource offers on all executors
private def makeOffers() {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq
launchTasks(scheduler.resourceOffers(workOffers))
}
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { // establishes the Task-to-Executor mapping
// some code omitted
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
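The elided part of launchTasks essentially loops over the scheduled TaskDescriptions, serializes each one, books the executor's cores, and fires the LaunchTask message. A simplified sketch of that loop (not verbatim Spark code, and omitting the frame-size check the real method performs):

for (task <- tasks.flatten) {
  val serializedTask = ser.serialize(task)              // serialize the TaskDescription
  val executorData = executorDataMap(task.executorId)   // look up the chosen executor
  executorData.freeCores -= scheduler.CPUS_PER_TASK     // book the cores for this task
  executorData.executorEndpoint.send(
    LaunchTask(new SerializableBuffer(serializedTask))) // one-way RPC to the backend
}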
(3) Receiving the Task
case LaunchTask(data) => // the ExecutorBackend side receives the message
if (executor == null) {
logError("Received LaunchTask command but executor was null")
// the Executor is null, so the process exits
System.exit(1)
} else {
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
}
(4) Invoking the Executor to run the task
After receiving the message sent over from the Driver, the ExecutorBackend calls launchTask to hand the task to the Executor for execution.
(5) Running the task
First, the Task is wrapped in a TaskRunner, which is then handed to a thread in the thread pool for execution.
def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit = {
// TaskRunner is a concrete implementation of the Runnable interface; it is handed
// to a thread in the pool, whose run method is then invoked to execute the Task
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
override def run(): Unit = { // some code omitted
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
}
When the TaskRunner's run method is invoked, it calls the Task's run method, which in turn calls runTask. The concrete Task is either a ShuffleMapTask or a ResultTask, and each has its own execution logic. (The details are covered in later chapters.) A schematic sketch of this pattern follows.
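The structure just described is a classic template-method pattern. The sketch below is schematic only (the real Task.run also installs a TaskContext, swaps the class loader, and records metrics, and the real subclasses take many more parameters):

abstract class Task[T] {
  // Template method: fixed setup/teardown around a deferred step
  final def run(taskAttemptId: Long, attemptNumber: Int): T = runTask()
  protected def runTask(): T
}

class ShuffleMapTask extends Task[String] {
  // In real Spark this writes shuffle output and returns a MapStatus
  override protected def runTask(): String = "map output location"
}

class ResultTask[U](f: () => U) extends Task[U] {
  // In real Spark this applies the user function to the partition's iterator
  override protected def runTask(): U = f()
}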
II. Spark Executor Workflow Diagram
(The workflow diagram from the original slides is not reproduced here.)