1. 程式人生 > >Spark1.6-----原始碼解讀之SparkEnv

Spark1.6-----原始碼解讀之SparkEnv

我是跟著原始碼點進去一步一寫的,所以觀看時希望大家能一步一跟著原始碼走,不要只看博文。

在SparkContext 284行建立SparkEnv:

  // This function allows components created by SparkEnv to be mocked in unit tests:
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
  }

SparkEnv.createDriverEnv最終會呼叫SparkEnv 233行create:

  /**
   * Helper method to create a SparkEnv for a driver or an executor.
   */
  private def create(
      conf: SparkConf,
      executorId: String,
      hostname: String,
      port: Int,
      isDriver: Boolean,
      isLocal: Boolean,
      numUsableCores: Int,
      listenerBus: LiveListenerBus = null,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

該方法會構造如下幾個重要的成員:

    ①分散式訊息系統RpcEnv

    ②mapOutputTracker

    ③ShuffleManager

    ④BlockManager

一一解答:

2,RpcEnv構建

個人認為,如果把分散式系統(HDFS, HBASE,SPARK等)比作一個人,那麼RPC可以認為是人體的血液迴圈系統。它將系統中各個不同的元件(如Hbase中的master, Regionserver, client)聯絡了起來。同樣,在spark中,不同元件像driver,executor,worker,master(stanalone模式)之間的通訊也是基於RPC來實現的。

Spark 1.6之前,spark的RPC是基於Akaa來實現的。Akka是一個基於scala語言的非同步的訊息框架。Spark1.6後,spark借鑑Akka的設計自己實現了一個基於Netty的rpc框架。大概的原因是1.6之前,RPC通過Akka來實現,而大檔案是基於netty來實現的,加之akka版本相容性問題,所以1.6之後把Akka改掉了,具體jira見(https://issues.apache.org/jira/browse/SPARK-5293)。

本文主要對spark1.6之後基於netty新開發的rpc框架做一個較為深入的分析。

2.1整體架構

spark 基於netty新的rpc框架借鑑了Akka的中的設計,它是基於Actor模型,各個元件可以認為是一個個獨立的實體,各個實體之間通過訊息來進行通訊。具體各個元件之間的關係圖如下(圖片來自[1]):

2.2 RpcEndpoint

表示一個個需要通訊的個體(如master,worker,driver),主要根據接收的訊息來進行對應的處理。一個RpcEndpoint經歷的過程依次是:構建->onStart→receive→onStop。其中onStart在接收任務訊息前呼叫,receive和receiveAndReply分別用來接收另一個RpcEndpoint(也可以是本身)send和ask過來的訊息。

2.3 RpcEndpointRef

RpcEndpointRef是對遠端RpcEndpoint的一個引用。當我們需要向一個具體的RpcEndpoint傳送訊息時,一般我們需要獲取到該RpcEndpoint的引用,然後通過該應用傳送訊息。

2.4 RpcAddress

表示遠端的RpcEndpointRef的地址,Host + Port。

2.5 RpcEnv

RpcEnv為RpcEndpoint提供處理訊息的環境。RpcEnv負責RpcEndpoint整個生命週期的管理,包括:註冊endpoint,endpoint之間訊息的路由,以及停止endpoint。

SparkEnv 253行create建立RpcEnv:

    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
      clientMode = !isDriver)

呼叫RpcEnv 53行getRpcEnvFactory:

    getRpcEnvFactory(conf).create(config)

 解讀:同過RpcEnvFactory建立RpcEnv

RpcEnv 35行getRpcEnvFactory具體實現:

  private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
    val rpcEnvNames = Map(
      "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
      "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
    val rpcEnvName = conf.get("spark.rpc", "netty")
    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
    Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
  }

解讀:根據配置建立對應的rpc框架 ,預設為netty,通過反射建立RpcEnvFactory。

3, ActorySystem

SparkEnv 265行:

        // Create a ActorSystem for legacy codes
        AkkaUtils.createActorSystem(
          actorSystemName + "ActorSystem",
          hostname,
          actorSystemPort,
          conf,
          securityManager
        )._1

AkkaUtils 53行:

doCreateActorSystem(name, host, actualPort, conf, securityManager)

最終在AkkaUtiles 121行建立actorSystem:

   val actorSystem = ActorSystem(name, akkaConf)

解析 :因為不是很重要就沒有解讀了。

4, mapOutputTracker建立

mapoutputTracker用於跟蹤map的輸出狀態,便於reduce獲取。每個map或者reduce任務都會有唯一任務id。每個reduce任務的輸入可能有多個map輸出,reduce會到各個map任務的節點拉取Block這個過程叫shuffle,每個shuffle過程都有一個唯一標識shuffleId。

SparkEnv  328行建立mapOutputTracker:

    //根據isDriver建立不同的物件。
    val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf)
    } else {
      new MapOutputTrackerWorker(conf)
    }

先看MapOutputTrackerMaster。

MapOutputTrackerMaster 273行 構造器:

private[spark] class MapOutputTrackerMaster(conf: SparkConf)
  extends MapOutputTracker(conf) {

比較重要的成員物件 MapOutputTracker 299行:mapStatuses,cachedSerializedStatuses 

//key為shuffleId,Array中的MapStatus跟蹤各個map任務的輸出狀態,Mapstatus中維護的了map輸出
//block的地址BolckMangerId,所以reduce知道從何處獲得map任務的輸出狀態。
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
//維護了序列化後map任務的輸出狀態。
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()

MapOutputTrackerWorker:

/**
 * MapOutputTracker for the executors, which fetches map output information from the driver's
 * MapOutputTrackerMaster.
 */
//從driver端獲取對應的map output資訊
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
  protected val mapStatuses: Map[Int, Array[MapStatus]] =
    new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
}

回到SparkEnv

SparkEnv 334行構建MapOutputTrackerMasterEndpoint並獲取MapOutputTrackerMasterEndpoint引用:

    // Have to assign trackerActor after initialization as MapOutputTrackerActor
    // requires the MapOutputTracker itself
    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

SparkEnv 317行registerOrLookupEndpoint具體實現:

    def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }

解析:如果是Driver會建立MapOutputTrackerMasterEndpoint 然後獲得MapOutputTrackerMasterEndpoint的引用RpcEndpointRef,

Executor直接獲得MapOutputTrackerMasterEndpoint的引用RpcEndpointRef。

RpcEndpointRef可以理解為物件的引用。

map任務的狀態是由Executor是向MapOutputTrackerMasterEndpoint傳送訊息來將map狀態同步到MapStatus中的。registerOrLookupEndpoint獲得的引用就是找到MapOutputTrackerMasterEndpoint的關鍵。

5, ShuffleManager

SparkEnv 341:

    val shortShuffleMgrNames = Map(
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
      "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

解讀:ShuffleManager負責管理本地和遠端的shuffle操作的。使用反射的方式生成的(預設)SortShuffleManager。

6  BlockManger

SparkEnv 364:

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)

解讀:BlockManager負責對Block管理。

總結

建立了幾個很重要的成員。

用於進行訊息傳遞的RpcEnv

用於負責跟蹤map輸出的mapOutputTracker

另外兩個以後的部落格會講