
32 How the Executor Works in Spark

Contents:

1. Spark Executor working principles

2. ExecutorBackend registration

3. Executor instantiation

4. The Executor's concrete workflow

I. Spark Executor Working Principles

1. Revisiting Executor registration

(1) The Master sends an instruction to the Worker to start an Executor.

(2) The Worker receives the instruction and, via ExecutorRunner, starts a separate process in which the Executor will run.

(3) The new process starts CoarseGrainedExecutorBackend.

The WorkerThread newly started by ExecutorRunner uses the Command (together with the downloaded jars) to locate the CoarseGrainedExecutorBackend class, loads it, and runs its main entry point:

def main(args: Array[String]) {
  // ...
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}

private def run(
    /* ... */) {
  // ...
  // Create the executor's runtime environment
  val env = SparkEnv.createExecutorEnv(
    driverConf, executorId, hostname, port, cores, isLocal = false)
  // Register the CoarseGrainedExecutorBackend endpoint; the Netty-based RpcEnv
  // then delivers the OnStart message to CoarseGrainedExecutorBackend
  env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
    env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
  workerUrl.foreach { url =>
    env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
  }
}
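A note on setupEndpoint: registering an endpoint hands it to the RpcEnv, which then drives its life cycle (constructor -> onStart -> receive* -> onStop). That is why merely constructing CoarseGrainedExecutorBackend and registering it is enough to get OnStart delivered. Below is a simplified sketch of the endpoint contract; it is illustrative only, not the full org.apache.spark.rpc.RpcEndpoint trait:

// Illustrative sketch of the RpcEndpoint contract that
// CoarseGrainedExecutorBackend fulfills; abridged, not the real trait.
trait RpcEndpointSketch {
  // Called by the RpcEnv once, right after setupEndpoint registers us;
  // CoarseGrainedExecutorBackend uses this hook to register with the Driver.
  def onStart(): Unit

  // Handles fire-and-forget messages delivered via send(), e.g. LaunchTask.
  def receive: PartialFunction[Any, Unit]

  // Handles ask() messages that expect a reply.
  def receiveAndReply(context: AnyRef): PartialFunction[Any, Unit]

  // Called once when the endpoint is unregistered or the RpcEnv shuts down.
  def onStop(): Unit
}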

(4) CoarseGrainedExecutorBackend registers with the Driver by sending RegisterExecutor

Note: when CoarseGrainedExecutorBackend starts, what it registers with the Driver is in essence the ExecutorBackend instance; the registration has no direct relationship with the Executor instance itself!

// Code from CoarseGrainedExecutorBackend.scala

override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    ref.ask[RegisterExecutorResponse](
      // Register the Executor with the Driver
      RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) => Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
    }
    case Failure(e) => {
      logError(s"Cannot register with driver: $driverUrl", e)
      System.exit(1)
    }
  }(ThreadUtils.sameThread)
}

Notes:

1) CoarseGrainedExecutorBackend is the name of the process in which the Executor runs; the Executor is the object that actually processes Tasks, and internally it completes Task computation through a thread pool.

2) CoarseGrainedExecutorBackend and Executor correspond one to one.

3) CoarseGrainedExecutorBackend is a message communication endpoint: it implements ThreadSafeRpcEndpoint, so it can both send and receive messages. On startup it sends messages to the Driver, and it receives messages sent by the Driver, such as instructions to launch a Task.
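For orientation, the registration handshake is carried by a small family of case-class messages. Here is a hedged, abridged sketch of their shapes, reconstructed from how they are used in the snippets quoted in this article (the real definitions live in CoarseGrainedClusterMessages.scala):

// Abridged message shapes, inferred from their use in the code quoted here.
// ExecutorBackend -> Driver: ask to register this executor
case class RegisterExecutor(
    executorId: String,
    executorRef: RpcEndpointRef,
    hostPort: String,
    cores: Int,
    logUrls: Map[String, String])

// Driver -> ExecutorBackend: registration succeeded
case class RegisteredExecutor(hostname: String)

// Driver -> ExecutorBackend: registration rejected (e.g. a duplicate ID)
case class RegisterExecutorFailed(message: String)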

(5) The Driver receives and processes the message

The Driver receives the registration message, wraps the ExecutorBackend's information in an ExecutorData object, and registers it in the Driver's in-memory data structure executorDataMap.

First, we need to be clear about the two most important Endpoints (background message communication endpoints) in the Driver process: ClientEndpoint and DriverEndpoint.

1) ClientEndpoint: mainly responsible for registering the current application with the Master; it is an internal member of AppClient.

2) DriverEndpoint: the driver of the application at runtime; it is a member of CoarseGrainedSchedulerBackend and is created during SparkContext initialization by SparkDeploySchedulerBackend, a subclass of CoarseGrainedSchedulerBackend.

Note: for how DriverEndpoint and ClientEndpoint are created, see the reference documents. The driverUrl that CoarseGrainedExecutorBackend communicates with points to the DriverEndpoint, not the ClientEndpoint.

// Code from CoarseGrainedSchedulerBackend -> class DriverEndpoint ->
//                                            the receiveAndReply method
// DriverEndpoint receives the RegisterExecutor message
  case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
    // executorDataMap is a HashMap that stores ExecutorData:
    //   private val executorDataMap = new HashMap[String, ExecutorData]
    //   (CoarseGrainedSchedulerBackend, line 60)
    // The ExecutorData structure is shown below.
    if (executorDataMap.contains(executorId)) {
      context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    } else {
      // If the executor's rpc env is not listening for incoming connections, `hostPort`
      // will be null, and the client connection should be used to contact the executor.
      val executorAddress = if (executorRef.address != null) {
          executorRef.address
        } else {
          context.senderAddress
        }
      logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
      addressToExecutorId(executorAddress) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
        cores, cores, logUrls)
      // This must be synchronized because variables mutated
      // in this block are read when requesting executors
      CoarseGrainedSchedulerBackend.this.synchronized { //1
        executorDataMap.put(executorId, data)
        if (numPendingExecutors > 0) {
          numPendingExecutors -= 1
          logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
        }
      }
      // Note: some tests expect the reply to come after we put the executor in the map
      context.reply(RegisteredExecutor(executorAddress.host))  //2
      listenerBus.post(
        SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      makeOffers()
    }
}

/**
 * Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
 *
 * @param executorEndpoint The RpcEndpointRef representing this executor
 * @param executorAddress The network address of this executor
 * @param executorHost The hostname that this executor is running on
 * @param freeCores  The current number of cores available for work on the executor
 * @param totalCores The total number of cores available to the executor
 */
  private[cluster] class ExecutorData(
     val executorEndpoint: RpcEndpointRef,
     val executorAddress: RpcAddress,
     override val executorHost: String,
     var freeCores: Int,
     override val totalCores: Int,
     override val logUrlMap: Map[String, String]
  ) extends ExecutorInfo(executorHost, totalCores, logUrlMap)

From the source we can see that the Executor's (ExecutorBackend's) information is ultimately stored in the executorDataMap data structure, which is a member of CoarseGrainedSchedulerBackend; so in essence the Executor (ExecutorBackend) information is registered with CoarseGrainedSchedulerBackend.

Summary: at runtime, DriverEndpoint writes the information into CoarseGrainedSchedulerBackend's in-memory data structure executorDataMap, so ultimately the Executor (ExecutorBackend) information is registered with CoarseGrainedSchedulerBackend. CoarseGrainedSchedulerBackend thereby holds the information about every ExecutorBackend process allocated to the current application (stored in executorDataMap), and inside each ExecutorBackend process instance an Executor object is responsible for the actual execution of Tasks. At runtime the synchronized keyword guarantees safe concurrent writes to executorDataMap; see marker //1.
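To see why marker //1 matters: scala.collection.mutable.HashMap is not thread-safe, so every mutation of executorDataMap must go through the same lock. A minimal, self-contained illustration of the pattern (plain Scala, not Spark code; ExecutorRegistryDemo and its fields are hypothetical stand-ins):

import scala.collection.mutable.HashMap

// Minimal illustration of the synchronized-write pattern at marker //1.
object ExecutorRegistryDemo {
  // id -> cores, a stand-in for the real id -> ExecutorData mapping
  private val executorDataMap = new HashMap[String, Int]

  def register(executorId: String, cores: Int): Unit = this.synchronized {
    // Safe: only one thread mutates the map at a time
    executorDataMap.put(executorId, cores)
  }

  def totalCores: Int = this.synchronized {
    // Readers take the same lock to see a consistent view
    executorDataMap.values.sum
  }
}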

(6) After the Executor registers successfully, the Driver returns a RegisteredExecutor message to CoarseGrainedExecutorBackend

See marker //2 in the code above.

(7) CoarseGrainedExecutorBackend receives the RegisteredExecutor message

After receiving the RegisteredExecutor message sent by DriverEndpoint, CoarseGrainedExecutorBackend creates the Executor instance object, and it is this Executor instance that is actually responsible for the real Task computation. During instantiation it creates a thread pool to prepare for Task computation.


override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor(hostname) =>
    logInfo("Successfully registered with driver")
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  // ... other cases omitted ...
}

private[spark] class Executor(
    executorId: String,
    executorHostname: String,
    env: SparkEnv,
    userClassPath: Seq[URL] = Nil,
    isLocal: Boolean = false)
  extends Logging {
  // ... some code omitted ...

  // Start worker thread pool: a daemon thread pool that stays resident in
  // memory and runs the Task computations
  private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
  private val executorSource = new ExecutorSource(threadPool, executorId)

/**
 * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID,
 * where ID is a unique, sequentially assigned integer.
 */
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
  val threadFactory = namedThreadFactory(prefix)
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}

/**
 * Create a thread factory that names threads with a prefix and also sets
 * the threads to daemon.
 */
def namedThreadFactory(prefix: String): ThreadFactory = {
  // Builds a daemon ThreadFactory via Guava's ThreadFactoryBuilder
  new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
}

/** Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available, and uses the provided
 * ThreadFactory to create new threads when needed.
 */
// Creates threads on demand; from java.util.concurrent.Executors
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
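To make the behavior of this pool concrete, here is a small runnable demo (plain Scala with the JDK only, no Spark on the classpath; the thread-name format mirrors the one above) showing daemon threads being created on demand and, when idle, reused:

import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PoolDemo extends App {
  val counter = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"demo-task-launch-worker-${counter.getAndIncrement()}")
      t.setDaemon(true)  // daemon: the JVM can exit without waiting for these threads
      t
    }
  }
  val pool = Executors.newCachedThreadPool(factory).asInstanceOf[ThreadPoolExecutor]

  (1 to 4).foreach { i =>
    pool.execute(new Runnable {
      // Thread names in the output show which pool threads run (and get reused)
      override def run(): Unit =
        println(s"${Thread.currentThread().getName} running task $i")
    })
  }
  pool.shutdown()
  pool.awaitTermination(10, TimeUnit.SECONDS)
}

Because the threads are daemons, a hung task cannot keep the executor JVM alive at shutdown; and as the ThreadPoolExecutor constructor above shows, the cached pool grows on demand (up to Integer.MAX_VALUE threads) and retires threads idle for 60 seconds.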

At this point, all the preparation on the Worker side is done; the Executor now waits for the Driver to send Tasks to run.

2. The Executor's concrete working mechanism

(1) The Driver prepares to send a Task

First of all, a Task sent from the Driver is not delivered directly to the Executor. Note that the Executor here is not a message communication endpoint and cannot receive messages. What really receives the message is CoarseGrainedExecutorBackend: the source makes it plain that CoarseGrainedExecutorBackend is a message communication endpoint, since it extends ThreadSafeRpcEndpoint (see marker //3). So the Driver sends Tasks to the CoarseGrainedExecutorBackend on the Worker side.

private[spark] class CoarseGrainedExecutorBackend(/* parameters omitted */)  // 3
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

After CoarseGrainedSchedulerBackend sends RegisteredExecutor to the ExecutorBackend, it runs the makeOffers method to assign concrete Tasks to the Executor in each Worker for execution.

(2) Allocating Executors and sending Tasks

The Driver allocates Executors and sends each Task by means of LaunchTask, a case class that wraps the serialized Task.

// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}

// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  // Determines the Task-to-Executor mapping; some code omitted
  executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
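The snippet relies on two small helper types. Their approximate shapes, hedged and abridged from how they are used above (SerializableBuffer is a Spark wrapper around a java.nio.ByteBuffer so that the task bytes can cross the RPC channel via Java serialization):

// Approximate shapes, inferred from the usage in makeOffers/launchTasks above.
// One offer of free resources on one executor, fed to the TaskScheduler.
case class WorkerOffer(executorId: String, host: String, cores: Int)

// Message carrying one serialized Task to the chosen ExecutorBackend.
case class LaunchTask(data: SerializableBuffer)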

(3) Receiving the Task

case LaunchTask(data) =>  // The ExecutorBackend side receives the message
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    // The Executor is null, so the process exits
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }

(4) Calling the Executor to run the task

After receiving the message sent over from the Driver, ExecutorBackend calls launchTask to hand the Task over to the Executor for execution.

(5) Running the task

First, the Task is wrapped in a TaskRunner and then handed to a thread from the thread pool for execution.

def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  // TaskRunner is a concrete implementation of the Runnable interface; it is
  // handed to a thread in the pool, whose run method then executes the Task
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}

override def run(): Unit = {
  // ... some code omitted ...
  val res = task.run(
    taskAttemptId = taskId,
    attemptNumber = attemptNumber,
    metricsSystem = env.metricsSystem)
}

When TaskRunner's run method is invoked, it calls the Task's run method, which in turn calls runTask. The actual Task is either a ShuffleMapTask or a ResultTask, each with its own execution logic. (The details will be covered in later chapters.)
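The relationship described here is the classic template-method pattern. A hedged sketch (abridged, not the real org.apache.spark.scheduler.Task) of how run delegates to the subclass-specific runTask:

// Hedged sketch of the template-method structure described above.
abstract class TaskSketch[T] {
  // Common bookkeeping lives in run; the per-task logic lives in runTask.
  final def run(taskAttemptId: Long, attemptNumber: Int): T = {
    // ... set up TaskContext, metrics, etc. (omitted) ...
    try {
      runTask()  // ShuffleMapTask writes shuffle output and returns a MapStatus;
                 // ResultTask applies the user function and returns its result
    } finally {
      // ... release resources (omitted) ...
    }
  }
  protected def runTask(): T
}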

II. Spark Executor Workflow Diagram

[Figure: Spark Executor workflow diagram]
