Spark1.6-----原始碼解讀之SparkEnv
我是跟著原始碼點進去一步一寫的,所以觀看時希望大家能一步一跟著原始碼走,不要只看博文。
在SparkContext 284行建立SparkEnv:
// This function allows components created by SparkEnv to be mocked in unit tests: private[spark] def createSparkEnv( conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus): SparkEnv = { SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master)) }
SparkEnv.createDriverEnv最終會呼叫SparkEnv 233行create:
/** * Helper method to create a SparkEnv for a driver or an executor. */ private def create( conf: SparkConf, executorId: String, hostname: String, port: Int, isDriver: Boolean, isLocal: Boolean, numUsableCores: Int, listenerBus: LiveListenerBus = null, mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
該方法會構造如下幾個重要的成員:
①分散式訊息系統RpcEnv
②mapOutputTracker
③ShuffleManager
④BlockManager
一一解答:
2,RpcEnv構建
個人認為,如果把分散式系統(HDFS, HBASE,SPARK等)比作一個人,那麼RPC可以認為是人體的血液迴圈系統。它將系統中各個不同的元件(如Hbase中的master, Regionserver, client)聯絡了起來。同樣,在spark中,不同元件像driver,executor,worker,master(stanalone模式)之間的通訊也是基於RPC來實現的。
Spark 1.6之前,spark的RPC是基於Akaa來實現的。Akka是一個基於scala語言的非同步的訊息框架。Spark1.6後,spark借鑑Akka的設計自己實現了一個基於Netty的rpc框架。大概的原因是1.6之前,RPC通過Akka來實現,而大檔案是基於netty來實現的,加之akka版本相容性問題,所以1.6之後把Akka改掉了,具體jira見(https://issues.apache.org/jira/browse/SPARK-5293)。
本文主要對spark1.6之後基於netty新開發的rpc框架做一個較為深入的分析。
2.1整體架構
spark 基於netty新的rpc框架借鑑了Akka的中的設計,它是基於Actor模型,各個元件可以認為是一個個獨立的實體,各個實體之間通過訊息來進行通訊。具體各個元件之間的關係圖如下(圖片來自[1]):
2.2 RpcEndpoint
表示一個個需要通訊的個體(如master,worker,driver),主要根據接收的訊息來進行對應的處理。一個RpcEndpoint經歷的過程依次是:構建->onStart→receive→onStop。其中onStart在接收任務訊息前呼叫,receive和receiveAndReply分別用來接收另一個RpcEndpoint(也可以是本身)send和ask過來的訊息。
2.3 RpcEndpointRef
RpcEndpointRef是對遠端RpcEndpoint的一個引用。當我們需要向一個具體的RpcEndpoint傳送訊息時,一般我們需要獲取到該RpcEndpoint的引用,然後通過該應用傳送訊息。
2.4 RpcAddress
表示遠端的RpcEndpointRef的地址,Host + Port。
2.5 RpcEnv
RpcEnv為RpcEndpoint提供處理訊息的環境。RpcEnv負責RpcEndpoint整個生命週期的管理,包括:註冊endpoint,endpoint之間訊息的路由,以及停止endpoint。
SparkEnv 253行create建立RpcEnv:
val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
clientMode = !isDriver)
呼叫RpcEnv 53行getRpcEnvFactory:
getRpcEnvFactory(conf).create(config)
解讀:同過RpcEnvFactory建立RpcEnv
RpcEnv 35行getRpcEnvFactory具體實現:
private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
val rpcEnvNames = Map(
"akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
"netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
val rpcEnvName = conf.get("spark.rpc", "netty")
val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
}
解讀:根據配置建立對應的rpc框架 ,預設為netty,通過反射建立RpcEnvFactory。
3, ActorySystem
SparkEnv 265行:
// Create a ActorSystem for legacy codes
AkkaUtils.createActorSystem(
actorSystemName + "ActorSystem",
hostname,
actorSystemPort,
conf,
securityManager
)._1
AkkaUtils 53行:
doCreateActorSystem(name, host, actualPort, conf, securityManager)
最終在AkkaUtiles 121行建立actorSystem:
val actorSystem = ActorSystem(name, akkaConf)
解析 :因為不是很重要就沒有解讀了。
4, mapOutputTracker建立
mapoutputTracker用於跟蹤map的輸出狀態,便於reduce獲取。每個map或者reduce任務都會有唯一任務id。每個reduce任務的輸入可能有多個map輸出,reduce會到各個map任務的節點拉取Block這個過程叫shuffle,每個shuffle過程都有一個唯一標識shuffleId。
SparkEnv 328行建立mapOutputTracker:
//根據isDriver建立不同的物件。
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf)
} else {
new MapOutputTrackerWorker(conf)
}
先看MapOutputTrackerMaster。
MapOutputTrackerMaster 273行 構造器:
private[spark] class MapOutputTrackerMaster(conf: SparkConf)
extends MapOutputTracker(conf) {
比較重要的成員物件 MapOutputTracker 299行:mapStatuses,cachedSerializedStatuses
//key為shuffleId,Array中的MapStatus跟蹤各個map任務的輸出狀態,Mapstatus中維護的了map輸出
//block的地址BolckMangerId,所以reduce知道從何處獲得map任務的輸出狀態。
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
//維護了序列化後map任務的輸出狀態。
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
MapOutputTrackerWorker:
/**
* MapOutputTracker for the executors, which fetches map output information from the driver's
* MapOutputTrackerMaster.
*/
//從driver端獲取對應的map output資訊
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
protected val mapStatuses: Map[Int, Array[MapStatus]] =
new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
}
回到SparkEnv
SparkEnv 334行構建MapOutputTrackerMasterEndpoint並獲取MapOutputTrackerMasterEndpoint引用:
// Have to assign trackerActor after initialization as MapOutputTrackerActor
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
SparkEnv 317行registerOrLookupEndpoint具體實現:
def registerOrLookupEndpoint(
name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {
if (isDriver) {
logInfo("Registering " + name)
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
RpcUtils.makeDriverRef(name, conf, rpcEnv)
}
}
解析:如果是Driver會建立MapOutputTrackerMasterEndpoint 然後獲得MapOutputTrackerMasterEndpoint的引用RpcEndpointRef,
Executor直接獲得MapOutputTrackerMasterEndpoint的引用RpcEndpointRef。
RpcEndpointRef可以理解為物件的引用。
map任務的狀態是由Executor是向MapOutputTrackerMasterEndpoint傳送訊息來將map狀態同步到MapStatus中的。registerOrLookupEndpoint獲得的引用就是找到MapOutputTrackerMasterEndpoint的關鍵。
5, ShuffleManager
SparkEnv 341:
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
解讀:ShuffleManager負責管理本地和遠端的shuffle操作的。使用反射的方式生成的(預設)SortShuffleManager。
6 BlockManger
SparkEnv 364:
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
解讀:BlockManager負責對Block管理。
總結
建立了幾個很重要的成員。
用於進行訊息傳遞的RpcEnv
用於負責跟蹤map輸出的mapOutputTracker
另外兩個以後的部落格會講