Spark 核心篇-SparkEnv
本章內容:
1、功能概述
SparkEnv是Spark的執行環境物件,其中包括與眾多Executor執行相關的物件。Spark 對任務的計算都依託於 Executor 的能力,所有的 Executor 都有自己的 Spark 的執行環境 SparkEnv。有了 SparkEnv,就可以將資料儲存在儲存體系中;就能利用計算引擎對計算任務進行處理,就可以在節點間進行通訊等。在local模式下Driver會建立Executor,local-cluster部署模式或者Standalone部署模式下Worker另起的CoarseGrainedExecutorBackend程序中也會建立Executor,所以SparkEnv存在於Driver或者CoarseGrainedExecutorBackend程序中。
建立SparkEnv主要使用SparkEnv的createDriverEnv方法,有四個引數:conf、isLocal、listenerBus 以及在本地模式下driver執行executor需要的numberCores。
/** * :: DeveloperApi :: * Holds all the runtime environment objects for a running Spark instance (either master or worker), * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently * Spark code finds the SparkEnv through a global variable, so all the threads can access the same * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext). * * NOTE: This is not intended for external use. This is exposed for Shark and may be made private * in a future release. */ @DeveloperApi class SparkEnv ( val executorId: String, private[spark] val rpcEnv: RpcEnv, val serializer: Serializer, val closureSerializer: Serializer, val serializerManager: SerializerManager, val mapOutputTracker: MapOutputTracker, val shuffleManager: ShuffleManager, val broadcastManager: BroadcastManager, val blockManager: BlockManager, val securityManager: SecurityManager, val metricsSystem: MetricsSystem, val memoryManager: MemoryManager, val outputCommitCoordinator: OutputCommitCoordinator, val conf: SparkConf) extends Logging
圖1 在 Driver 上建立 SparkEnv
圖2 在 Executor 上建立 SparkEnv
2、相關元件
名稱 | 說明 |
SecurityManager | 主要對賬戶、許可權及身份認證進行設定與管理。 |
RpcEnv | 各個元件之間通訊的執行環境。 |
SerializerManager | Spark 中很多物件在通用網路傳輸或者寫入儲存體系時,都需要序列化。 |
BroadcastManager | 用於將配置資訊和序列化後的RDD、Job以及ShuffleDependency等資訊在本地儲存。 |
MapOutputTracker | 用於跟蹤Map階段任務的輸出狀態,此狀態便於Reduce階段任務獲取地址及中間結果。 |
ShuffleManager | 負責管理本地及遠端的Block資料的shuffle操作。 |
MemoryManager | 一個抽象的記憶體管理器,用於執行記憶體如何在執行和儲存之間共享。 |
NettyBlockTransferService | 使用Netty提供的非同步事件驅動的網路應用框架,提供Web服務及客戶端,獲取遠端節點上Block的集合。 |
BlockManagerMaster | 負責對BlockManager的管理和協調。 |
BlockManager | 負責對Block的管理,管理整個Spark執行時的資料讀寫的,當然也包含資料儲存本身,在這個基礎之上進行讀寫操作。 |
MetricsSystem | 一般是為了衡量系統的各種指標的度量系統。 |
OutputCommitCoordinator | 確定任務是否可以把輸出提到到HFDS的管理者,使用先提交者勝的策略。 |
3、程式碼分析
程式碼 | 說明 |
// Create the Spark execution environment (cache, map output tracker, etc) |
建立 Spark 執行時環境(包括:cache、map output tracker 等) |
// This function allows components created by SparkEnv to be mocked in unit tests: |
這個函式允許 SparkEnv 建立的元件在測試單元中被模仿。 類名:SparkContext 函式:createSparkEnv 引數:
用意:直接呼叫 SparkEnv.createDriverEnv()函式 |
/** |
為 Driver 建立一個 SparkEnv 物件 類名:SparkEnv 函式:createDriverEnv 引數:
用意:做了 HOST和 PORT 判斷,然後呼叫 create()函式 |
/** |
為 Driver 和 Executor 建立一個 SparkEnv 的 Helper 方法 類名:SparkEnv 函式:create 用意:
|
3.1 建立安全管理器 SecurityManager
SecurityManager主要對帳號、許可權以及身份認證進行設定和管理。如果 Spark 的部署模式為 YARN,則需要生成 secret key (金鑰)並存儲 Hadoop UGI。而在其他模式下,則需要設定環境變數 _SPARK_AUTH_SECRET(優先順序更高)或者 spark.authenticate.secret 屬性指定 secret key (金鑰)。最後SecurityManager 中設定了預設的口令認證例項 Authenticator,此例項採用匿名內部類實現,用於每次使用 HTTP client 從 HTTP 伺服器獲取使用者的使用者和密碼。這是由於 Spark 的節點間通訊往往需要動態協商使用者名稱、密碼,這種方式靈活地支援了這種需求。
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv val securityManager = new SecurityManager(conf, ioEncryptionKey) if (isDriver) { securityManager.initializeAuth() } // 變數處理 // 第一步:new SecurityManager() // 包名:org.apache.spark // 類名:SecurityManager // 使用 HTTP 連結設定口令認證 // Set our own authenticator to properly negotiate(協商/達成) user/password for HTTP connections. // This is needed by the HTTP client fetching from the HttpServer. Put here so its // only set once. if (authOn) { Authenticator.setDefault( // 建立口令認證例項,複寫PasswordAuthentication方法,獲得使用者名稱和密碼 new Authenticator() { override def getPasswordAuthentication(): PasswordAuthentication = { var passAuth: PasswordAuthentication = null val userInfo = getRequestingURL().getUserInfo() if (userInfo != null) { val parts = userInfo.split(":", 2) passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray()) } return passAuth } } ) } // 第二步:initializeAuth() // 包名:org.apache.spark // 類名:SecurityManager /** * Initialize the authentication secret. * * If authentication is disabled, do nothing. * * In YARN mode, generate a new secret and store it in the current user's credentials. * * In other modes, assert that the auth secret is set in the configuration. */ def initializeAuth(): Unit = { if (!sparkConf.get(NETWORK_AUTH_ENABLED)) { return } if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") { require(sparkConf.contains(SPARK_AUTH_SECRET_CONF), s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.") return } val rnd = new SecureRandom() val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE val secretBytes = new Array[Byte](length) rnd.nextBytes(secretBytes) val creds = new Credentials() val secretStr = HashCodes.fromBytes(secretBytes).toString() creds.addSecretKey(SECRET_LOOKUP_KEY, secretStr.getBytes(UTF_8)) UserGroupInformation.getCurrentUser().addCredentials(creds) }
3.2 建立 RPC 通訊層 RpcEnv
Spark1.6推出的RpcEnv、RpcEndPoint、RpcEndpointRef為核心的新型架構下的RPC通訊方式,在底層封裝了Akka和Netty,為未來擴充更多的通訊系統提供了可能。RpcEnv是RPC的環境,所有的RpcEndpoint都需要註冊到RpcEnv例項物件中,管理著這些註冊的RpcEndpoint的生命週期:
- 根據name或者uri註冊RpcEndpoint;
- 管理各種訊息的處理;
- 停止RpcEndpoint
Spark RPC中最為重要的三個抽象(“三劍客”)為:RpcEnv、RpcEndpoint、RpcEndpointRef,這樣做的好處有:
- 對上層的API來說,遮蔽了底層的具體實現,使用方便
- 可以通過不同的實現來完成指定的功能,方便擴充套件
- 促進了底層實現層的良性競爭,Spark 1.6.3中預設使用了Netty作為底層的實現,但Akka的依賴依然存在;而Spark 2.1.0中的底層實現只有Netty,這樣使用者可以方便的使用不同版本的Akka或者將來某種更好的底層實現
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv private[spark] val driverSystemName = "sparkDriver" private[spark] val executorSystemName = "sparkExecutor" val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER val systemName = if (isDriver) driverSystemName else executorSystemName val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf, securityManager, numUsableCores, !isDriver) // 變數處理 // 第一步 // 包名:org.apache.spark.rpc // 類名:RpcEnv def create( name: String, bindAddress: String, advertiseAddress: String, port: Int, conf: SparkConf, securityManager: SecurityManager, numUsableCores: Int, clientMode: Boolean): RpcEnv = { val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager, numUsableCores, clientMode) new NettyRpcEnvFactory().create(config) } // 第二步 // 包名:org.apache.spark.rpc.netty // 類名:NettyRpcEnv def create(config: RpcEnvConfig): RpcEnv = { val sparkConf = config.conf // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance val javaSerializerInstance = new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance] val nettyEnv = new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress, config.securityManager, config.numUsableCores) if (!config.clientMode) { val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => nettyEnv.startServer(config.bindAddress, actualPort) (nettyEnv, nettyEnv.address.port) } try { Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1 } catch { case NonFatal(e) => nettyEnv.shutdown() throw e } } nettyEnv }
3.3 建立序列化管理器 SerializerManager
Spark 中很多物件在通用網路傳輸或者寫入儲存體系時,都需要序列化。SparkEnv 中有兩個序列化元件,分別是SerializerManager和ClosureSerializer。
建立 SparkEnv 的程式碼:
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv // Create an instance of the class named by the given SparkConf property, or defaultClassName // if the property is not set, possibly initializing it with our conf def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = { instantiateClass[T](conf.get(propertyName, defaultClassName)) } val serializer = instantiateClassFromConf[Serializer]( "spark.serializer", "org.apache.spark.serializer.JavaSerializer") logDebug(s"Using serializer: ${serializer.getClass}") val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey) val closureSerializer = new JavaSerializer(conf)
可以看到這裡建立的serializer預設為org.apache.spark.serializer.JavaSerializer,使用者可以通過spark.serializer屬性配置其他的序列化實現,如org.apache.spark.serializer.KryoSerializer。而 closureSerializer 的實際型別固定為org.apache.spark.serializer.JavaSerializer,使用者不能夠自己指定。JavaSerializer採用 Java 語言自帶的序列化 API 實現。
3.4 建立廣播管理器 BroadcastManager
BroadcastManager用於將配置資訊和序列化後的RDD、Job以及ShuffleDependency等資訊在本地儲存。如果為了容災,也會複製到其他節點上。
圖3 向 Executor 廣播一個變數
建立 BroadcastManager 的程式碼:
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv val broadcastManager = new BroadcastManager(isDriver, conf, securityManager) // 變數處理 // 包名:org.apache.spark.broadcast // 類名:BroadcastManager initialize() // Called by SparkContext or Executor before using Broadcast private def initialize() { synchronized { if (!initialized) { broadcastFactory = new TorrentBroadcastFactory broadcastFactory.initialize(isDriver, conf, securityManager) initialized = true } } }
BroadcastManager 在其初始化的過程中就會呼叫自身的 initialize 方法,當 initialize 執行完畢,BroadcastManager 就會正式生效。
3.5 建立 Map 任務輸出跟蹤器 MapOutputTracker
MapOutputTracker 用於跟蹤Map階段任務的輸出狀態,此狀態便於Reduce階段任務獲取地址及中間結果。每個Map任務或者Reduce任務都會有其唯一的標識,分別為mapId 和 reduceId。每個Reduce任務的輸入可能是多個Map任務的輸出,Reduce會到各個Map任務的所在節點上拉取Block,這一過程叫做Shuffle。每個Shuffle過程都有唯一的表示shuffleId。
MapOutputTracker 有兩個子類:MapOutputTrackerMaster(for driver) 和 MapOutputTrackerWorker(for executors);因為它們使用了不同的HashMap來儲存元資料。
建立 MapOutputTracker 的程式碼:
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv val mapOutputTracker = if (isDriver) { new MapOutputTrackerMaster(conf, broadcastManager, isLocal) } else { new MapOutputTrackerWorker(conf) } // 變數處理 // 第一步 // 包名:org.apache.spark // 類名:SparkEnv // MapOutputTracker.ENDPOINT_NAME 變數宣告為 val ENDPOINT_NAME = "MapOutputTracker" // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint // requires the MapOutputTracker itself mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME, new MapOutputTrackerMasterEndpoint( rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf)) // 第二步 // 包名:org.apache.spark // 類名:SparkEnv def registerOrLookupEndpoint( name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = { if (isDriver) { logInfo("Registering " + name) rpcEnv.setupEndpoint(name, endpointCreator) } else { RpcUtils.makeDriverRef(name, conf, rpcEnv) } } // 第三步 // 包名:org.apache.spark.rpc.netty // 類名:NettyRpcEnv override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { dispatcher.registerRpcEndpoint(name, endpoint) } // 包名:org.apache.spark.util // 類名:RpcUtils /** * Retrieve a `RpcEndpointRef` which is located in the driver via its name. */ def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = { val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost) rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name) }
在 MapOutputTracker 初始化中,可以看到針對當前例項是 Driver 還是 Executor,建立其方式有所不同。
- 如果當前應用程式是 Driver,則建立 MapOutputTrackerMaster,然後建立 MapOutputTrackerMasterEndpoint,並且註冊到 Dispatcher 中,註冊名為 MapOutputTracker;
- 如果當前應用程式是 Executor,則建立 MapOutputTrackerWorker,並從遠端 Driver 例項的 NettyRpcEnv 的 Dispatcher 中查詢 MapOutputTrackerMasterEndpoint 的引用。
無論是 Driver 還是 Executor,最後都由 MapOutputTracker 的屬性 trackerEndpoint 持有 MapOutputTrackerEndpoint 的引用。
3.6 建立 ShuffleManager
ShuffleManager負責管理本地及遠端的Block資料的shuffle操作。ShuffleManager根據預設的 spark.shuffle.manager 屬性,通過反射方式生成的SortShuffleManager的例項。預設使用的是sort模式的SortShuffleManager,Spark 2.x.x 版本提供 sort 和 tungsten-sort 兩種 ShuffleManager 的實現。無論是 sort 還是 tungsten-sort,我們看到實現類都是 SortShuffleManager。
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv // Let the user specify short names for shuffle managers val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) // 變數處理 // 第一步 // 包名:org.apache.spark // 類名:SparkEnv // Create an instance of the class with the given name, possibly initializing it with our conf def instantiateClass[T](className: String): T = { val cls = Utils.classForName(className) // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just // SparkConf, then one taking no arguments try { cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE) .newInstance(conf, new java.lang.Boolean(isDriver)) .asInstanceOf[T] } catch { case _: NoSuchMethodException => try { cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T] } catch { case _: NoSuchMethodException => cls.getConstructor().newInstance().asInstanceOf[T] } } } // 第二步 // 包名:org.apache.spark.util // 類名:Utils // scalastyle:off classforname /** Preferred alternative to Class.forName(className) */ def classForName(className: String): Class[_] = { Class.forName(className, true, getContextOrSparkClassLoader) // scalastyle:on classforname } /** * Get the Context ClassLoader on this thread or, if not present, the ClassLoader that * loaded Spark. * * This should be used whenever passing a ClassLoader to Class.ForName or finding the currently * active loader when setting up ClassLoader delegation chains. */ def getContextOrSparkClassLoader: ClassLoader = Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader) /** * Get the ClassLoader which loaded Spark. */ def getSparkClassLoader: ClassLoader = getClass.getClassLoader
3.7 建立記憶體管理器 MemoryManager
MemoryManager 的主要實現有 StaticMemoryManager 和 UnifiedMemoryManager。其中 StaticMemoryManager 是 Spark 早期版本遺留下來的記憶體管理器實現,可以配置 spark.memory.useLegacyMode 屬性來指定,該屬性預設為 false,因此預設的記憶體管理器是 UnifiedMemoryManager;而UnifiedMemoryManager 是在Spark1.6中增加了一個新的記憶體管理模型,該模型可以使得execution部分和storage部分的記憶體不像之前的(StaticMemoryManager)由比例引數限定住,而是兩者可以互相借用空閒的記憶體。
建立 MemoryManager 的程式碼:
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false) val memoryManager: MemoryManager = if (useLegacyMemoryManager) { new StaticMemoryManager(conf, numUsableCores) } else { UnifiedMemoryManager(conf, numUsableCores) }
3.8 建立塊傳輸服務 NettyBlockTransferService
在Spark1.6中只保留了NettyBlockTransferService,已經沒有了NioBlockTransferService。NettyBlockTransferService使用Netty提供的非同步事件驅動的網路應用框架,提供Web服務及客戶端,獲取遠端節點上Block的集合。 在這裡使用的是 BlockTransferService 的子類 NettyBlockTransferService建立塊傳輸服務 BlockTransferService,NettyBlockTransferService 將提供對外的塊傳輸服務。也正是因為 MapOutputTracker 與 NettyBlockTransferService 的配合,才實現了 Spark 的 Shuffle。
建立 BlockTransferManager 的程式碼:
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv val blockManagerPort = if (isDriver) { conf.get(DRIVER_BLOCK_MANAGER_PORT) } else { conf.get(BLOCK_MANAGER_PORT) } val blockTransferService = new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress, blockManagerPort, numUsableCores)
3.9 建立 BlockManagerMaster
BlockManagerMaster 負責對BlockManager的管理和協調,具體操作依賴於BlockManagerMasterEndpoint。
建立 BlockManagerMaster 的程式碼:
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint( BlockManagerMaster.DRIVER_ENDPOINT_NAME, new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)), conf, isDriver) // 第二步 // 包名:org.apache.spark // 類名:SparkEnv def registerOrLookupEndpoint( name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = { if (isDriver) { logInfo("Registering " + name) rpcEnv.setupEndpoint(name, endpointCreator) } else { RpcUtils.makeDriverRef(name, conf, rpcEnv) } } // 第三步 // 包名:org.apache.spark.rpc.netty // 類名:NettyRpcEnv override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { dispatcher.registerRpcEndpoint(name, endpoint) } // 包名:org.apache.spark.util // 類名:RpcUtils /** * Retrieve a `RpcEndpointRef` which is located in the driver via its name. */ def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = { val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost) rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name) }
圖4 為 Driver 建立塊管理的流程
圖5 為 Executor 建立塊管理的流程
這裡通過 registerOrLookupEndpoint 方法查詢或者註冊 BlockManagerMasterEndpoint,而對Driver和Executor處理BlockManagerMaster的方式不同:
- 當前應用程式是 Driver,則建立 BlockManagerMasterEndpoint,並且註冊到 Dispatcher 中,註冊名為 BlockManagerMaster;
- 當前應用程式是 Executor,則從遠端 Driver 例項的 NettyRpcEnv 的 Dispatcher 中查詢 BlockManagerMasterEndpoint 的引用。
無論是 Driver 還是 Executor,最後都由 BlockManagerMaster 的屬性 driverEndpoint 持有 BlockManagerMasterEndpoint 的引用。
提示:這裡的BlockManagerMaster 的建立邏輯與 MapOutputTracker 基本一致,可以互相對照著分析,能更好理解 Spark RPC 服務。
3.10 建立塊管理器 BlockManager
BlockManager負責對Block的管理,只有在BlockManager的初始化方法initialize()被呼叫後才是有效的。
建立 BlockManager 的程式碼:
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv // NB: blockManager is not valid until initialize() is called later. val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster, serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores) // 變數處理 // 包名:org.apache.spark // 類名:SparkContext _env.blockManager.initialize(_applicationId)
BlockManager 物件在 SparkContext 初始化建立 SparkEnv 執行環境被建立,而在 SparkContext 後續的初始化過程中呼叫其initialize()完成其初始化。
3.11 建立測量系統 MetricsSystem
MetricsSystem 是Spark的測量系統,在 SparkEnv 中,度量系統也是必不可少的一個子元件。
建立 MetricsSystem 的程式碼:
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv val metricsSystem = if (isDriver) { // Don't start metrics system right now for Driver. // We need to wait for the task scheduler to give us an app ID. // Then we can start the metrics system. MetricsSystem.createMetricsSystem("driver", conf, securityManager) } else { // We need to set the executor ID before the MetricsSystem is created because sources and // sinks specified in the metrics configuration file will want to incorporate this executor's // ID into the metrics they report. conf.set("spark.executor.id", executorId) val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager) ms.start() ms } // 變數處理 // 第一步 // 包名:org.apache.spark.metrics // 類名:MetricsSystem def createMetricsSystem( instance: String, conf: SparkConf, securityMgr: SecurityManager): MetricsSystem = { new MetricsSystem(instance, conf, securityMgr) } // 第二步(當是 Driver 時) // 包名:org.apache.spark // 類名:SparkContext // The metrics system for Driver need to be set spark.app.id to app ID. // So it should start after we get app ID from the task scheduler and set spark.app.id. _env.metricsSystem.start() // Attach the driver metrics servlet handler to the web ui after the metrics system is started. _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
根據程式碼描述,可以看出建立度量系統根據當前例項是 Driver 還是 Executor 有所區別:
- 當前例項為 Driver:建立度量系統,並且指定度量系統的例項名為 driver。此時雖然建立了,但是並未啟動,目的是等待 SparkContext 中的任務排程器 TaskScheculer 告訴度量系統應用程式ID後再啟動。
- 當前例項為 Executor:設定spark.executor.id屬性為當前 Executor 的ID,然後再建立並啟動度量系統。
建立度量系統使用了伴生物件 MetricsSystem 的 createMetricsSystem 方法(類似 Java 的靜態方法)
3.12 建立 OutputCommitCoordinator
當 Spark 應用程式使用了 Spark SQL (包括 Hive)或者需要將任務的輸出儲存到 HDFS 時,就會用到輸出提交協調器 OutputCommitCoordinator,OutputCommitCoordinator 將決定任務是否可以提交輸出到 HDFS。無論是 Driver 還是 Executor,在 SparkEnv 中都包含了子元件 OutputCommitCoordinator。在 Driver 上註冊了 OutputCommitCoordinatorEndpoint,在所有 Executor 上的 OutputCommitCoordinator 都是通過 OutputCommitCoordinatorEndpoint 的 RpcEndpointRef 來詢問 Driver 上的 OutputCommitCoordinator,是否能夠將輸出提交到 HDFS。
建立 OutputCommitCoordinator 的程式碼:
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse { new OutputCommitCoordinator(conf, isDriver) } val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator", new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator)) outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef) // 第二步 // 包名:org.apache.spark // 類名:SparkEnv def registerOrLookupEndpoint( name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = { if (isDriver) { logInfo("Registering " + name) rpcEnv.setupEndpoint(name, endpointCreator) } else { RpcUtils.makeDriverRef(name, conf, rpcEnv) } } // 第三步 // 包名:org.apache.spark.rpc.netty // 類名:NettyRpcEnv override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { dispatcher.registerRpcEndpoint(name, endpoint) } // 包名:org.apache.spark.util // 類名:RpcUtils /** * Retrieve a `RpcEndpointRef` which is located in the driver via its name. */ def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = { val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost) rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name) }
根據程式碼可以看出 OutputCommitCoordinator 的建立步驟如下:
- 當前例項為 Driver 時,則建立 OutputCommitCoordinatorEndpoint,並且註冊到 Dispatcher 中,註冊名為 OutputCommitCoordinator;
- 當前例項為 Executor 時,則從遠端 Driver 例項的 NettyRpcEnv 的 Dispatcher 中查詢 OutputCommitCoordinatorEndpoint 的引用。
無論是 Driver 還是 Executor,最後都由 OutputCommitCoordinator 的屬性 coordinatorRef 持有 OutputCommitCoordinatorEndpoint 的引用。
提示:這裡的BlockManagerMaster 的建立邏輯與 MapOutputTracker 基本一致,可以互相對照著分析,能更好理解 Spark RPC 服務。
3.13 建立 SparkEnv
當 SparkEnv 內的所有元件都例項化完畢,將正式構建 SparkEnv。
// 變數宣告 // 包名:org.apache.spark // 類名:SparkEnv val envInstance = new SparkEnv( executorId, rpcEnv, serializer, closureSerializer, serializerManager, mapOutputTracker, shuffleManager, broadcastManager, blockManager, securityManager, metricsSystem, memoryManager, outputCommitCoordinator, conf)
如果當前例項為 Driver 時,還要為其建立臨時目錄,相關程式碼如下:
// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is // called, and we only need to do it for driver. Because driver may run as a service, and if we // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs. if (isDriver) { val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath envInstance.driverTmpDir = Some(sparkFilesDir) }
從上面的註釋可以看出,當 Driver 呼叫 stop() 函式停止時,這些建立的臨時目錄將會被刪除。但是當一個 SparkContext 例項停止時,則不會被刪除,因為 Driver 是作為一個服務執行的,因此將會建立很多的臨時目錄。
參考文獻:
- 深入理解 Spark - 核心思想與原始碼分析 @耿嘉安
- Spark 核心設計的藝術 - 架構設計與實現 @耿嘉安
- Spark 大資料處理 - 技術、應用與效能優化 @高彥傑
- 圖解 Spark 核心技術與案例實戰 @郭景瞻
- Spark 技術內幕 - 深入解析 Spark 核心、架構設計與實現原理 @張安站
相關推薦
Spark 核心篇-SparkEnv
本章內容: 1、功能概述 SparkEnv是Spark的執行環境物件,其中包括與眾多Executor執行相關的物件。Spark 對任務的計算都依託於 Executor 的能力,所有的 Executor 都有自己的 Spark 的執行環境 SparkEnv。有了 SparkEnv,就可以將資料儲存在儲存體系
Spark 核心篇-SparkContext
Spark 核心篇-SparkContext 閱讀目錄 1、功能描述 2、相關元件 3、程式碼分析 3.1 初始設定 3.2 建立執行環境 SparkEnv 3.3 建立 SparkUI 3.4 Hadoop 相關配置
【Spark】篇---Spark中yarn模式兩種提交任務方式
方式 div -s and clas client 命令 yarn 模式 一、前述 Spark可以和Yarn整合,將Application提交到Yarn上運行,和StandAlone提交模式一樣,Yarn也有兩種提交任務的方式。 二、具體 1、yarn
Spark核心RDD、什麽是RDD、RDD的屬性、創建RDD、RDD的依賴以及緩存、
ase 數組 依據 shuff esc 從數據 目錄 ordering 存儲 1:什麽是Spark的RDD??? RDD(Resilient Distributed Dataset)叫做分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、裏面的元素
Spark核心概念理解
固定 tex 入口 HA 持久 其它 ota amd 適合 本文主要內容來自於《Hadoop權威指南》英文版中的Spark章節,能夠說是個人的翻譯版本號,涵蓋了基本的
Spark筆記整理(二):RDD與spark核心概念名詞
大數據 Spark [TOC] Spark RDD 非常基本的說明,下面一張圖就能夠有基本的理解: Spark RDD基本說明 1、Spark的核心概念是RDD (resilient distributed dataset,彈性分布式數據集),指的是一個只讀的,可分區的分布式數據集,這個數據集的全
Spark核心RDD:combineByKey函數詳解
sta 3.0 vbscript map ner 初始化 partition 得到 new https://blog.csdn.net/jiangpeng59/article/details/52538254 為什麽單獨講解combineByKey? 因為comb
大資料之Spark(三)--- Spark核心API,Spark術語,Spark三級排程流程原始碼分析
一、Spark核心API ----------------------------------------------- [SparkContext] 連線到spark叢集,入口點. [HadoopRDD] extends RDD 讀取hadoop
Spark學習之Spark核心
一、Spark中的一些專業術語 任務: Application:使用者寫的應用程式,包括Driver Program和Executor Program。 Job:一個action類運算元觸發執行的操作。 stage:一組任務(task)就是一個stage。
Spark2.2.0叢集搭建部署之【SPARK叢集篇】
軟體準備資訊,詳見Spark2.2.0叢集搭建部署之【軟體準備篇】 基礎配置資訊,詳見Spark2.2.0叢集搭建部署之【基礎配置篇】 SSH無密訪問,詳見park2.2.0叢集搭建部署之【無密訪問篇】 HADOOP叢集,詳見Spark2.2.0叢集搭建部署之【HADOOP叢集篇】
spark核心程式設計,spark基本工作原理與RDD
Spark2.0筆記 spark核心程式設計,spark基本工作原理與RDD 1. Spark基本工作原理 2. RDD以及其特點 3. 什麼是Spark開發 1.Spark基本工作原理 2. RDD以及其特點 3. 什麼是Spark開發 spark核心程
【Spark核心原始碼】SparkContext一些方法的解讀
目錄 建立SchedulerBackend的TaskScheduler方法 設定並啟動事件匯流排 釋出環境更新的方法 釋出應用程式系統的方法 在【Spark核心原始碼】SparkContext中的元件和初始化 已經介紹了Spark初始化時是如何執行的,都建立了哪些元件。這些元
【Spark核心原始碼】SparkContext中的元件和初始化
目錄 SparkContext概述 SparkContext元件概述 SparkContext初始化過程 第一步:確保當前執行緒中沒有SparkContext在執行 第二步:版本反饋 第三步:真正的初始化 第四步:確認啟動成功 SparkContext概述 在
【Spark核心原始碼】事件匯流排ListenerBus
目錄 訊息匯流排ListenerBus 非同步事件處理LiveListenerBus 增加事件 listenerThread處理事件 訊息匯流排ListenerBus org.apache.spark.util.ListenerBus處理來自DAGScheduler、Sp
【Spark核心原始碼】內建的RPC框架,Spark的通訊兵(二)
目錄 RPC管道處理TransportChannelHandler RPC服務端處理RpcHandler 載入程式Bootstrap RPC客戶端TransportClient 總結 接著【Spark核心原始碼】內建的RPC框架,Spark的通訊兵(一) 接著分析 R
【Spark核心原始碼】內建的RPC框架,Spark的通訊兵(一)
目錄 RPC上下文TransportContext RPC配置TransportConf RPC客戶端工廠TransprotClientFactory RPC服務端TransportServer 作為一個分散式計算引擎,既然是分散式,那麼網路通訊是肯定少不了的,在Spark中
【Spark核心原始碼】SparkConf,Spark的配置管控
目錄 概述 從系統中獲取並設定配置資訊 使用SparkConf提供的方法設定配置資訊 通過克隆的方式設定配置資訊 總結 概述 SparkConf,以KEY-VALUE對的形式設定Spark的配置引數。我們編寫Spark應用程式時,也會先建立SparkCon
【Spark核心原始碼】Spark基本概念及特點
目錄 Hadoop MapReduce的不足 Spark的基本概念 RDD DAG Partition NarrowDependency ShuffleDependency Job Stage Task Shuffle Spark的基本元件 Clu
【Spark核心原始碼】解析“spark-shell”(二)
接著【初探Spark核心】解析“spark-shell”(一)來看 根據main的執行日誌來看,我們直接看一下org.apache.spark.repl.Main.main方法: main方法中建立了SparkILoop物件,作為引數傳遞給了doMain方法,並呼叫了doMain
【Spark核心原始碼】Spark原始碼環境搭建
目錄 準備條件 下載spark原始碼,並解壓 開啟spark原始碼下的pom.xml檔案,修改對應的java和intellij裡的maven版本 開啟intellij,Inport Project,將原始碼匯入intellij中 問題總結(十分重要) Maven編譯打包前的準