
SparkContext Source Code Analysis


More resources

YouTube video share

bilibili video share

Class documentation

Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.

Translation

  • Main entry point for Spark functionality
  • A SparkContext represents the connection to a Spark cluster
  • On that cluster it can be used to create RDDs, accumulators and broadcast variables
  • Only one SparkContext may be active per JVM
  • You must stop the active SparkContext before creating a new one; this limitation may eventually be removed, see SPARK-2243 for more details (a short example follows)
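As noted in the last point, only one SparkContext can be active per JVM. A minimal sketch of what that means in practice (the application names, master URL and RDD contents below are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SingleContextDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("first-context").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // The context is the factory for RDDs, accumulators and broadcast variables.
    val acc = sc.accumulator(0L, "processed")
    val lookup = sc.broadcast(Map(1 -> "a", 2 -> "b"))
    sc.parallelize(1 to 10)
      .map(i => lookup.value.getOrElse(i, "?"))
      .foreach(_ => acc += 1L)
    println(s"processed = ${acc.value}")

    // Only one SparkContext may be active per JVM (SPARK-2243):
    // stop() must be called before constructing another one.
    sc.stop()

    val sc2 = new SparkContext(new SparkConf().setAppName("second-context").setMaster("local[2]"))
    sc2.stop()
  }
}
```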

SparkContext architecture diagram

xmind file download

https://github.com/opensourceteams/spark-scala-maven/blob/master/md/images/spark/SparkContext.xmind

Configuration

User-configurable settings (a combined SparkConf example follows this list)

  • spark.jars = jar file paths (comma-separated, iterable)
  • spark.files = file paths
  • spark.eventLog.dir=/tmp/spark-events // event log directory
  • spark.eventLog.compress=false // whether to compress the event log
  • spark.shuffle.manager=sort // selects the shuffle manager
// Let the user specify short names for shuffle managers
   val shortShuffleMgrNames = Map(
     "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
     "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
     "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
   val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
   val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
   val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
  • spark.memory.useLegacyMode=true // selects the memory manager implementation
      val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", true)
      val memoryManager: MemoryManager =
        if (useLegacyMemoryManager) {
          new StaticMemoryManager(conf, numUsableCores)
        } else {
          UnifiedMemoryManager(conf, numUsableCores)
        }
    
- spark.ui.showConsoleProgress=true      // show job progress in the console
- spark.ui.enabled=true      // whether to enable the SparkUI
- spark.executor.memory=      // spark executor memory
- SPARK_EXECUTOR_MEMORY=  // spark executor memory (environment variable)
- SPARK_MEM=  // spark executor memory (environment variable, deprecated)
 ```scala
 /**
  * Lookup order (the first value found wins; a bare numeric value is interpreted as bytes),
  * default 1024 MB:
  *   spark.executor.memory > SPARK_EXECUTOR_MEMORY > SPARK_MEM
  */
  _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)
 ```
  • spark.scheduler.mode=FIFO // TaskSchedulerImpl scheduling mode, one of (FIFO, FAIR, NONE)
/**
*  "FAIR" and "FIFO" determines which policy is used
*    to order tasks amongst a Schedulable's sub-queues
*  "NONE" is used when the a Schedulable has no sub-queues.
*/
object SchedulingMode extends Enumeration {

 type SchedulingMode = Value
 val FAIR, FIFO, NONE = Value
}
  • spark.cores.max=2 // maximum total number of CPU cores the application may use across its executors

  • spark.executor.extraJavaOptions= // extra JVM options passed when launching executors

  • spark.executor.extraClassPath= // extra classpath entries for executors

  • spark.executor.extraLibraryPath= // extra library path entries for executors

  • spark.executor.cores= // number of cores allocated to each executor

  • spark.rpc.lookupTimeout="120s" // timeout for RPC remote endpoint lookup

  • spark.network.timeout="120s" // network timeout, used as the fallback for the RPC lookup timeout (see below)

 /** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
 private[spark] def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
   RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
 }
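All of the settings listed above can be put on a SparkConf before the SparkContext is constructed (or passed through spark-submit). A combined sketch follows; the paths and values are illustrative placeholders, not recommendations.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ConfigDemo {
  def main(args: Array[String]): Unit = {
    // Sketch only: the values below are placeholders chosen for illustration.
    val conf = new SparkConf()
      .setAppName("config-demo")
      .setMaster("local[2]")                          // or spark://host:7077, yarn-client, ...
      .set("spark.jars", "/path/to/app.jar")          // comma-separated jar list
      .set("spark.files", "/path/to/lookup.txt")      // shipped to every executor
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "/tmp/spark-events") // event log directory
      .set("spark.eventLog.compress", "false")
      .set("spark.shuffle.manager", "sort")           // "hash", "sort" or "tungsten-sort"
      .set("spark.executor.memory", "2g")             // wins over SPARK_EXECUTOR_MEMORY / SPARK_MEM
      .set("spark.scheduler.mode", "FAIR")            // FIFO (default) or FAIR
      .set("spark.cores.max", "2")
      .set("spark.rpc.lookupTimeout", "120s")

    val sc = new SparkContext(conf)

    // With FAIR scheduling, jobs submitted from this thread go into the named pool.
    sc.setLocalProperty("spark.scheduler.pool", "reports")

    sc.stop()
  }
}
```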

Settings that Spark sets itself

  • spark.driver.host = Utils.localHostName()
  • spark.driver.port = 0
  • spark.executor.id = driver

Main flow

Create the job progress listener

 _jobProgressListener = new JobProgressListener(_conf)
 listenerBus.addListener(jobProgressListener)

Create the SparkEnv

_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
  • Create the driver-side SparkEnv
 SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
  • The default spark.rpc factory is org.apache.spark.rpc.netty.NettyRpcEnvFactory
  private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
    val rpcEnvNames = Map(
      "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
      "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
    val rpcEnvName = conf.get("spark.rpc", "netty")
    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
    Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
  }
  • Create and start the NettyRpcEnv; this is where the 'sparkDriver' service starts
def create(config: RpcEnvConfig): RpcEnv = {
   val sparkConf = config.conf
   // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
   // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
   val javaSerializerInstance =
     new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
   val nettyEnv =
     new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
   if (!config.clientMode) {
     val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
       nettyEnv.startServer(actualPort)
       (nettyEnv, nettyEnv.address.port)
     }
     try {
       Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
     } catch {
       case NonFatal(e) =>
         nettyEnv.shutdown()
         throw e
     }
   }
   nettyEnv
 }
  • Create and start the ActorSystem; this is where 'sparkDriverActorSystem' starts
 /**
  * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
  * ActorSystem itself and its port (which is hard to get from Akka).
  *
  * Note: the `name` parameter is important, as even if a client sends a message to right
  * host + port, if the system name is incorrect, Akka will drop the message.
  *
  * If indestructible is set to true, the Actor System will continue running in the event
  * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
  */
 def createActorSystem(
     name: String,
     host: String,
     port: Int,
     conf: SparkConf,
     securityManager: SecurityManager): (ActorSystem, Int) = {
   val startService: Int => (ActorSystem, Int) = { actualPort =>
     doCreateActorSystem(name, host, actualPort, conf, securityManager)
   }
   Utils.startServiceOnPort(port, startService, conf, name)
 }
  • Select the Spark serializer: org.apache.spark.serializer.JavaSerializer

val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

val closureSerializer = instantiateClassFromConf[Serializer](
  "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

- Instantiate MapOutputTrackerMaster
    ```scala
    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
    val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
    } else {
      new MapOutputTrackerWorker(conf)
    }
    ```
  • Register MapOutputTracker with a NettyRpcEndpointRef (used for communication and for tracking map output locations)
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
   val addr = RpcEndpointAddress(nettyEnv.address, name)
   val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
   synchronized {
     if (stopped) {
       throw new IllegalStateException("RpcEnv has been stopped")
     }
     if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
       throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
     }
     val data = endpoints.get(name)
     endpointRefs.put(data.endpoint, data.ref)
     receivers.offer(data)  // for the OnStart message
   }
   endpointRef
 }
  • Instantiate the ShuffleManager
// Let the user specify short names for shuffle managers
   val shortShuffleMgrNames = Map(
     "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
     "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
     "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
   val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
   val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
   val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
  • Instantiate the memory manager

   val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", true)
   val memoryManager: MemoryManager =
     if (useLegacyMemoryManager) {
       new StaticMemoryManager(conf, numUsableCores)
     } else {
       UnifiedMemoryManager(conf, numUsableCores)
     }

- Register BlockManagerMaster with a NettyRpcEndpointRef (used for communication)
  ```scala
    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)
  ```
  • Instantiate the BlockManager and the CacheManager built on top of it
   // NB: blockManager is not valid until initialize() is called later.
   val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
     serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
     blockTransferService, securityManager, numUsableCores)

   val cacheManager = new CacheManager(blockManager)
  • Create the metrics system
val metricsSystem = if (isDriver) {
     // Don't start metrics system right now for Driver.
     // We need to wait for the task scheduler to give us an app ID.
     // Then we can start the metrics system.
     MetricsSystem.createMetricsSystem("driver", conf, securityManager)
   } else {
     // We need to set the executor ID before the MetricsSystem is created because sources and
     // sinks specified in the metrics configuration file will want to incorporate this executor's
     // ID into the metrics they report.
     conf.set("spark.executor.id", executorId)
     val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
     ms.start()
     ms
   }
  • Create the sparkFiles directory: a temporary directory in local mode, the executor's current working directory in distributed mode
 // Set the sparkFiles directory, used when downloading dependencies.  In local mode,
   // this is a temporary directory; in distributed mode, this is the executor's current working
   // directory.
   val sparkFilesDir: String = if (isDriver) {
     Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
   } else {
     "."
   }
  • Register OutputCommitCoordinator with a NettyRpcEndpointRef (used for communication)
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
     new OutputCommitCoordinator(conf, isDriver)
   }
   val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
     new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
   outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
  • Construct the SparkEnv instance and return it
 val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      actorSystem,
      serializer,
      closureSerializer,
      cacheManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockTransferService,
      blockManager,
      securityManager,
      sparkFilesDir,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)

    // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
    // called, and we only need to do it for driver. Because driver may run as a service, and if we
    // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
    if (isDriver) {
      envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
    }

    envInstance

Create the SparkUI

   _ui =
     if (conf.getBoolean("spark.ui.enabled", true)) {
       Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
         _env.securityManager, appName, startTime = startTime))
     } else {
       // For tests, do not enable the UI
       None
     }
   // Bind the UI before starting the task scheduler to communicate
   // the bound port to the cluster manager properly
   _ui.foreach(_.bind())

Register the heartbeat receiver

   // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
   // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
   _heartbeatReceiver = env.rpcEnv.setupEndpoint(
     HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

Create and start the schedulers (TaskScheduler, DAGScheduler)

   // Create and start the scheduler
   val (sched, ts) = SparkContext.createTaskScheduler(this, master)
   _schedulerBackend = sched
   _taskScheduler = ts
   _dagScheduler = new DAGScheduler(this)
   _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

   // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
   // constructor
   _taskScheduler.start()
  • org.apache.spark.scheduler.TaskSchedulerImpl documentation
/**
  • Schedules tasks for multiple types of clusters by acting through a SchedulerBackend
  • With a LocalBackend and isLocal set to true, it can also schedule local tasks
  • It handles common logic: deciding the scheduling order across jobs, waking up to launch speculative tasks, etc.
  • Clients should first call initialize() and start(), then submit task sets through the runTasks method

 * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
 * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
 * It handles common logic, like determining a scheduling order across jobs, waking up to launch
 * speculative tasks, etc.
 *
 * Clients should first call initialize() and start(), then submit task sets through the
 * runTasks method.
 *
 * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
 * threads, so it needs locks in public API methods to maintain its state. In addition, some
 * SchedulerBackends synchronize on themselves when they want to send events here, and then
 * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
 * we are holding a lock on ourselves.
 */
  • In standalone mode a TaskSchedulerImpl is created, and initialize() sets its backend to SparkDeploySchedulerBackend (a short master-URL example follows the initialize() snippet below)
     case SPARK_REGEX(sparkUrl) =>
       val scheduler = new TaskSchedulerImpl(sc)
       val masterUrls = sparkUrl.split(",").map("spark://" + _)
       val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
       scheduler.initialize(backend)
       (backend, scheduler)
 def initialize(backend: SchedulerBackend) {
   this.backend = backend
   // temporarily set rootPool name to empty
   rootPool = new Pool("", schedulingMode, 0, 0)
   schedulableBuilder = {
     schedulingMode match {
       case SchedulingMode.FIFO =>
         new FIFOSchedulableBuilder(rootPool)
       case SchedulingMode.FAIR =>
         new FairSchedulableBuilder(rootPool, conf)
     }
   }
   schedulableBuilder.buildPools()
 }
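As referenced above, a small sketch of how the standalone branch is reached (host names are placeholders): any master URL of the form spark://... matches SPARK_REGEX, and a comma-separated list is expanded into one spark:// URL per master, so createTaskScheduler returns the (SparkDeploySchedulerBackend, TaskSchedulerImpl) pair shown in the snippet.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object StandaloneMasterDemo {
  def main(args: Array[String]): Unit = {
    // Placeholder hosts. "spark://master1:7077,master2:7077" is split on "," and rebuilt
    // as Array("spark://master1:7077", "spark://master2:7077") by the code shown above.
    val conf = new SparkConf()
      .setAppName("standalone-demo")
      .setMaster("spark://master1:7077,master2:7077")

    // During construction, SparkContext.createTaskScheduler(this, master) hits the
    // SPARK_REGEX case, creates the TaskSchedulerImpl, wraps it in a
    // SparkDeploySchedulerBackend and calls scheduler.initialize(backend).
    val sc = new SparkContext(conf)

    sc.stop()
  }
}
```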

Starting the task scheduler

  • The task scheduler is started via _taskScheduler.start()
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
   // constructor
   _taskScheduler.start()
  • TaskSchedulerImpl.start() calls backend.start(), i.e. SparkDeploySchedulerBackend.start()
  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
  • That in turn calls CoarseGrainedSchedulerBackend.start(), which registers the [CoarseGrainedScheduler] endpoint via registerRpcEndpoint (used for communication)
  • An ApplicationDescription is instantiated, containing the command (org.apache.spark.executor.CoarseGrainedExecutorBackend)
  • An AppClient is created and started; it registers the [AppClient] endpoint via registerRpcEndpoint (used for communication)
override def start() {
    super.start()
    launcherBackend.connect()

    // The endpoint for executors to talk to us
    val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
      RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }
  • CoarseGrainedSchedulerBackend documentation
/**
  • A scheduler backend that waits for coarse-grained executors to connect to it through Akka
  • During the Spark job this backend holds onto each executor, rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task
  • Executors may be launched in a variety of ways, for example as Mesos tasks in coarse-grained Mesos mode, or as standalone processes in Spark's standalone deploy mode

* A scheduler backend that waits for coarse grained executors to connect to it through Akka.
* This backend holds onto each executor for the duration of the Spark job rather than relinquishing
* executors whenever a task is done and asking the scheduler to launch a new executor for
* each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
* coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
* (spark.deploy.*).
*/
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
 extends ExecutorAllocationClient with SchedulerBackend with Logging
{
  • ClientEndpoint.onStart() is called; it registers with all masters asynchronously by sending each master a RegisterApplication message
override def onStart(): Unit = {
     try {
       registerWithMaster(1)
     } catch {
       case e: Exception =>
         logWarning("Failed to connect to master", e)
         markDisconnected()
         stop()
     }
   }
 /**
    * Register with all masters asynchronously. It will call `registerWithMaster` every
    * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
    * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
    *
    * nthRetry means this is the nth attempt to register with master.
    */
   private def registerWithMaster(nthRetry: Int) {
     registerMasterFutures.set(tryRegisterAllMasters())
     registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
       override def run(): Unit = {
         if (registered.get) {
           registerMasterFutures.get.foreach(_.cancel(true))
           registerMasterThreadPool.shutdownNow()
         } else if (nthRetry >= REGISTRATION_RETRIES) {
           markDead("All masters are unresponsive! Giving up.")
         } else {
           registerMasterFutures.get.foreach(_.cancel(true))
           registerWithMaster(nthRetry + 1)
         }
       }
     }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
   }
   /**
    *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
    */
   private def tryRegisterAllMasters(): Array[JFuture[_]] = {
     for (masterAddress <- masterRpcAddresses) yield {
       registerMasterThreadPool.submit(new Runnable {
         override def run(): Unit = try {
           if (registered.get) {
             return
           }
           logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
           val masterRef =
             rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
           masterRef.send(RegisterApplication(appDescription, self))
         } catch {
           case ie: InterruptedException => // Cancelled
           case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
         }
       })
     }
   }

org.apache.spark.rpc.netty.Dispatcher

Class documentation

/**
 * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
 */
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {

Registering an RPC endpoint (key to communication and to the OnStart callback)

/**
 * Constructing new EndpointData(name, endpoint, endpointRef) creates
 *   val inbox = new Inbox(ref, endpoint).
 * When the Inbox is instantiated it runs the following, which effectively enqueues
 * OnStart as the first message:
 *
 *   // OnStart should be the first message to process
 *   inbox.synchronized {
 *     messages.add(OnStart)
 *   }
 */
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      val data = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef
  }

Inbox

An inbox that stores messages for an [[RpcEndpoint]] and posts messages to it thread-safely.

/**
 * When the Inbox is instantiated it runs the code below, which effectively enqueues an OnStart
 * message for the current endpoint (that is, whenever an RpcEndpoint subclass is registered,
 * OnStart is added first).
 * When that OnStart message is handled in process(), the endpoint implementation's onStart()
 * method is called back.
 */
 // OnStart should be the first message to process
 inbox.synchronized {
   messages.add(OnStart)
 }

  /**
   * Process stored messages.
   */
  def process(dispatcher: Dispatcher): Unit = {
	  ......
case OnStart =>
            endpoint.onStart()
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
              inbox.synchronized {
                if (!stopped) {
                  enableConcurrent = true
                }
              }
            }
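To make the OnStart flow concrete, here is a hedged sketch of a custom endpoint. The RpcEndpoint API is private[spark] in this version, so the sketch assumes code compiled under the org.apache.spark package; the EchoEndpoint class and the "echo" endpoint name are invented for illustration. Registering the endpoint enqueues OnStart into its Inbox, the Dispatcher processes that message first, and onStart() is called back before any user message is delivered.

```scala
package org.apache.spark.demo // assumption: inside org.apache.spark to reach private[spark] APIs

import org.apache.spark.SparkEnv
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Hypothetical endpoint, used only to illustrate the OnStart -> onStart() callback order.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  // Invoked when the Dispatcher processes the OnStart message that the Inbox enqueued at
  // registration time, i.e. before any other message reaches this endpoint.
  override def onStart(): Unit = {
    println(s"EchoEndpoint started at ${rpcEnv.address}")
  }

  // One-way messages sent with RpcEndpointRef.send(...)
  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => println(s"got: $msg")
  }

  // Request/response messages sent with RpcEndpointRef.ask(...)
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg")
  }
}

object EchoEndpointDemo {
  // Assumes a running SparkEnv (e.g. on the driver after the SparkContext is created).
  def register(): Unit = {
    val rpcEnv = SparkEnv.get.rpcEnv
    // setupEndpoint -> Dispatcher.registerRpcEndpoint: creates the EndpointData/Inbox,
    // enqueues OnStart and returns the endpoint reference.
    val ref = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
    ref.send("hello") // handled by receive() only after onStart() has run
  }
}
```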

Entry code block (SparkContext initialization, around line 400)

try {
    _conf = config.clone()
    _conf.validateSettings()

    if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }
    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }

    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
    // yarn-standalone is deprecated, but still supported
    if ((master == "yarn-cluster" || master == "yarn-standalone") &&
        !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }

    if (_conf.getBoolean("spark.logConf", false)) {
      logInfo("Spark configuration:\n" + _conf.toDebugString)
    }

    // Set Spark driver host and port system properties
    _conf.setIfMissing("spark.driver.host", Utils.localHostName())
    _conf.setIfMissing("spark.driver.port", "0")

    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

    _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
      .toSeq.flatten

    _eventLogDir =
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }

    _eventLogCodec = {
      val compress = _conf.getBoolean("spark.eventLog.compress", false)
      if (compress && isEventLogEnabled) {
        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
      } else {
        None
      }
    }

    _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)

    if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

    // "_jobProgressListener" should be set up before creating SparkEnv because when creating
    // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
    _jobProgressListener = new JobProgressListener(_conf)
    listenerBus.addListener(jobProgressListener)

    // Create the Spark execution environment (cache, map output tracker, etc)
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)

    // If running the REPL, register the repl's output dir with the file server.
    _conf.getOption("spark.repl.class.outputDir").foreach { path =>
      val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
      _conf.set("spark.repl.class.uri", replUri)
    }

    _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)

    _statusTracker = new SparkStatusTracker(this)

    _progressBar =
      if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
        Some(new ConsoleProgressBar(this))
      } else {
        None
      }

    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
          _env.securityManager, appName, startTime = startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    _ui.foreach(_.bind())

    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

    // Add each JAR given through the constructor
    if (jars != null) {
      jars.foreach(addJar)
    }

    if (files != null) {
      files.foreach(addFile)
    }

    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
      .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)

    // Convert java options to env vars as a work around
    // since we can't set env vars directly in sbt.
    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
      value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
      executorEnvs(envKey) = value
    }
    Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
      executorEnvs("SPARK_PREPEND_CLASSES") = v
    }
    // The Mesos scheduler backend relies on this environment variable to set executor memory.
    // TODO: Set this only in the Mesos scheduler.
    executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
    executorEnvs ++= _conf.getExecutorEnv
    executorEnvs("SPARK_USER") = sparkUser

    // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
    // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    _ui.foreach(_.setAppId(_applicationId))
    _env.blockManager.initialize(_applicationId)

    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
    metricsSystem.start()
    // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
    metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

    _eventLogger =
      if (isEventLogEnabled) {
        val logger =
          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
            _conf, _hadoopConfiguration)
        logger.start()
        listenerBus.addListener(logger)
        Some(logger)
      } else {
        None
      }

    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
      logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
    }

    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        Some(new ExecutorAllocationManager(this, listenerBus, _conf))
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())

    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())

    setupAndStartListenerBus()
    postEnvironmentUpdate()
    postApplicationStart()

    // Post init
    _taskScheduler.postStartHook()
    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    _executorAllocationManager.foreach { e =>
      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
    }

    // Make sure the context is stopped if the user forgets about it. This avoids leaving
    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
    // is killed, though.
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
      stop()
    }
  } catch {
    case NonFatal(e) =>
      logError("Error initializing SparkContext.", e)
      try {
        stop()
      } catch {
        case NonFatal(inner) =>
          logError("Error stopping SparkContext after init error.", inner)
      } finally {
        throw e
      }
  }