1. 程式人生 > >Spark 核心篇-SparkEnv

Spark 核心篇-SparkEnv

本章內容:

1、功能概述

SparkEnv是Spark的執行環境物件,其中包括與眾多Executor執行相關的物件。Spark 對任務的計算都依託於 Executor 的能力,所有的 Executor 都有自己的 Spark 的執行環境 SparkEnv。有了 SparkEnv,就可以將資料儲存在儲存體系中;就能利用計算引擎對計算任務進行處理,就可以在節點間進行通訊等。在local模式下Driver會建立Executor,local-cluster部署模式或者Standalone部署模式下Worker另起的CoarseGrainedExecutorBackend程序中也會建立Executor,所以SparkEnv存在於Driver或者CoarseGrainedExecutorBackend程序中。

建立SparkEnv主要使用SparkEnv的createDriverEnv方法,有四個引數:conf、isLocal、listenerBus 以及在本地模式下driver執行executor需要的numberCores。

/**
 * :: DeveloperApi ::
 * Holds all the runtime environment objects for a running Spark instance (either master or worker),
 * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
 * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
 * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
 *
 * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
 *       in a future release.
 */
@DeveloperApi
class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleManager: ShuffleManager,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val metricsSystem: MetricsSystem,
    val memoryManager: MemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf) extends Logging

圖1 在 Driver 上建立 SparkEnv 

 

圖2 在 Executor 上建立 SparkEnv

2、相關元件

名稱 說明
SecurityManager 主要對賬戶、許可權及身份認證進行設定與管理。
RpcEnv 各個元件之間通訊的執行環境。
SerializerManager Spark 中很多物件在通用網路傳輸或者寫入儲存體系時,都需要序列化。
BroadcastManager 用於將配置資訊和序列化後的RDD、Job以及ShuffleDependency等資訊在本地儲存。
MapOutputTracker 用於跟蹤Map階段任務的輸出狀態,此狀態便於Reduce階段任務獲取地址及中間結果。
ShuffleManager 負責管理本地及遠端的Block資料的shuffle操作。
MemoryManager 一個抽象的記憶體管理器,用於執行記憶體如何在執行和儲存之間共享。
NettyBlockTransferService 使用Netty提供的非同步事件驅動的網路應用框架,提供Web服務及客戶端,獲取遠端節點上Block的集合。 
BlockManagerMaster 負責對BlockManager的管理和協調。
BlockManager 負責對Block的管理,管理整個Spark執行時的資料讀寫的,當然也包含資料儲存本身,在這個基礎之上進行讀寫操作。
MetricsSystem 一般是為了衡量系統的各種指標的度量系統。
OutputCommitCoordinator 確定任務是否可以把輸出提到到HFDS的管理者,使用先提交者勝的策略。

3、程式碼分析

程式碼 說明
// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

建立 Spark 執行時環境(包括:cache、map output tracker 等)

// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
conf: SparkConf,
 isLocal: Boolean,
 listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}

這個函式允許 SparkEnv 建立的元件在測試單元中被模仿。

類名:SparkContext

函式:createSparkEnv

引數:

  • conf:Spark 配置
  • isLocal:是否是本地模式
  • listenerBus:Spark 事件監聽器

用意:直接呼叫 SparkEnv.createDriverEnv()函式

/**
* Create a SparkEnv for the driver.
*/
private[spark] def createDriverEnv(
conf: SparkConf,
 isLocal: Boolean,
 listenerBus: LiveListenerBus,
 numCores: Int,
 mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains(DRIVER_HOST_ADDRESS),
 s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
val port = conf.get("spark.driver.port").toInt
val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(conf))
} else {
None
}
create(
conf,
 SparkContext.DRIVER_IDENTIFIER,
 bindAddress,
 advertiseAddress,
 Option(port),
 isLocal,
 numCores,
 ioEncryptionKey,
 listenerBus = listenerBus,
 mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
}

 為 Driver 建立一個 SparkEnv 物件

類名:SparkEnv

函式:createDriverEnv

引數:

  • conf:Spark 配置
  • isLocal:是否是本地模式
  • listenerBus:Spark 事件監聽器
  • numCores:Core 個數
  • mockOutputCommitCoordinator:Spark 輸出提交控制器

用意:做了 HOST和 PORT 判斷,然後呼叫 create()函式

/**
* Helper method to create a SparkEnv for a driver or an executor.
*/
private def create(
conf: SparkConf,
 executorId: String,
 bindAddress: String,
 advertiseAddress: String,
 port: Option[Int],
 isLocal: Boolean,
 numUsableCores: Int,
 ioEncryptionKey: Option[Array[Byte]],
 listenerBus: LiveListenerBus = null,
 mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

 // Listener bus is only used on the driver
 if (isDriver) {
assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
}

val securityManager = new SecurityManager(conf, ioEncryptionKey)
if (isDriver) {
securityManager.initializeAuth()
}

ioEncryptionKey.foreach { _ =>
if (!securityManager.isEncryptionEnabled()) {
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
"wire.")
}
}

val systemName = if (isDriver) driverSystemName else executorSystemName
 val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
 securityManager, numUsableCores, !isDriver)

// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
 if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
}

// Create an instance of the class with the given name, possibly initializing it with our conf
 def instantiateClass[T](className: String): T = {
val cls = Utils.classForName(className)
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
// SparkConf, then one taking no arguments
 try {
cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
.newInstance(conf, new java.lang.Boolean(isDriver))
.asInstanceOf[T]
} catch {
case _: NoSuchMethodException =>
try {
cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
} catch {
case _: NoSuchMethodException =>
cls.getConstructor().newInstance().asInstanceOf[T]
}
}
}

// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
 def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
instantiateClass[T](conf.get(propertyName, defaultClassName))
}

val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

val closureSerializer = new JavaSerializer(conf)

def registerOrLookupEndpoint(
name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {
if (isDriver) {
logInfo("Registering " + name)
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
RpcUtils.makeDriverRef(name, conf, rpcEnv)
}
}

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
new MapOutputTrackerWorker(conf)
}

// Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
 mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
 new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

// Let the user specify short names for shuffle managers
 val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
 "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}

val blockManagerPort = if (isDriver) {
conf.get(DRIVER_BLOCK_MANAGER_PORT)
} else {
conf.get(BLOCK_MANAGER_PORT)
}

val blockTransferService =
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
 blockManagerPort, numUsableCores)

val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
 new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
 conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
 val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
 serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
 blockTransferService, securityManager, numUsableCores)

val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
 MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
 conf.set("spark.executor.id", executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}

val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
 new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

val envInstance = new SparkEnv(
executorId,
 rpcEnv,
 serializer,
 closureSerializer,
 serializerManager,
 mapOutputTracker,
 shuffleManager,
 broadcastManager,
 blockManager,
 securityManager,
 metricsSystem,
 memoryManager,
 outputCommitCoordinator,
 conf)

// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
// called, and we only need to do it for driver. Because driver may run as a service, and if we
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
 if (isDriver) {
val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
envInstance.driverTmpDir = Some(sparkFilesDir)
}

envInstance
}

為 Driver 和 Executor 建立一個 SparkEnv 的 Helper 方法

類名:SparkEnv

函式:create

用意:

  1. 建立安全管理器 SecurityManager
  2. 建立 RPC 通訊層 RpcEnv
  3. 建立序列化管理器 SerializerManager
  4. 建立廣播管理器 BroadcastManager
  5. 建立 Map 任務輸出跟蹤器 MapOutputTracker
  6. 建立 ShuffleManager
  7. 建立記憶體管理器 MemoryManager
  8. 建立塊傳輸服務 NettyBlockTransferService
  9. 建立 BlockManagerMaster
  10. 建立塊管理器 BlockManager
  11. 建立測量系統 MetricsSystem
  12. 建立 OutputCommitCoordinator
  13. 建立 SparkEnv

3.1 建立安全管理器 SecurityManager

SecurityManager主要對帳號、許可權以及身份認證進行設定和管理。如果 Spark 的部署模式為 YARN,則需要生成 secret key (金鑰)並存儲 Hadoop UGI。而在其他模式下,則需要設定環境變數 _SPARK_AUTH_SECRET(優先順序更高)或者 spark.authenticate.secret 屬性指定 secret key (金鑰)。最後SecurityManager 中設定了預設的口令認證例項 Authenticator,此例項採用匿名內部類實現,用於每次使用 HTTP client 從 HTTP 伺服器獲取使用者的使用者和密碼。這是由於 Spark 的節點間通訊往往需要動態協商使用者名稱、密碼,這種方式靈活地支援了這種需求。

// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
val securityManager = new SecurityManager(conf, ioEncryptionKey)
if (isDriver) {
  securityManager.initializeAuth()
}
 
// 變數處理
// 第一步:new SecurityManager()
// 包名:org.apache.spark
// 類名:SecurityManager
// 使用 HTTP 連結設定口令認證
// Set our own authenticator to properly negotiate(協商/達成) user/password for HTTP connections.
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
// only set once.
if (authOn) {
  Authenticator.setDefault(
    // 建立口令認證例項,複寫PasswordAuthentication方法,獲得使用者名稱和密碼
    new Authenticator() {
      override def getPasswordAuthentication(): PasswordAuthentication = {
        var passAuth: PasswordAuthentication = null
        val userInfo = getRequestingURL().getUserInfo()
        if (userInfo != null) {
          val  parts = userInfo.split(":", 2)
          passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
        }
        return passAuth
      }
    }
  )
}
 
// 第二步:initializeAuth()
// 包名:org.apache.spark
// 類名:SecurityManager
/**
 * Initialize the authentication secret.
 *
 * If authentication is disabled, do nothing.
 *
 * In YARN mode, generate a new secret and store it in the current user's credentials.
 *
 * In other modes, assert that the auth secret is set in the configuration.
 */
def initializeAuth(): Unit = {
  if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
    return
  }
 
  if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") {
    require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
      s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.")
    return
  }
 
  val rnd = new SecureRandom()
  val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
  val secretBytes = new Array[Byte](length)
  rnd.nextBytes(secretBytes)
 
  val creds = new Credentials()
  val secretStr = HashCodes.fromBytes(secretBytes).toString()
  creds.addSecretKey(SECRET_LOOKUP_KEY, secretStr.getBytes(UTF_8))
  UserGroupInformation.getCurrentUser().addCredentials(creds)
}

3.2 建立 RPC 通訊層 RpcEnv

Spark1.6推出的RpcEnv、RpcEndPoint、RpcEndpointRef為核心的新型架構下的RPC通訊方式,在底層封裝了Akka和Netty,為未來擴充更多的通訊系統提供了可能。RpcEnv是RPC的環境,所有的RpcEndpoint都需要註冊到RpcEnv例項物件中,管理著這些註冊的RpcEndpoint的生命週期: 

  • 根據name或者uri註冊RpcEndpoint; 
  • 管理各種訊息的處理; 
  • 停止RpcEndpoint 

Spark RPC中最為重要的三個抽象(“三劍客”)為:RpcEnv、RpcEndpoint、RpcEndpointRef,這樣做的好處有:

  • 對上層的API來說,遮蔽了底層的具體實現,使用方便
  • 可以通過不同的實現來完成指定的功能,方便擴充套件
  • 促進了底層實現層的良性競爭,Spark 1.6.3中預設使用了Netty作為底層的實現,但Akka的依賴依然存在;而Spark 2.1.0中的底層實現只有Netty,這樣使用者可以方便的使用不同版本的Akka或者將來某種更好的底層實現
建立RpcEnv的程式碼:
// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
private[spark] val driverSystemName = "sparkDriver"
private[spark] val executorSystemName = "sparkExecutor"
 
val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
 
val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
  securityManager, numUsableCores, !isDriver)
 
// 變數處理
// 第一步
// 包名:org.apache.spark.rpc
// 類名:RpcEnv
def create(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    numUsableCores: Int,
    clientMode: Boolean): RpcEnv = {
  val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
    numUsableCores, clientMode)
  new NettyRpcEnvFactory().create(config)
}
 
// 第二步
// 包名:org.apache.spark.rpc.netty
// 類名:NettyRpcEnv
def create(config: RpcEnvConfig): RpcEnv = {
  val sparkConf = config.conf
  // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
  // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
  val javaSerializerInstance =
    new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
  val nettyEnv =
    new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
      config.securityManager, config.numUsableCores)
  if (!config.clientMode) {
    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
      nettyEnv.startServer(config.bindAddress, actualPort)
      (nettyEnv, nettyEnv.address.port)
    }
    try {
      Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
    } catch {
      case NonFatal(e) =>
        nettyEnv.shutdown()
        throw e
    }
  }
  nettyEnv
}

3.3 建立序列化管理器 SerializerManager

Spark 中很多物件在通用網路傳輸或者寫入儲存體系時,都需要序列化。SparkEnv 中有兩個序列化元件,分別是SerializerManager和ClosureSerializer。

建立 SparkEnv 的程式碼:

// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
  instantiateClass[T](conf.get(propertyName, defaultClassName))
}
 
val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
 
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
 
val closureSerializer = new JavaSerializer(conf)

可以看到這裡建立的serializer預設為org.apache.spark.serializer.JavaSerializer,使用者可以通過spark.serializer屬性配置其他的序列化實現,如org.apache.spark.serializer.KryoSerializer。而 closureSerializer 的實際型別固定為org.apache.spark.serializer.JavaSerializer,使用者不能夠自己指定。JavaSerializer採用 Java 語言自帶的序列化 API 實現。

3.4 建立廣播管理器 BroadcastManager

BroadcastManager用於將配置資訊和序列化後的RDD、Job以及ShuffleDependency等資訊在本地儲存。如果為了容災,也會複製到其他節點上。

圖3 向 Executor 廣播一個變數

建立 BroadcastManager 的程式碼:

// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
 
// 變數處理
// 包名:org.apache.spark.broadcast
// 類名:BroadcastManager
initialize()
 
// Called by SparkContext or Executor before using Broadcast
private def initialize() {
  synchronized {
    if (!initialized) {
      broadcastFactory = new TorrentBroadcastFactory
      broadcastFactory.initialize(isDriver, conf, securityManager)
      initialized = true
    }
  }
}

BroadcastManager 在其初始化的過程中就會呼叫自身的 initialize 方法,當 initialize 執行完畢,BroadcastManager 就會正式生效。

3.5 建立 Map 任務輸出跟蹤器 MapOutputTracker

MapOutputTracker 用於跟蹤Map階段任務的輸出狀態,此狀態便於Reduce階段任務獲取地址及中間結果。每個Map任務或者Reduce任務都會有其唯一的標識,分別為mapId 和 reduceId。每個Reduce任務的輸入可能是多個Map任務的輸出,Reduce會到各個Map任務的所在節點上拉取Block,這一過程叫做Shuffle。每個Shuffle過程都有唯一的表示shuffleId。 

MapOutputTracker 有兩個子類:MapOutputTrackerMaster(for driver) 和 MapOutputTrackerWorker(for executors);因為它們使用了不同的HashMap來儲存元資料。

建立 MapOutputTracker 的程式碼:

// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
val mapOutputTracker = if (isDriver) {
  new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
  new MapOutputTrackerWorker(conf)
}
 
// 變數處理
// 第一步
// 包名:org.apache.spark
// 類名:SparkEnv
// MapOutputTracker.ENDPOINT_NAME 變數宣告為 val ENDPOINT_NAME = "MapOutputTracker"
// Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
  new MapOutputTrackerMasterEndpoint(
    rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
 
// 第二步
// 包名:org.apache.spark
// 類名:SparkEnv
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}
 
// 第三步
// 包名:org.apache.spark.rpc.netty
// 類名:NettyRpcEnv
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}
 
// 包名:org.apache.spark.util
// 類名:RpcUtils
/**
 * Retrieve a `RpcEndpointRef` which is located in the driver via its name.
 */
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
  val driverHost: String = conf.get("spark.driver.host", "localhost")
  val driverPort: Int = conf.getInt("spark.driver.port", 7077)
  Utils.checkHost(driverHost)
  rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}

在 MapOutputTracker 初始化中,可以看到針對當前例項是 Driver 還是 Executor,建立其方式有所不同。

  • 如果當前應用程式是 Driver,則建立 MapOutputTrackerMaster,然後建立 MapOutputTrackerMasterEndpoint,並且註冊到 Dispatcher 中,註冊名為 MapOutputTracker;
  • 如果當前應用程式是 Executor,則建立 MapOutputTrackerWorker,並從遠端 Driver 例項的 NettyRpcEnv 的 Dispatcher 中查詢 MapOutputTrackerMasterEndpoint 的引用。

無論是 Driver 還是 Executor,最後都由 MapOutputTracker 的屬性 trackerEndpoint 持有 MapOutputTrackerEndpoint 的引用。

3.6 建立 ShuffleManager

ShuffleManager負責管理本地及遠端的Block資料的shuffle操作。ShuffleManager根據預設的 spark.shuffle.manager 屬性,通過反射方式生成的SortShuffleManager的例項。預設使用的是sort模式的SortShuffleManager,Spark 2.x.x 版本提供 sort 和 tungsten-sort 兩種 ShuffleManager 的實現。無論是 sort 還是 tungsten-sort,我們看到實現類都是 SortShuffleManager。

// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
 
 
// 變數處理
// 第一步
// 包名:org.apache.spark
// 類名:SparkEnv
// Create an instance of the class with the given name, possibly initializing it with our conf
def instantiateClass[T](className: String): T = {
  val cls = Utils.classForName(className)
  // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
  // SparkConf, then one taking no arguments
  try {
    cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
      .newInstance(conf, new java.lang.Boolean(isDriver))
      .asInstanceOf[T]
  } catch {
    case _: NoSuchMethodException =>
      try {
        cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
          cls.getConstructor().newInstance().asInstanceOf[T]
      }
  }
}
 
// 第二步
// 包名:org.apache.spark.util
// 類名:Utils
// scalastyle:off classforname
/** Preferred alternative to Class.forName(className) */
def classForName(className: String): Class[_] = {
  Class.forName(className, true, getContextOrSparkClassLoader)
  // scalastyle:on classforname
}
 
/**
 * Get the Context ClassLoader on this thread or, if not present, the ClassLoader that
 * loaded Spark.
 *
 * This should be used whenever passing a ClassLoader to Class.ForName or finding the currently
 * active loader when setting up ClassLoader delegation chains.
 */
def getContextOrSparkClassLoader: ClassLoader =
  Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader)
 
/**
 * Get the ClassLoader which loaded Spark.
 */
def getSparkClassLoader: ClassLoader = getClass.getClassLoader

3.7 建立記憶體管理器 MemoryManager

MemoryManager 的主要實現有 StaticMemoryManager 和 UnifiedMemoryManager。其中 StaticMemoryManager 是 Spark 早期版本遺留下來的記憶體管理器實現,可以配置 spark.memory.useLegacyMode 屬性來指定,該屬性預設為 false,因此預設的記憶體管理器是 UnifiedMemoryManager;而UnifiedMemoryManager 是在Spark1.6中增加了一個新的記憶體管理模型,該模型可以使得execution部分和storage部分的記憶體不像之前的(StaticMemoryManager)由比例引數限定住,而是兩者可以互相借用空閒的記憶體。

建立 MemoryManager 的程式碼:

// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }

3.8 建立塊傳輸服務 NettyBlockTransferService

在Spark1.6中只保留了NettyBlockTransferService,已經沒有了NioBlockTransferService。NettyBlockTransferService使用Netty提供的非同步事件驅動的網路應用框架,提供Web服務及客戶端,獲取遠端節點上Block的集合。 在這裡使用的是 BlockTransferService 的子類 NettyBlockTransferService建立塊傳輸服務 BlockTransferService,NettyBlockTransferService 將提供對外的塊傳輸服務。也正是因為 MapOutputTracker 與 NettyBlockTransferService 的配合,才實現了 Spark 的 Shuffle。

建立 BlockTransferManager 的程式碼:

// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
val blockManagerPort = if (isDriver) {
  conf.get(DRIVER_BLOCK_MANAGER_PORT)
} else {
  conf.get(BLOCK_MANAGER_PORT)
}
 
val blockTransferService =
  new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
    blockManagerPort, numUsableCores)

3.9 建立 BlockManagerMaster

BlockManagerMaster 負責對BlockManager的管理和協調,具體操作依賴於BlockManagerMasterEndpoint。

建立 BlockManagerMaster 的程式碼:

// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)
 
 
// 第二步
// 包名:org.apache.spark
// 類名:SparkEnv
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}
  
// 第三步
// 包名:org.apache.spark.rpc.netty
// 類名:NettyRpcEnv
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}
  
// 包名:org.apache.spark.util
// 類名:RpcUtils
/**
 * Retrieve a `RpcEndpointRef` which is located in the driver via its name.
 */
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
  val driverHost: String = conf.get("spark.driver.host", "localhost")
  val driverPort: Int = conf.getInt("spark.driver.port", 7077)
  Utils.checkHost(driverHost)
  rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}

圖4 為 Driver 建立塊管理的流程

 圖5 為 Executor 建立塊管理的流程

這裡通過 registerOrLookupEndpoint 方法查詢或者註冊 BlockManagerMasterEndpoint,而對Driver和Executor處理BlockManagerMaster的方式不同:

  • 當前應用程式是 Driver,則建立 BlockManagerMasterEndpoint,並且註冊到 Dispatcher 中,註冊名為 BlockManagerMaster;
  • 當前應用程式是 Executor,則從遠端 Driver 例項的 NettyRpcEnv 的 Dispatcher 中查詢 BlockManagerMasterEndpoint 的引用。

無論是 Driver 還是 Executor,最後都由 BlockManagerMaster 的屬性 driverEndpoint 持有 BlockManagerMasterEndpoint 的引用。

提示:這裡的BlockManagerMaster 的建立邏輯與 MapOutputTracker 基本一致,可以互相對照著分析,能更好理解 Spark RPC 服務。

3.10 建立塊管理器 BlockManager

BlockManager負責對Block的管理,只有在BlockManager的初始化方法initialize()被呼叫後才是有效的。

建立 BlockManager 的程式碼:

// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)
 
// 變數處理
// 包名:org.apache.spark
// 類名:SparkContext
_env.blockManager.initialize(_applicationId)

BlockManager 物件在 SparkContext 初始化建立 SparkEnv 執行環境被建立,而在 SparkContext 後續的初始化過程中呼叫其initialize()完成其初始化。

3.11 建立測量系統 MetricsSystem

MetricsSystem 是Spark的測量系統,在 SparkEnv 中,度量系統也是必不可少的一個子元件。

建立 MetricsSystem 的程式碼:

// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
val metricsSystem = if (isDriver) {
  // Don't start metrics system right now for Driver.
  // We need to wait for the task scheduler to give us an app ID.
  // Then we can start the metrics system.
  MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
  // We need to set the executor ID before the MetricsSystem is created because sources and
  // sinks specified in the metrics configuration file will want to incorporate this executor's
  // ID into the metrics they report.
  conf.set("spark.executor.id", executorId)
  val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
  ms.start()
  ms
}
 
// 變數處理
// 第一步
// 包名:org.apache.spark.metrics
// 類名:MetricsSystem
def createMetricsSystem(
    instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = {
  new MetricsSystem(instance, conf, securityMgr)
}
 
// 第二步(當是 Driver 時)
// 包名:org.apache.spark
// 類名:SparkContext
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

根據程式碼描述,可以看出建立度量系統根據當前例項是 Driver 還是 Executor 有所區別:

  • 當前例項為 Driver:建立度量系統,並且指定度量系統的例項名為 driver。此時雖然建立了,但是並未啟動,目的是等待 SparkContext 中的任務排程器 TaskScheculer 告訴度量系統應用程式ID後再啟動。
  • 當前例項為 Executor:設定spark.executor.id屬性為當前 Executor 的ID,然後再建立並啟動度量系統。

建立度量系統使用了伴生物件 MetricsSystem 的 createMetricsSystem 方法(類似 Java 的靜態方法)

3.12 建立 OutputCommitCoordinator

當 Spark 應用程式使用了 Spark SQL (包括 Hive)或者需要將任務的輸出儲存到 HDFS 時,就會用到輸出提交協調器 OutputCommitCoordinator,OutputCommitCoordinator 將決定任務是否可以提交輸出到 HDFS。無論是 Driver 還是 Executor,在 SparkEnv 中都包含了子元件 OutputCommitCoordinator。在 Driver 上註冊了 OutputCommitCoordinatorEndpoint,在所有 Executor 上的 OutputCommitCoordinator 都是通過 OutputCommitCoordinatorEndpoint 的 RpcEndpointRef 來詢問 Driver 上的 OutputCommitCoordinator,是否能夠將輸出提交到 HDFS。

建立 OutputCommitCoordinator 的程式碼:

// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
  new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
  new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
 
// 第二步
// 包名:org.apache.spark
// 類名:SparkEnv
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}
  
// 第三步
// 包名:org.apache.spark.rpc.netty
// 類名:NettyRpcEnv
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}
  
// 包名:org.apache.spark.util
// 類名:RpcUtils
/**
 * Retrieve a `RpcEndpointRef` which is located in the driver via its name.
 */
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
  val driverHost: String = conf.get("spark.driver.host", "localhost")
  val driverPort: Int = conf.getInt("spark.driver.port", 7077)
  Utils.checkHost(driverHost)
  rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}

根據程式碼可以看出 OutputCommitCoordinator 的建立步驟如下:

  • 當前例項為 Driver 時,則建立 OutputCommitCoordinatorEndpoint,並且註冊到 Dispatcher 中,註冊名為 OutputCommitCoordinator;
  • 當前例項為 Executor 時,則從遠端 Driver 例項的 NettyRpcEnv 的 Dispatcher 中查詢 OutputCommitCoordinatorEndpoint 的引用。

無論是 Driver 還是 Executor,最後都由 OutputCommitCoordinator 的屬性 coordinatorRef 持有 OutputCommitCoordinatorEndpoint 的引用。

提示:這裡的BlockManagerMaster 的建立邏輯與 MapOutputTracker 基本一致,可以互相對照著分析,能更好理解 Spark RPC 服務。

3.13 建立 SparkEnv

當 SparkEnv 內的所有元件都例項化完畢,將正式構建 SparkEnv。

// 變數宣告
// 包名:org.apache.spark
// 類名:SparkEnv
val envInstance = new SparkEnv(
  executorId,
  rpcEnv,
  serializer,
  closureSerializer,
  serializerManager,
  mapOutputTracker,
  shuffleManager,
  broadcastManager,
  blockManager,
  securityManager,
  metricsSystem,
  memoryManager,
  outputCommitCoordinator,
  conf)

如果當前例項為 Driver 時,還要為其建立臨時目錄,相關程式碼如下:

// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
// called, and we only need to do it for driver. Because driver may run as a service, and if we
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
if (isDriver) {
  val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
  envInstance.driverTmpDir = Some(sparkFilesDir)
}

從上面的註釋可以看出,當 Driver 呼叫 stop() 函式停止時,這些建立的臨時目錄將會被刪除。但是當一個 SparkContext 例項停止時,則不會被刪除,因為 Driver 是作為一個服務執行的,因此將會建立很多的臨時目錄。

參考文獻:

  • 深入理解 Spark - 核心思想與原始碼分析 @耿嘉安
  • Spark 核心設計的藝術 - 架構設計與實現 @耿嘉安
  • Spark 大資料處理 - 技術、應用與效能優化 @高彥傑
  • 圖解 Spark 核心技術與案例實戰 @郭景瞻
  • Spark 技術內幕 - 深入解析 Spark 核心、架構設計與實現原理 @張安站

相關推薦

Spark 核心-SparkEnv

本章內容: 1、功能概述 SparkEnv是Spark的執行環境物件,其中包括與眾多Executor執行相關的物件。Spark 對任務的計算都依託於 Executor 的能力,所有的 Executor 都有自己的 Spark 的執行環境 SparkEnv。有了 SparkEnv,就可以將資料儲存在儲存體系

Spark 核心-SparkContext

Spark 核心篇-SparkContext 閱讀目錄 1、功能描述 2、相關元件 3、程式碼分析 3.1 初始設定 3.2 建立執行環境 SparkEnv 3.3 建立 SparkUI 3.4 Hadoop 相關配置

Spark---Spark中yarn模式兩種提交任務方式

方式 div -s and clas client 命令 yarn 模式 一、前述 Spark可以和Yarn整合,將Application提交到Yarn上運行,和StandAlone提交模式一樣,Yarn也有兩種提交任務的方式。 二、具體 1、yarn

Spark核心RDD、什麽是RDD、RDD的屬性、創建RDD、RDD的依賴以及緩存、

ase 數組 依據 shuff esc 從數據 目錄 ordering 存儲 1:什麽是Spark的RDD??? RDD(Resilient Distributed Dataset)叫做分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、裏面的元素

Spark核心概念理解

固定 tex 入口 HA 持久 其它 ota amd 適合 本文主要內容來自於《Hadoop權威指南》英文版中的Spark章節,能夠說是個人的翻譯版本號,涵蓋了基本的

Spark筆記整理(二):RDD與spark核心概念名詞

大數據 Spark [TOC] Spark RDD 非常基本的說明,下面一張圖就能夠有基本的理解: Spark RDD基本說明 1、Spark的核心概念是RDD (resilient distributed dataset,彈性分布式數據集),指的是一個只讀的,可分區的分布式數據集,這個數據集的全

Spark核心RDD:combineByKey函數詳解

sta 3.0 vbscript map ner 初始化 partition 得到 new https://blog.csdn.net/jiangpeng59/article/details/52538254 為什麽單獨講解combineByKey? 因為comb

大資料之Spark(三)--- Spark核心API,Spark術語,Spark三級排程流程原始碼分析

一、Spark核心API ----------------------------------------------- [SparkContext] 連線到spark叢集,入口點. [HadoopRDD] extends RDD 讀取hadoop

Spark學習之Spark核心

一、Spark中的一些專業術語 任務: Application:使用者寫的應用程式,包括Driver Program和Executor Program。 Job:一個action類運算元觸發執行的操作。 stage:一組任務(task)就是一個stage。

Spark2.2.0叢集搭建部署之【SPARK叢集

軟體準備資訊,詳見Spark2.2.0叢集搭建部署之【軟體準備篇】 基礎配置資訊,詳見Spark2.2.0叢集搭建部署之【基礎配置篇】 SSH無密訪問,詳見park2.2.0叢集搭建部署之【無密訪問篇】 HADOOP叢集,詳見Spark2.2.0叢集搭建部署之【HADOOP叢集篇】

spark核心程式設計,spark基本工作原理與RDD

Spark2.0筆記 spark核心程式設計,spark基本工作原理與RDD 1. Spark基本工作原理 2. RDD以及其特點 3. 什麼是Spark開發 1.Spark基本工作原理 2. RDD以及其特點 3. 什麼是Spark開發 spark核心程

Spark核心原始碼】SparkContext一些方法的解讀

目錄 建立SchedulerBackend的TaskScheduler方法 設定並啟動事件匯流排 釋出環境更新的方法 釋出應用程式系統的方法 在【Spark核心原始碼】SparkContext中的元件和初始化 已經介紹了Spark初始化時是如何執行的,都建立了哪些元件。這些元

Spark核心原始碼】SparkContext中的元件和初始化

目錄 SparkContext概述 SparkContext元件概述 SparkContext初始化過程 第一步:確保當前執行緒中沒有SparkContext在執行 第二步:版本反饋 第三步:真正的初始化 第四步:確認啟動成功 SparkContext概述 在

Spark核心原始碼】事件匯流排ListenerBus

目錄 訊息匯流排ListenerBus 非同步事件處理LiveListenerBus 增加事件 listenerThread處理事件 訊息匯流排ListenerBus org.apache.spark.util.ListenerBus處理來自DAGScheduler、Sp

Spark核心原始碼】內建的RPC框架,Spark的通訊兵(二)

目錄 RPC管道處理TransportChannelHandler RPC服務端處理RpcHandler 載入程式Bootstrap RPC客戶端TransportClient 總結 接著【Spark核心原始碼】內建的RPC框架,Spark的通訊兵(一) 接著分析 R

Spark核心原始碼】內建的RPC框架,Spark的通訊兵(一)

目錄 RPC上下文TransportContext RPC配置TransportConf RPC客戶端工廠TransprotClientFactory RPC服務端TransportServer 作為一個分散式計算引擎,既然是分散式,那麼網路通訊是肯定少不了的,在Spark中

Spark核心原始碼】SparkConf,Spark的配置管控

目錄 概述 從系統中獲取並設定配置資訊 使用SparkConf提供的方法設定配置資訊 通過克隆的方式設定配置資訊  總結 概述 SparkConf,以KEY-VALUE對的形式設定Spark的配置引數。我們編寫Spark應用程式時,也會先建立SparkCon

Spark核心原始碼】Spark基本概念及特點

目錄 Hadoop MapReduce的不足 Spark的基本概念 RDD DAG Partition NarrowDependency ShuffleDependency Job Stage Task Shuffle Spark的基本元件 Clu

Spark核心原始碼】解析“spark-shell”(二)

接著【初探Spark核心】解析“spark-shell”(一)來看 根據main的執行日誌來看,我們直接看一下org.apache.spark.repl.Main.main方法: main方法中建立了SparkILoop物件,作為引數傳遞給了doMain方法,並呼叫了doMain

Spark核心原始碼】Spark原始碼環境搭建

目錄 準備條件 下載spark原始碼,並解壓 開啟spark原始碼下的pom.xml檔案,修改對應的java和intellij裡的maven版本 開啟intellij,Inport Project,將原始碼匯入intellij中 問題總結(十分重要) Maven編譯打包前的準