
Spark: BlockManager Internals and Source Code Analysis

BlockManager is Spark's distributed storage system. It differs from a general-purpose distributed storage system in that it only manages Block data, and it runs on every node. BlockManager has a Master-Slave architecture: the Master is the BlockManagerMaster on the Driver, and the Slaves are the BlockManagers on each Executor. The BlockManagerMaster accepts registrations from the Executors' BlockManagers and manages their metadata.
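
To make that division of labour concrete, here is a minimal sketch (toy names and maps only; the real messages and data structures appear in the source walked through below) of the bookkeeping the Driver-side master performs when an Executor-side BlockManager registers:

import scala.collection.mutable

// Toy model of the Driver-side BlockManagerMaster bookkeeping (illustrative names only)
object ToyBlockManagerMaster {
  // executorId -> "host:port" of the executor's BlockManager, and its managed memory
  private val blockManagerByExecutor = mutable.HashMap[String, String]()
  private val maxMemByExecutor = mutable.HashMap[String, Long]()

  // What an executor-side BlockManager does in initialize(): announce itself to the master
  def register(executorId: String, hostPort: String, maxMem: Long): Unit = {
    blockManagerByExecutor(executorId) = hostPort
    maxMemByExecutor(executorId) = maxMem
  }

  def main(args: Array[String]): Unit = {
    register("driver", "10.0.0.1:50001", 1L << 30)
    register("exec-1", "10.0.0.2:50002", 2L << 30)
    println(blockManagerByExecutor)
  }
}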

BlockManager Workflow


Storage Levels

An RDD can be persisted with the persist() or cache() method. The data is computed during the first action and cached in the nodes' memory. Spark's cache is fault-tolerant: if any partition of a cached RDD is lost, Spark recomputes it from the original lineage and caches it again.

In shuffle operations (e.g. reduceByKey), Spark automatically caches some intermediate data even if the user never calls persist, so that a node failure during the shuffle does not force recomputation of all the input data. If you intend to reuse an RDD several times, calling persist on it is strongly recommended.

cache() and persist() explicitly store data in memory or on disk; cache() is simply persist() with the MEMORY_ONLY storage level.

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
    // TODO: Handle changes of StorageLevel
    // Once an RDD's storage level has been set it cannot be changed
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    // If this is the first time this RDD is marked for persisting, register it
    // with the SparkContext for cleanups and accounting. Do this only once.
    if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      sc.persistRDD(this)
    }
    storageLevel = newLevel
    this
  }

The storage levels are as follows (see the short example after this table):

Storage level: description
NONE: no storage at all.
MEMORY_ONLY: store the RDD as deserialized Java objects in the JVM. If there is not enough memory, some partitions are not cached and are recomputed each time they are needed. This is the default level.
MEMORY_AND_DISK: store the RDD as deserialized Java objects in the JVM. Partitions that do not fit in memory are written to disk and read from there when needed.
MEMORY_ONLY_SER: store the RDD as serialized Java objects (one byte array per partition). This uses much less space than the deserialized form, especially with a fast serializer, at the cost of extra CPU work when reading.
MEMORY_AND_DISK_SER: like MEMORY_ONLY_SER, but partitions that do not fit in memory spill to disk instead of being recomputed when needed.
DISK_ONLY: cache the RDD only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: same as the levels above, but each partition is replicated on two cluster nodes.
OFF_HEAP: store the RDD in a distributed in-memory file system (off-heap).
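
For example (a minimal, self-contained sketch; the application name and data are made up), picking a level is just a matter of passing the corresponding StorageLevel constant to persist():

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistLevels {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("persist-demo").setMaster("local[*]"))

    val rdd = sc.parallelize(1 to 1000000)

    // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
    rdd.cache()

    // The same data persisted as serialized bytes, spilling to disk when memory is short
    val doubled = rdd.map(_ * 2).persist(StorageLevel.MEMORY_AND_DISK_SER)

    // The first action computes and caches the blocks; later actions reuse them
    println(doubled.count())
    println(doubled.sum())

    sc.stop()
  }
}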

BlockManager Registration

Step 1: When the SparkContext is created, it calls _env.blockManager.initialize(_applicationId) to create the Driver's BlockManager.
Source: org.apache.spark.SparkContext.scala

//Create the BlockManager for the Driver; it is responsible for managing the BlockManagers on the cluster's Executors
 _env.blockManager.initialize(_applicationId)

Step 2: When an Executor is created, it calls env.blockManager.initialize(conf.getAppId) internally to create its own BlockManager.
Source: org.apache.spark.executor.Executor.scala

  if (!isLocal) {
    //Create the BlockManager for this Executor
    env.blockManager.initialize(conf.getAppId)
    env.metricsSystem.registerSource(executorSource)
    env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
  }

Step 3: The initialize method of the BlockManager class sets the BlockManager up and registers it with the BlockManagerMaster.

/**
 * A BlockManager runs on every node (the driver and each executor). It provides local and
 * remote storage of blocks, backed by memory, disk and off-heap storage (e.g. Tachyon).
 */
private[spark] class BlockManager(
    executorId: String,  // the Executor this BlockManager runs on
    rpcEnv: RpcEnv,   // RPC environment for remote communication
    val master: BlockManagerMaster,  // BlockManagerMaster, which manages the BlockManagers of the whole cluster
    val serializerManager: SerializerManager, // default serializer
    val conf: SparkConf,
    memoryManager: MemoryManager, // memory manager
    mapOutputTracker: MapOutputTracker, // tracker for shuffle map output
    shuffleManager: ShuffleManager, // shuffle manager
    val blockTransferService: BlockTransferService, // network service for transferring blocks (used for replication)
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {

......

  //responsible for in-memory storage
  private[spark] val memoryStore =
    new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
  //responsible for on-disk storage
  private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager)
  memoryManager.setMemoryStore(memoryStore)

......


  def initialize(appId: String): Unit = {
    //Initialize the BlockTransferService; in practice its subclass NettyBlockTransferService provides the init implementation
    //The transfer service is what allows block data to be pulled from other nodes
    blockTransferService.init(this)
    shuffleClient.init(appId)

    //Set the block replication policy, configured via spark.storage.replication.policy
    blockReplicationPolicy = {
      val priorityClass = conf.get(
        "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
      val clazz = Utils.classForName(priorityClass)
      val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
      logInfo(s"Using $priorityClass for block replication policy")
      ret
    }

    /**
     * Build a BlockManagerId (the unique identifier of this block store) for this Executor:
     * executorId: the Executor's id
     * blockTransferService.hostName: host name of the service that transfers block data
     * blockTransferService.port: port of the service that transfers block data
     */
    val id =
      BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

    //Register with the BlockManagerMaster on the Driver by calling BlockManagerMaster.registerBlockManager
    val idFromMaster = master.registerBlockManager(
      id,
      maxOnHeapMemory,
      maxOffHeapMemory,
      slaveEndpoint)

    //Update the BlockManagerId (the master may return a more complete id)
    blockManagerId = if (idFromMaster != null) idFromMaster else id

    //Check whether the external shuffle service is enabled
    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }

    // Register Executors' configuration with the local shuffle service, if one should exist.
    //If the external shuffle service is enabled and this BlockManager does not belong to the Driver,
    //call registerWithExternalShuffleServer to register with the local external shuffle service
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }

    logInfo(s"Initialized BlockManager: $blockManagerId")
  }

......

}

Step 4: The registerBlockManager method of BlockManagerMaster sends a RegisterBlockManager message to the Driver to perform the registration.

  /**
   * Send a RegisterBlockManager message to the Driver to register this BlockManager.
   */
  def registerBlockManager(
      blockManagerId: BlockManagerId,
      maxOnHeapMemSize: Long,
      maxOffHeapMemSize: Long,
      slaveEndpoint: RpcEndpointRef): BlockManagerId = {
    logInfo(s"Registering BlockManager $blockManagerId")
    //Send the registration request to the Driver
    //blockManagerId: unique identifier of the block store; it carries the executorId and the host and port of its Netty service
    //maxOnHeapMemSize / maxOffHeapMemSize: the maximum memory this BlockManager manages
    val updatedId = driverEndpoint.askSync[BlockManagerId](
      RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
    logInfo(s"Registered BlockManager $updatedId")
    updatedId
  }

Step 5: The receiveAndReply method of BlockManagerMasterEndpoint receives such requests and handles them.

 override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    //On receiving a registration request from an Executor's BlockManager, call register to register it
    case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
      context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))

......

}

Step 6: The register method of BlockManagerMasterEndpoint performs the actual registration of the Executor's BlockManager.

/**
 * Maintains the metadata of every executor's BlockManager: BlockManagerInfo and BlockStatus
 */
private[spark]
class BlockManagerMasterEndpoint(
    override val rpcEnv: RpcEnv,
    val isLocal: Boolean,
    conf: SparkConf,
    listenerBus: LiveListenerBus)
  extends ThreadSafeRpcEndpoint with Logging {
  
  // Mapping from block manager id to the block manager's information.
  // mapping from BlockManagerId to BlockManagerInfo
  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

  // Mapping from executor ID to block manager ID.
  // mapping from executor ID to BlockManagerId
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

......

  /**
   * Returns the BlockManagerId with topology information populated, if available.
   * 
   * Registers an executor's BlockManager.
   * 
   */
  private def register(
      idWithoutTopologyInfo: BlockManagerId,
      maxOnHeapMemSize: Long,
      maxOffHeapMemSize: Long,
      slaveEndpoint: RpcEndpointRef): BlockManagerId = {
    // the dummy id is not expected to contain the topology information.
    // we get that info here and respond back with a more fleshed out block manager id
    //Rebuild the BlockManagerId sent up from the Executor: the original id carried no topology
    //information, so fill it in here to obtain a more complete BlockManagerId
    val id = BlockManagerId(
      idWithoutTopologyInfo.executorId,
      idWithoutTopologyInfo.host,
      idWithoutTopologyInfo.port,
      topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))

    val time = System.currentTimeMillis()
    //Check whether this BlockManagerId has already been registered (registrations are kept in HashMap[BlockManagerId, BlockManagerInfo]);
    //if it has not, continue with the registration below
    if (!blockManagerInfo.contains(id)) {
      /**
       * First look up, by executorId, whether an old BlockManagerId is already cached. If one exists,
       * call removeExecutor to remove that executor from the BlockManagerMaster: it removes the
       * BlockManagerId mapped to the executorId and then the stale BlockManager itself.
       * In other words, clean up the data left by a previous registration.
       */
      blockManagerIdByExecutor.get(id.executorId) match {
        case Some(oldId) =>
          // A block manager of the same executor already exists, so remove it (assumed dead)
          logError("Got two different block manager registrations on same executor - "
              + s" will replace old one $oldId with new one $id")
          removeExecutor(id.executorId)
        case None =>
      }
      logInfo("Registering block manager %s with %s RAM, %s".format(
        id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))

      //Cache the executorId -> BlockManagerId mapping in memory
      blockManagerIdByExecutor(id.executorId) = id

      //Cache the BlockManagerId -> BlockManagerInfo mapping in memory
      //BlockManagerInfo wraps the BlockManagerId, the registration time and the maximum memory
      blockManagerInfo(id) = new BlockManagerInfo(
        id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
    }
    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
        Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
    id
  }

..... 
 
}

Step 7: The BlockManagerInfo values in mutable.HashMap[BlockManagerId, BlockManagerInfo] are effectively the BlockManagers' metadata.

/**
 * The BlockManagerInfo of each BlockManager; effectively the BlockManager's metadata
 */
private[spark] class BlockManagerInfo(
    val blockManagerId: BlockManagerId,
    timeMs: Long,
    val maxOnHeapMem: Long,
    val maxOffHeapMem: Long,
    val slaveEndpoint: RpcEndpointRef)
  extends Logging {

  val maxMem = maxOnHeapMem + maxOffHeapMem

  private var _lastSeenMs: Long = timeMs
  private var _remainingMem: Long = maxMem

  // Mapping from block id to its status.
  //BlockManagerInfo tracks the BlockId -> BlockStatus mapping of its BlockManager
  private val _blocks = new JHashMap[BlockId, BlockStatus]

......

}

Step 8: The removeExecutor method of BlockManagerMasterEndpoint removes the stale data left by a previous registration.

  private def removeExecutor(execId: String) {
    logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
    //Look up the BlockManagerId registered for this executorId (by now it is stale)
    //and call removeBlockManager to remove the rest of the stale state
    blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
  }

Step 9: The removeBlockManager method of BlockManagerMasterEndpoint removes the old, conflicting registration data.

  private def removeBlockManager(blockManagerId: BlockManagerId) {
    //Find the BlockManagerInfo of the stale BlockManagerId
    val info = blockManagerInfo(blockManagerId)

    // Remove the block manager from blockManagerIdByExecutor.
    //Remove the executorId -> BlockManagerId entry of the old BlockManager
    blockManagerIdByExecutor -= blockManagerId.executorId

    // Remove it from blockManagerInfo and remove all the blocks.
    // Then remove the BlockManagerInfo mapped to that BlockManagerId
    blockManagerInfo.remove(blockManagerId)

    val iterator = info.blocks.keySet.iterator
    //Iterate over the BlockIds of all blocks tracked by this BlockManagerInfo
    while (iterator.hasNext) {
      //Remove this BlockManager from each block's set of locations
      val blockId = iterator.next
      val locations = blockLocations.get(blockId)
      locations -= blockManagerId
      // De-register the block if none of the block managers have it. Otherwise, if pro-active
      // replication is enabled, and a block is either an RDD or a test block (the latter is used
      // for unit testing), we send a message to a randomly chosen executor location to replicate
      // the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
      // etc.) as replication doesn't make much sense in that context.
      if (locations.size == 0) {
        blockLocations.remove(blockId)
        logWarning(s"No more replicas available for $blockId !")
      } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
        // As a heursitic, assume single executor failure to find out the number of replicas that
        // existed before failure
        val maxReplicas = locations.size + 1
        val i = (new Random(blockId.hashCode)).nextInt(locations.size)
        val blockLocations = locations.toSeq
        val candidateBMId = blockLocations(i)
        blockManagerInfo.get(candidateBMId).foreach { bm =>
          val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
          val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
          bm.slaveEndpoint.ask[Boolean](replicateMsg)
        }
      }
    }

    listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
    logInfo(s"Removing block manager $blockManagerId")

  }

Step 10: Updating block info

  /**
   * Update block info. Whenever a block changes on a BlockManager, an updateBlockInfo request
   * is sent so that the master's view of the block is refreshed.
   */
  private def updateBlockInfo(
      blockManagerId: BlockManagerId,
      blockId: BlockId,
      storageLevel: StorageLevel,
      memSize: Long,
      diskSize: Long): Boolean = {

    // If this blockManagerId has not been registered, return
    if (!blockManagerInfo.contains(blockManagerId)) {
      if (blockManagerId.isDriver && !isLocal) {
        // The blockManagerId belongs to the driver and we are not in local mode; the driver's BlockManager is intentionally not registered, so do not report a failure
        // We intentionally do not register the master (except in local mode),
        // so we should not indicate failure.
        return true
      } else {
        return false
      }
    }
    // There is no block to update; just refresh the last-seen timestamp and return
    if (blockId == null) {
      blockManagerInfo(blockManagerId).updateLastSeenMs()
      return true
    }
    // Delegate to BlockManagerInfo.updateBlockInfo to update the block
    blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)

    /**
     * A block may live on multiple BlockManagers: storage levels ending in _2 require the block
     * to be replicated to another BlockManager.
     * The blockLocations map keeps, for each blockId, the set of BlockManagerIds that hold it;
     * using a set means duplicates are removed automatically.
     */
    var locations: mutable.HashSet[BlockManagerId] = null
    if (blockLocations.containsKey(blockId)) {
      locations = blockLocations.get(blockId)
    } else {
      locations = new mutable.HashSet[BlockManagerId]
      blockLocations.put(blockId, locations)
    }

    // If the storage level is valid, add this blockManagerId to the block's location set
    if (storageLevel.isValid) {
      locations.add(blockManagerId)
    } else {
      // If it is invalid, remove it
      locations.remove(blockManagerId)
    }

    // Remove the block from master tracking if it has been removed on all slaves.
    // If no BlockManager holds the block any more, remove the blockId from blockLocations
    if (locations.size == 0) {
      blockLocations.remove(blockId)
    }
    true
  }
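
The location bookkeeping above can be illustrated with a small standalone sketch (toy types only; the real code keys a JHashMap by BlockId and stores sets of BlockManagerId):

import scala.collection.mutable

object BlockLocationsDemo {
  // Toy version of the master-side bookkeeping: blockId -> set of blockManagerIds holding it
  private val blockLocations = mutable.HashMap[String, mutable.HashSet[String]]()

  def updateBlockInfo(blockManagerId: String, blockId: String, storageLevelValid: Boolean): Unit = {
    val locations = blockLocations.getOrElseUpdate(blockId, mutable.HashSet[String]())
    if (storageLevelValid) locations += blockManagerId    // block now stored on this manager
    else locations -= blockManagerId                      // block dropped from this manager
    if (locations.isEmpty) blockLocations.remove(blockId) // no replicas left anywhere
  }

  def main(args: Array[String]): Unit = {
    updateBlockInfo("exec-1", "rdd_2_0", storageLevelValid = true)
    updateBlockInfo("exec-2", "rdd_2_0", storageLevelValid = true)  // a _2 level keeps two replicas
    updateBlockInfo("exec-1", "rdd_2_0", storageLevelValid = false) // dropped on exec-1
    println(blockLocations)                                         // Map(rdd_2_0 -> Set(exec-2))
  }
}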

Step 11: BlockManagerInfo.updateBlockInfo updates the block's status

  def updateBlockInfo(
      blockId: BlockId,
      storageLevel: StorageLevel,
      memSize: Long,
      diskSize: Long) {

    updateLastSeenMs()
    
    val blockExists = _blocks.containsKey(blockId)
    var originalMemSize: Long = 0
    var originalDiskSize: Long = 0
    var originalLevel: StorageLevel = StorageLevel.NONE
    //Check whether this block is already tracked
    if (blockExists) {
      // The block exists on the slave already.
      val blockStatus: BlockStatus = _blocks.get(blockId)
      originalLevel = blockStatus.storageLevel
      originalMemSize = blockStatus.memSize
      originalDiskSize = blockStatus.diskSize
      // If the old level used memory, add the block's old memory size back to the remaining memory
      if (originalLevel.useMemory) {
        _remainingMem += originalMemSize
      }
    }
    // Create a BlockStatus for the block and account for memory and disk usage according to the storage level
    if (storageLevel.isValid) {
      /* isValid means it is either stored in-memory or on-disk.
       * The memSize here indicates the data size in or dropped from memory,
       * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
       * and the diskSize here indicates the data size in or dropped to disk.
       * They can be both larger than 0, when a block is dropped from memory to disk.
       * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
      var blockStatus: BlockStatus = null
      if (storageLevel.useMemory) {
        blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
        _blocks.put(blockId, blockStatus)
        _remainingMem -= memSize
        if (blockExists) {
          logInfo(s"Updated $blockId in memory on ${blockManagerId.hostPort}" +
            s" (current size: ${Utils.bytesToString(memSize)}," +
            s" original size: ${Utils.bytesToString(originalMemSize)}," +
            s" free: ${Utils.bytesToString(_remainingMem)})")
        } else {
          logInfo(s"Added $blockId in memory on ${blockManagerId.hostPort}" +
            s" (size: ${Utils.bytesToString(memSize)}," +
            s" free: ${Utils.bytesToString(_remainingMem)})")
        }
      }
      if (storageLevel.useDisk) {
        blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
        _blocks.put(blockId, blockStatus)
        if (blockExists) {
          logInfo(s"Updated $blockId on disk on ${blockManagerId.hostPort}" +
            s" (current size: ${Utils.bytesToString(diskSize)}," +
            s" original size: ${Utils.bytesToString(originalDiskSize)})")
        } else {
          logInfo(s"Added $blockId on disk on ${blockManagerId.hostPort}" +
            s" (size: ${Utils.bytesToString(diskSize)})")
        }
      }
      if (!blockId.isBroadcast && blockStatus.isCached) {
        _cachedBlocks += blockId
      }
    //If the storage level is invalid and the block was stored before, remove the block
    } else if (blockExists) {
      // If isValid is not true, drop the block.
      _blocks.remove(blockId)
      _cachedBlocks -= blockId
      if (originalLevel.useMemory) {
        logInfo(s"Removed $blockId on ${blockManagerId.hostPort} in memory" +
          s" (size: ${Utils.bytesToString(originalMemSize)}," +
          s" free: ${Utils.bytesToString(_remainingMem)})")
      }
      if (originalLevel.useDisk) {
        logInfo(s"Removed $blockId on ${blockManagerId.hostPort} on disk" +
          s" (size: ${Utils.bytesToString(originalDiskSize)})")
      }
    }
  }

Reading Block Data through the BlockManager

When a Task runs, it needs the data of the corresponding partition of its parent RDD: it calls the RDD's iterator method to iterate over that partition's data and then writes the result to storage, which may be memory or disk depending on the StorageLevel.

If the RDD's StorageLevel is not NONE, the RDD has been persisted and the data can be read directly instead of recomputing the parent RDD. If there is no StorageLevel, nothing has been cached and the data has to be computed from the parent RDD. Note that "cached" here does not mean some separate cache component: the block is read from local storage first, fetched from a remote node if it is not local, and the result is put into memory storage so that later reads hit it locally. That is where the actual caching happens.
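The overall read decision can be sketched as a tiny get-or-compute helper (toy stores and types only; the real logic is BlockManager.getOrElseUpdate, shown in step 3 below):

import scala.collection.mutable

object ReadPathSketch {
  private val localStore = mutable.HashMap[String, Array[Byte]]()  // stands in for the memory/disk stores
  private val remoteStore = mutable.HashMap[String, Array[Byte]]("b1" -> Array[Byte](1, 2, 3))

  def getOrCompute(blockId: String, compute: () => Array[Byte]): Array[Byte] = {
    localStore.get(blockId)                                  // 1. try the local BlockManager
      .orElse {
        val remote = remoteStore.get(blockId)                // 2. otherwise fetch from a remote BlockManager
        remote.foreach(bytes => localStore(blockId) = bytes) // cache the fetched data locally (toy behaviour)
        remote
      }
      .getOrElse {
        val computed = compute()                             // 3. nothing cached anywhere: recompute the partition
        localStore(blockId) = computed
        computed
      }
  }

  def main(args: Array[String]): Unit = {
    println(getOrCompute("b1", () => Array[Byte](9)).toSeq)  // fetched remotely, then cached locally
    println(getOrCompute("b2", () => Array[Byte](9)).toSeq)  // computed
  }
}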

Step 1: The RDD.iterator method
Source: org.apache.spark.rdd.RDD.scala

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      //Compute the RDD partition (or read it from a checkpoint)
      computeOrReadCheckpoint(split, context)
    }
  }

Step 2: The getOrCompute(split, context) method

  /**
   * Get the block from memory or disk; if it has to be read from disk, also cache it in memory.
   */
  private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
    // Build an RDDBlockId from the RDD id and partition index
    val blockId = RDDBlockId(id, partition.index)
    // Whether the block was read from the cache
    var readCachedBlock = true
    // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
      readCachedBlock = false
      // The block was not cached, so compute it (or read it from a checkpoint)
      computeOrReadCheckpoint(partition, context)
    }) match {
      // A block was found in the block manager; return it directly
      case Left(blockResult) =>
        // the block came from the cache, so record input metrics
        if (readCachedBlock) {
          val existingMetrics = context.taskMetrics().inputMetrics
          existingMetrics.incBytesRead(blockResult.bytes)
          new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
            override def next(): T = {
              existingMetrics.incRecordsRead(1)
              delegate.next()
            }
          }
        } else {
          new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
        }
      case Right(iter) =>
        new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
    }
  }

Step 3: The SparkEnv.get.blockManager.getOrElseUpdate method

  /**
   * If the given block exists, return it directly; otherwise compute it with makeIterator,
   * persist it and return the result.
   */
  def getOrElseUpdate[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    // Attempt to read the block from local or remote storage. If it's present, then we don't need
    // to go through the local-get-or-put path.
    // Try to read the block locally; if that fails, try remote block managers
    get[T](blockId)(classTag) match {
      case Some(block) =>
        return Left(block)
      case _ =>
        // Need to compute the block.
    }
    // Initially we hold no locks on this block.
    // Neither local nor remote storage had the block, so compute it with makeIterator and write the result as a block
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
      // the put succeeded
      case None =>
        // doPut() didn't hand work back to us, so the block already existed or was successfully
        // stored. Therefore, we now hold a read lock on the block.
        // read the block we just stored from local storage
        val blockResult = getLocalValues(blockId).getOrElse {
          // Since we held a read lock between the doPut() and get() calls, the block should not
          // have been evicted, so get() not returning the block indicates some internal error.
          releaseLock(blockId)
          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
        }
        // We already hold a read lock on the block from the doPut() call and getLocalValues()
        // acquires the lock again, so we need to call releaseLock() here so that the net number
        // of lock acquisitions is 1 (since the caller will only call release() once).
        releaseLock(blockId)
        Left(blockResult)
      case Some(iter) => // the put failed
        // The put failed, likely because the data was too large to fit in memory and could not be
        // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
        // that they can decide what to do with the values (e.g. process them without caching).
        // The put failed, likely because the data was too large to fit in memory and could not
        // be dropped to disk, so hand the iterator back to the caller
        Right(iter)
    }
  }

Step 4: The BlockManager.get method

  /**
   * Entry point for reads: checks whether the data is available locally and either reads it
   * locally or fetches it from a remote node through the BlockTransferService.
   */
  def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    // Try the local stores first
    val local = getLocalValues(blockId)
    // If the block was found locally, return it
    if (local.isDefined) {
      logInfo(s"Found block $blockId locally")
      return local
    }
    // Not found locally, so try remote BlockManagers
    val remote = getRemoteValues[T](blockId)
    // Return the remote result if found, otherwise None
    if (remote.isDefined) {
      logInfo(s"Found block $blockId remotely")
      return remote
    }
    None
  }

Step 5: The BlockManager.getLocalValues method

  /**
   * Read a block from local storage, returning Some(BlockResult) if it exists and None otherwise.
   * If the storage level is disk, the block read from disk is also cached in the memory store
   * so the next read is faster.
   */
  def getLocalValues(blockId: BlockId): Option[BlockResult] = {
    logDebug(s"Getting local block $blockId")
    // Ask the BlockInfoManager for a read lock on the block; on success it returns the block's metadata (BlockInfo)
    blockInfoManager.lockForReading(blockId) match {
      case None =>  // block not found, return None
        logDebug(s"Block $blockId was not found")
        None
      case Some(info) =>  // the block's metadata was found
        // its storage level
        val level = info.level
        logDebug(s"Level for block $blockId is $level")
        val taskAttemptId = Option(TaskContext.get()).map(_.taskAttemptId())
        // If the level uses memory and the memory store contains this block
        if (level.useMemory && memoryStore.contains(blockId)) {
          // Stored as deserialized objects: read them with getValues
          val iter: Iterator[Any] = if (level.deserialized) {
            memoryStore.getValues(blockId).get
          } else {
            // Stored as serialized bytes: read with getBytes() and deserialize the stream
            serializerManager.dataDeserializeStream(
              blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
          }
          // We need to capture the current taskId in case the iterator completion is triggered
          // from a different thread which does not have TaskContext set; see SPARK-18406 for
          // discussion.
          val ci = CompletionIterator[Any, Iterator[Any]](iter, {
            releaseLock(blockId, taskAttemptId)
          })
          // Build a BlockResult containing the data, the read method and the size in bytes
          Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
        } 
        // If the level uses disk and the disk store contains the block, read it from disk
        // and, where possible, cache the result in memory
        else if (level.useDisk && diskStore.contains(blockId)) {
          // read the raw bytes from disk
          val diskData = diskStore.getBytes(blockId)
          val iterToReturn: Iterator[Any] = {
            //if the level is deserialized, deserialize the stream read from disk
            if (level.deserialized) {
              val diskValues = serializerManager.dataDeserializeStream(
                blockId,
                diskData.toInputStream())(info.classTag)
              // cache the deserialized values in memory where possible
              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
            } else {
              // try to cache the raw bytes in memory first
              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
                .map { _.toInputStream(dispose = false) }
                .getOrElse { diskData.toInputStream() }
              //then deserialize the returned stream
              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
            }
          }
          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
            releaseLockAndDispose(blockId, diskData, taskAttemptId)
          })
          // Build and return the BlockResult
          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
        } else {
          // Local read failed: report to the driver that the block is invalid so it will be removed
          handleLocalReadFailure(blockId)
        }
    }
  }

Step 6: The memoryStore.getValues(blockId) and getBytes(blockId) methods

  //holds the data of every block kept in memory
  private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

......

  def getValues(blockId: BlockId): Option[Iterator[_]] = {
    //synchronize concurrent access from multiple threads
    val entry = entries.synchronized { entries.get(blockId) }
    entry match {
      case null => None
      case e: SerializedMemoryEntry[_] =>
        throw new IllegalArgumentException("should only call getValues on deserialized blocks")
      case DeserializedMemoryEntry(values, _, _) =>
        val x = Some(values)
        x.map(_.iterator)
    }
  }

  def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    val entry = entries.synchronized { entries.get(blockId) }
    entry match {
      case null => None
      case e: DeserializedMemoryEntry[_] =>
        throw new IllegalArgumentException("should only call getBytes on serialized blocks")
      case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
    }
  }

Both methods look the data up by blockId in the same entries LinkedHashMap, so under the hood Spark's MemoryStore keeps all in-memory block data in a single LinkedHashMap. Note that it is constructed with accessOrder = true, so the map maintains access order rather than insertion order: when memory runs short, the least recently used blocks are the first candidates to be dropped to disk or evicted, i.e. the eviction order is LRU.
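
A small standalone sketch (plain Scala over java.util.LinkedHashMap, no Spark types) of why accessOrder = true gives an LRU iteration order:

import java.util.LinkedHashMap

object LruOrderDemo {
  def main(args: Array[String]): Unit = {
    // Same constructor arguments as Spark's MemoryStore: accessOrder = true
    val entries = new LinkedHashMap[String, Array[Byte]](32, 0.75f, true)
    entries.put("rdd_0_0", new Array[Byte](16))
    entries.put("rdd_0_1", new Array[Byte](16))
    entries.put("rdd_0_2", new Array[Byte](16))

    // Touch rdd_0_0 so it becomes the most recently used entry
    entries.get("rdd_0_0")

    // Iteration order is now least -> most recently used:
    // rdd_0_1, rdd_0_2, rdd_0_0 -- eviction walks this order from the front
    entries.keySet().toArray.foreach(println)
  }
}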

Step 7: Disk reads with the diskStore.getBytes(blockId) method
Spark stores block files under the directories configured by spark.local.dir. By default one first-level directory is created per configured path, and under it up to 64 second-level directories named 00 to 3f (hexadecimal). The file name inside these directories is blockId.name, which uniquely identifies a block.

Files are managed with a two-level directory layout:

spark.local.dir: where the block files are stored; several directories may be configured, separated by commas (default: /tmp)
First-level directory: created under each spark.local.dir entry, named with the blockmgr prefix plus a UUID (default: blockmgr-{UUID})
Second-level directories: created under a first-level directory, 64 folders by default, named in hexadecimal (00 to 3f, i.e. hexadecimal 0 to 63)
File name: the stored content can be RDD, shuffle or broadcast data; for a ShuffleDataBlockId, for example, the name is "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" (see the BlockId source)

The getBytes implementation:
  def getBytes(blockId: BlockId): BlockData = {
    val file = diskManager.getFile(blockId.name)
    val blockSize = getSize(blockId)

    securityManager.getIOEncryptionKey() match {
      case Some(key) =>
        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
        // and provides InputStream / FileRegion implementations for reading the data.
        new EncryptedBlockData(file, blockSize, conf, key)

      case _ =>
        new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
    }
  }

Step 8: Before reading the data, getFile is used to locate the file that stores the block.

  def getFile(filename: String): File = {
    // Figure out which local directory it hashes to, and which subdirectory in that
    // filename here is the block's name (blockId.name)
    val hash = Utils.nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    // Work out which second-level directory this block's file lives in
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory if it doesn't already exist
    // If that second-level directory does not exist yet, create it
    val subDir = subDirs(dirId).synchronized {
      val old = subDirs(dirId)(subDirId)
      if (old != null) {
        old
      } else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        if (!newDir.exists() && !newDir.mkdir()) {
          throw new IOException(s"Failed to create local dir in $newDir.")
        }
        subDirs(dirId)(subDirId) = newDir
        newDir
      }
    }

    new File(subDir, filename)
  }
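
To make the hashing concrete, here is a small standalone sketch (directory names are hypothetical, and Utils.nonNegativeHash is approximated with a plain non-negative hashCode) of how a block file name is mapped to a first- and second-level directory:

object BlockFileLayoutDemo {
  // Simplified stand-in for Utils.nonNegativeHash
  private def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h == Int.MinValue) 0 else math.abs(h)
  }

  def main(args: Array[String]): Unit = {
    // Assume one configured local dir and the default 64 subdirectories
    val localDirs = Array("/tmp/blockmgr-1234")
    val subDirsPerLocalDir = 64

    val filename = "shuffle_0_5_0.data"
    val hash = nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // e.g. /tmp/blockmgr-1234/2a/shuffle_0_5_0.data
    println(s"${localDirs(dirId)}/${"%02x".format(subDirId)}/$filename")
  }
}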

Step 9: Remote reads: val remote = getRemoteValues[T](blockId)

  /**
   * Get block from remote block managers.
   *
   * This does not acquire a lock on this block in this JVM.
   */
  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
    val ct = implicitly[ClassTag[T]]
    // Deserialize the bytes fetched from the remote node and wrap them in a BlockResult
    getRemoteBytes(blockId).map { data =>
      val values =
        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
      new BlockResult(values, DataReadMethod.Network, data.size)
    }
  }

Step 10: The getRemoteBytes method

  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
    // could just use the inputStream on the temp file, rather than reading the file into memory.
    // Until then, replication can cause the process to use too much memory and get killed
    // even though we've read the data to disk.
    logDebug(s"Getting remote block $blockId")
    require(blockId != null, "BlockId is null")
    var runningFailureCount = 0
    var totalFailureCount = 0

    // Because all the remote blocks are registered in driver, it is not necessary to ask
    // all the slave executors to get block status.
    //Ask the driver where the block lives; this returns the ids and status of all BlockManagers holding the block
    val locationsAndStatus = master.getLocationsAndStatus(blockId)
    val blockSize = locationsAndStatus.map { b =>
      b.status.diskSize.max(b.status.memSize)
    }.getOrElse(0L)
    val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)

    // If the block size is above the threshold, we should pass our FileManger to
    // BlockTransferService, which will leverage it to spill the block; if not, then passed-in
    // null value means the block will be persisted in memory.
    val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
      remoteBlockTempFileManager
    } else {
      null
    }
    val locations = sortLocations(blockLocations)
    // Allow at most as many fetch failures as there are BlockManagers holding the block
    val maxFetchFailures = locations.size
    var locationIterator = locations.iterator
    // Iterate over the candidate block managers
    while (locationIterator.hasNext) {
      val loc = locationIterator.next()
      logDebug(s"Getting remote block $blockId from $loc")
      // Fetch the block from the remote node by calling BlockTransferService.fetchBlockSync
      val data = try {
        blockTransferService.fetchBlockSync(
          loc.host, loc.port, loc.executorId, blockId.toString, tempFileManager)
      } catch {
        case NonFatal(e) =>
          runningFailureCount += 1
          totalFailureCount += 1
          // If the total number of failures reaches the threshold, give up and return None
          if (totalFailureCount >= maxFetchFailures) {
            // Give up trying anymore locations. Either we've tried all of the original locations,
            // or we've refreshed the list of locations from the master, and have still
            // hit failures after trying locations from the refreshed list.
            logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
              s"Most recent failure cause:", e)
            return None
          }

          logWarning(s"Failed to fetch remote block $blockId " +
            s"from $loc (failed attempt $runningFailureCount)", e)

          // If there is a large number of executors then locations list can contain a
          // large number of stale entries causing a large number of retries that may
          // take a significant amount of time. To get rid of these stale entries
          // we refresh the block locations after a certain number of fetch failures
          if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
            locationIterator = sortLocations(master.getLocations(blockId)).iterator
            logDebug(s"Refreshed locations from the driver " +
              s"after ${runningFailureCount} fetch failures.")
            runningFailureCount = 0
          }

          // This location failed, so we retry fetch from a different one by returning null here
          null
      }
      // Wrap the fetched data in a ChunkedByteBuffer and return it
      if (data != null) {
        // SPARK-24307 undocumented "escape-hatch" in case there are any issues in converting to
        // ChunkedByteBuffer, to go back to old code-path.  Can be removed post Spark 2.4 if
        // new path is stable.
        if (remoteReadNioBufferConversion) {
          return Some(new ChunkedByteBuffer(data.nioByteBuffer()))
        } else {
          return Some(ChunkedByteBuffer.fromManagedBuffer(data))
        }
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }
// On the driver, getLocationsAndStatus looks the block up in the blockLocations JHashMap and returns all BlockManagerIds that hold it
  private def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = {
    val locations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
    val status = locations.headOption.flatMap { bmId => blockManagerInfo(bmId).getStatus(blockId) }

    if (locations.nonEmpty && status.isDefined) {
      Some(BlockLocationsAndStatus(locations, status.get))
    } else {
      None
    }
  }

Step 11: The blockTransferService.fetchBlockSync method reads the data from the node identified by a BlockManagerId. Note that BlockManagerId is not a plain field but a class, with host, port, executor id and other fields.
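Its shape is roughly the following (a toy mirror for illustration; the real class lives in org.apache.spark.storage and also carries optional topology information):

// Toy mirror of BlockManagerId's main fields
final case class ToyBlockManagerId(
    executorId: String,        // which executor the BlockManager belongs to
    host: String,              // host of its block transfer service
    port: Int,                 // port of its block transfer service
    topologyInfo: Option[String] = None)

object ToyBlockManagerIdDemo {
  def main(args: Array[String]): Unit = {
    val id = ToyBlockManagerId("exec-7", "10.0.0.7", 43215)
    println(s"fetch from ${id.host}:${id.port} (executor ${id.executorId})")
  }
}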

  def fetchBlockSync(
      host: String,
      port: Int,
      execId: String,
      blockId: String,
      tempFileManager: DownloadFileManager): ManagedBuffer = {
    // A monitor for the thread to wait on.
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          data match {
            case f: FileSegmentManagedBuffer =>
              result.success(f)
            case _ =>
              val ret = ByteBuffer.allocate(data.size.toInt)
              ret.put(data.nioByteBuffer())
              ret.flip()
              result.success(new NioManagedBuffer(ret))
          }
        }
      }, tempFileManager)
    ThreadUtils.awaitResult(result.future, Duration.Inf)
  }
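
The Promise-plus-await pattern used here to turn the callback-based fetchBlocks into a blocking call can be sketched in isolation (the fetchAsync function below is entirely made up; Spark uses ThreadUtils.awaitResult rather than Await.result, but the idea is the same):

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object CallbackToSync {
  // A callback-style API (hypothetical), similar in shape to fetchBlocks + BlockFetchingListener
  def fetchAsync(blockId: String)(onSuccess: Array[Byte] => Unit, onFailure: Throwable => Unit): Unit = {
    new Thread(() => {
      try onSuccess(s"data-for-$blockId".getBytes("UTF-8"))
      catch { case t: Throwable => onFailure(t) }
    }).start()
  }

  def fetchSync(blockId: String): Array[Byte] = {
    val result = Promise[Array[Byte]]()
    fetchAsync(blockId)(bytes => result.success(bytes), err => result.failure(err))
    Await.result(result.future, Duration.Inf) // block the caller until the callback fires
  }

  def main(args: Array[String]): Unit =
    println(new String(fetchSync("rdd_0_0"), "UTF-8"))
}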

Step 12: fetchBlocks is an abstract method; in practice the implementation in NettyBlockTransferService is used.

  /**
   * Fetches remote blocks (for example shuffle files), using the Netty service created by
   * NettyBlockTransferService.
   */
  override def fetchBlocks(
      host: String,
      port: Int,
      execId: String,
      blockIds: Array[String],
      listener: BlockFetchingListener,
      tempFileManager: DownloadFileManager): Unit = {
    logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
    try {
      val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
        override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
          // Create a client and talk to the remote server to pull the data
          val client = clientFactory.createClient(host, port)
          // Fetch the requested blocks one-for-one over this connection
          new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
            transportConf, tempFileManager).start()
        }
      }

      val maxRetries = transportConf.maxIORetries()
      if (maxRetries > 0) {
        // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
        // a bug in this code. We should remove the if statement once we're sure of the stability.
        new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
      } else {
        blockFetchStarter.createAndStart(blockIds, listener)
      }
    } catch {
      case e: Exception =>
        logError("Exception while beginning fetchBlocks", e)
        blockIds.foreach(listener.onBlockFetchFailure(_, e))
    }
  }

Step 13: new OneForOneBlockFetcher(client, appId, execId, blockIds, listener, transportConf, tempFileManager).start() sends an RPC message to the executor that holds the blocks.

  public void start() {
    if (blockIds.length == 0) {
      throw new IllegalArgumentException("Zero-sized blockIds array");
    }

    client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        try {
          streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
          logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);

          // Immediately request all chunks -- we expect that the total size of the request is
          // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
          for (int i = 0; i < streamHandle.numChunks; i++) {
            if (downloadFileManager != null) {
              client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
                new DownloadCallback(i));
            } else {
              client.fetchChunk(streamHandle.streamId, i, chunkCallback);
            }
          }
        } catch (Exception e) {
          logger.error("Failed while starting block fetches after success", e);
          failRemainingBlocks(blockIds, e);
        }
      }

      @Override
      public void onFailure(Throwable e) {
        logger.error("Failed while starting block fetches", e);
        failRemainingBlocks(blockIds, e);
      }
    });
  }

Step 14: The message is received by the receive method of the target Executor's NettyBlockRpcServer, which calls getBlockData to read the data.

  override def receive(
      client: TransportClient,
      rpcMessage: ByteBuffer,
      responseContext: RpcResponseCallback): Unit = {
    val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
    logTrace(s"Received request: $message")

    message match {
      case openBlocks: OpenBlocks =>
        val blocksNum = openBlocks.blockIds.length
        val blocks = for (i <- (0 until blocksNum).view)
          yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
        val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
        logTrace(s"Registered streamId $streamId with $blocksNum buffers")
        responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)

      case uploadBlock: UploadBlock =>
        // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
        val (level: StorageLevel, classTag: ClassTag[_]) = {
          serializer
            .newInstance()
            .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
            .asInstanceOf[(StorageLevel, ClassTag[_])]
        }
        val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
        val blockId = BlockId(uploadBlock.blockId)
        logDebug(s"Receiving replicated block $blockId with level ${level} " +
          s"from ${client.getSocketAddress}")
        blockManager.putBlockData(blockId, data, level, classTag)
        responseContext.onSuccess(ByteBuffer.allocate(0))
    }
  }

Step 15: The getBlockData method reads the data.
Source: org.apache.spark.storage.BlockManager.scala

  override def getBlockData(blockId: BlockId): ManagedBuffer = {
    if (blockId.isShuffle) {
      //For shuffle blocks, first obtain the ShuffleBlockResolver from the ShuffleManager,
      //then call its IndexShuffleBlockResolver.getBlockData method
      shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
    } else {
      getLocalBytes(blockId) match {
        case Some(blockData) =>
          new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
        case None =>
          // If this block manager receives a request for a block that it doesn't have then it's
          // likely that the master has outdated block statuses for this block. Therefore, we send
          // an RPC so that this block is marked as being unavailable from this block manager.
          reportBlockStatus(blockId, BlockStatus.empty)
          throw new BlockNotFoundException(blockId.toString)
      }
    }
  }

Step 16: The IndexShuffleBlockResolver#getBlockData() method

  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
    // The block is actually going to be a range of a single map output file for this map, so
    // find out the consolidated file, then the offset within that from our index
    //Use the shuffleId and mapId to locate the corresponding index file
    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

    // SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code
    // which is incorrectly using our file descriptor then this code will fetch the wrong offsets
    // (which may cause a reducer to be sent a different reducer's data). The explicit position
    // checks added here were a useful debugging aid during SPARK-22982 and may help prevent this
    // class of issue from re-occurring in the future which is why they are left here even though
    // SPARK-22982 is fixed.
    //Locate this block's data range via the index file
    val channel = Files.newByteChannel(indexFile.toPath)
    channel.position(blockId.reduceId * 8L)
    val in = new DataInputStream(Channels.newInputStream(channel))
    try {
      // start offset of this block's data
      val offset = in.readLong()
      // end offset of this block's data
      val nextOffset = in.readLong()
      val actualPosition = channel.position()
      val expectedPosition = blockId.reduceId * 8L + 16
      if (actualPosition != expectedPosition) {
        throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " +
          s"expected $expectedPosition but actual position was $actualPosition.")
      }
      //Return the file segment as a ManagedBuffer
      new FileSegmentManagedBuffer(
        transportConf,
        getDataFile(blockId.shuffleId, blockId.mapId),
        offset,
        nextOffset - offset)
    } finally {
      in.close()
    }
  }

  override def stop(): Unit = {}
}
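
The index-file layout this relies on can be reproduced with a small standalone sketch (hypothetical partition lengths): each index file stores a running offset per reducer, so the byte range of reduce r is given by the two longs at position r * 8:

import java.io.{DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}

object IndexFileDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical partition lengths for one map output: reduce 0..3
    val lengths = Array(100L, 250L, 0L, 75L)

    // Write an index file with the same shape: (numReduces + 1) running offsets starting at 0
    val indexFile = File.createTempFile("shuffle_0_0_0", ".index")
    val out = new DataOutputStream(new FileOutputStream(indexFile))
    var offset = 0L
    out.writeLong(offset)
    lengths.foreach { len => offset += len; out.writeLong(offset) }
    out.close()

    // Read back the byte range of reduceId = 1: two longs at position reduceId * 8
    val reduceId = 1
    val in = new DataInputStream(new FileInputStream(indexFile))
    in.skipBytes(reduceId * 8)
    val start = in.readLong()
    val end = in.readLong()
    in.close()
    println(s"reduce $reduceId reads bytes [$start, $end) of the data file") // [100, 350)
  }
}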

Writing Block Data through the BlockManager

After an RDD is cached in storage memory, each Partition becomes a Block, and its Records occupy a contiguous region of on-heap or off-heap storage memory. Spark calls the process of turning a Partition from non-contiguous into contiguous storage "unrolling" (Unroll). A Block can be stored in serialized or deserialized form, depending on the RDD's storage level. A deserialized Block is represented by a DeserializedMemoryEntry, which keeps all object instances in an array; a serialized Block is represented by a SerializedMemoryEntry, which keeps the binary data in a byte buffer (ByteBuffer). Each Executor's Storage module manages all Block instances in on-heap and off-heap storage memory through a single LinkedHashMap; adding to and removing from this map indirectly records the allocation and release of memory.
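
A toy mirror of those two representations (simplified fields; the real DeserializedMemoryEntry and SerializedMemoryEntry live in MemoryStore and also track a ClassTag and memory mode):

import java.nio.ByteBuffer
import scala.collection.JavaConverters._

// Toy versions of the two block representations described above
sealed trait ToyMemoryEntry { def estimatedSize: Long }

// deserialized form: object instances kept in an array
final case class ToyDeserializedEntry(values: Array[AnyRef], estimatedSize: Long) extends ToyMemoryEntry

// serialized form: raw bytes in a ByteBuffer
final case class ToySerializedEntry(bytes: ByteBuffer) extends ToyMemoryEntry {
  override val estimatedSize: Long = bytes.remaining().toLong
}

object MemoryEntryDemo {
  def main(args: Array[String]): Unit = {
    val entries = new java.util.LinkedHashMap[String, ToyMemoryEntry](32, 0.75f, true)
    entries.put("rdd_1_0", ToyDeserializedEntry(Array[AnyRef]("a", "b", "c"), estimatedSize = 48L))
    entries.put("rdd_1_1", ToySerializedEntry(ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))))
    entries.asScala.foreach { case (k, v) =>
      println(s"$k -> ${v.getClass.getSimpleName}, ~${v.estimatedSize} bytes")
    }
  }
}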

Because there is no guarantee that storage memory can hold all of an Iterator's data at once, the current task must request enough Unroll memory from the MemoryManager as a temporary reservation; if the space cannot be reserved, unrolling fails, otherwise it proceeds. For a serialized Partition the required Unroll space can simply be accumulated and requested once. For a deserialized Partition the space is requested incrementally while iterating over the Records: for each Record read, its required Unroll space is estimated by sampling and then requested; if the space cannot be obtained, unrolling is interrupted and the Unroll memory already reserved is released. If unrolling eventually succeeds, the Unroll space occupied by the Partition is converted into regular storage memory for the cached RDD.
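
The incremental reservation described above can be sketched in a self-contained way (the memory manager, sizes and growth heuristic below are all made up; the real logic is MemoryStore.putIterator, quoted in step 2 below):

object UnrollSketch {
  // Pretend memory manager: grants requests until a fixed budget is exhausted
  final class FakeMemoryManager(var free: Long) {
    def acquire(bytes: Long): Boolean =
      if (bytes <= free) { free -= bytes; true } else false
  }

  def main(args: Array[String]): Unit = {
    val mm = new FakeMemoryManager(free = 1024)
    val records = Iterator.fill(100)(new Array[Byte](16)) // hypothetical record stream

    var reserved = 64L                    // initial unroll reservation
    var keepUnrolling = mm.acquire(reserved)
    var used = 0L
    val buffer = scala.collection.mutable.ArrayBuffer[Array[Byte]]()

    while (records.hasNext && keepUnrolling) {
      val r = records.next()
      buffer += r
      used += r.length
      if (used >= reserved) {             // periodically check and grow the reservation
        val extra = used * 2 - reserved   // crude growth heuristic
        keepUnrolling = mm.acquire(extra)
        if (keepUnrolling) reserved += extra
      }
    }

    if (keepUnrolling) println(s"unrolled ${buffer.size} records using $reserved bytes")
    else println("not enough memory: the caller may drop the block to disk or give up")
  }
}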

doPutIterator(), which is called from getOrElseUpdate(), is the entry point for writing data:
Step 1: The doPutIterator() method
Source: org.apache.spark.storage.BlockManager.scala

  /**
   * Chooses how to write the data according to the storage level, then updates the block's
   * status and, if required, replicates it to other nodes.
   */
  private def doPutIterator[T](
      blockId: BlockId,
      iterator: () => Iterator[T],
      level: StorageLevel,
      classTag: ClassTag[T],
      tellMaster: Boolean = true,
      keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
      val startTimeMs = System.currentTimeMillis
      var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
      // Size of the block in bytes
      var size = 0L
      //the storage level uses memory
      if (level.useMemory) {
        // Put it in memory first, even if it also has useDisk set to true;
        // We will drop it to disk later if the memory store can't hold it.
        //store as deserialized objects
        if (level.deserialized) {
          memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
            case Right(s) =>
              size = s
            case Left(iter) =>
              // Not enough space to unroll this block; drop to disk if applicable
              //not enough memory to unroll the block; spill to disk if the level allows it
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { channel =>
                  val out = Channels.newOutputStream(channel)
                  serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
                }
                size = diskStore.getSize(blockId)
              } else {
                //otherwise hand the partially unrolled iterator back to the caller
                iteratorFromFailedMemoryStorePut = Some(iter)
              }
          }
        } else { // !level.deserialized
          //store as serialized bytes
          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
            case Right(s) =>
              size = s
            case Left(partiallySerializedValues) =>
              // Not enough space to unroll this block; drop to disk if applicable
              //not enough memory to unroll the block; spill to disk if the level allows it
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { channel =>
                  val out = Channels.newOutputStream(channel)
                  partiallySerializedValues.finishWritingToStream(out)
                }
                size = diskStore.getSize(blockId)
              } else {
                iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
              }
          }
        }
      //the storage level uses only disk
      } else if (level.useDisk) {
        diskStore.put(blockId) { channel =>
          val out = Channels.newOutputStream(channel)
          serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
        }
        size = diskStore.getSize(blockId)
      }
      val putBlockStatus = getCurrentBlockStatus(blockId, info)
      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
      if (blockWasSuccessfullyStored) {
        // Now that the block is in either the memory or disk store, tell the master about it.
        info.size = size
        if (tellMaster && info.tellMaster) {
          reportBlockStatus(blockId, putBlockStatus)
        }
        //record the updated block status in the task metrics
        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
        logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
        //replication factor > 1: replicate the block to other BlockManagers
        if (level.replication > 1) {
          val remoteStartTime = System.currentTimeMillis
          val bytesToReplicate = doGetLocalBytes(blockId, info)
          // [SPARK-16550] Erase the typed classTag when using default serialization, since
          // NettyBlockRpcServer crashes when deserializing repl-defined classes.
          // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
          val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
            scala.reflect.classTag[Any]
          } else {
            classTag
          }
          try {
            replicate(blockId, bytesToReplicate, level, remoteClassTag)
          } finally {
            bytesToReplicate.dispose()
          }
          logDebug("Put block %s remotely took %s"
            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
        }
      }
      assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
      iteratorFromFailedMemoryStorePut
    }
  }

Unrolling means the block is not written to memory in one go but incrementally: before each step the remaining memory is checked, and if it is not sufficient, Spark tries to drop blocks already in memory to disk to free space for the new data.

Step 2: memoryStore.putIteratorAsValues tries to store the block in memory as deserialized values

  /**
   * Attempt to put the given block in memory store as values.
   *
   * The iterator may be too large to materialize in memory at once. To avoid OOM, this method
   * unrolls the iterator gradually while periodically checking whether enough free memory
   * remains. If the block is stored successfully, the memory temporarily used for unrolling
   * becomes storage memory, so no additional memory is acquired.
   *
   * @return in case of success, the estimated size of the stored data. In case of failure, return
   *         an iterator containing the values of the block. The returned iterator will be backed
   *         by the combination of the partially-unrolled block and the remaining elements of the
   *         original input iterator. The caller must either fully consume this iterator or call
   *         `close()` on it in order to free the storage memory consumed by the partially-unrolled
   *         block.
   */
  private[storage] def putIteratorAsValues[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {

    val valuesHolder = new DeserializedValuesHolder[T](classTag)

    putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {
      case Right(storedSize) => Right(storedSize)
      case Left(unrollMemoryUsedByThisBlock) =>
        val unrolledIterator = if (valuesHolder.vector != null) {
          valuesHolder.vector.iterator
        } else {
          valuesHolder.arrayValues.toIterator
        }

        Left(new PartiallyUnrolledIterator(
          this,
          MemoryMode.ON_HEAP,
          unrollMemoryUsedByThisBlock,
          unrolled = unrolledIterator,
          rest = values))
    }
  }
  private def putIterator[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T],
      memoryMode: MemoryMode,
      valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

    // Number of elements unrolled so far
    var elementsUnrolled = 0
    // Whether there is still enough memory for us to continue unrolling this block
    var keepUnrolling = true
    // Initial per-task memory to request for unrolling blocks (bytes).
    val initialMemoryThreshold = unrollMemoryThreshold
    // How often to check whether we need to request more memory
    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
    // Memory currently reserved by this task for this particular unrolling operation
    var memoryThreshold = initialMemoryThreshold
    // Memory to request as a multiple of current vector size
    // Memory growth factor: each request asks for (memoryGrowthFactor * currentSize) - memoryThreshold bytes
    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
    // Keep track of unroll memory used by this particular block / putIterator() operation
    var unrollMemoryUsedByThisBlock = 0L

    // Request enough memory to begin unrolling
    keepUnrolling =
      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)

    if (!keepUnrolling) {
      logWarning(s"Failed to reserve initial memory threshold of " +
        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    } else {
      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    }

    // Unroll this block safely, checking whether we have exceeded our threshold periodically
    while (values.hasNext && keepUnrolling) {
      valuesHolder.storeValue(values.next())
      // Every memoryCheckPeriod elements (16 by default), check whether the reserved memory has been exceeded
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        val currentSize = valuesHolder.estimatedSize()
        // If our vector's size has exceeded the threshold, request more memory
        if (currentSize >= memoryThreshold) {
          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
          /