BlockManager Source Code Walkthrough
阿新 · Published 2020-07-27
1. Introduction
BlockManager is the manager that runs on every node (the driver and each executor). It provides the interfaces for putting and retrieving blocks, both locally and remotely, in the various stores available to Spark: memory, disk, and off-heap.
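Before diving into the source, the sketch below shows roughly how the class is used. It is not part of the original listing: it assumes a local-mode SparkContext on a Spark 1.6/2.0-era build matching the code below, and the TestBlockId and literal values are illustrative only. It exercises putSingle, getSingle, and removeBlock, all of which are defined in the source that follows.

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.storage.{StorageLevel, TestBlockId}

object BlockManagerPutGetSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("bm-sketch"))

    // SparkEnv wires up one BlockManager per JVM (the driver and each executor).
    val blockManager = SparkEnv.get.blockManager

    // Write a block holding a single object, preferring memory with disk as fallback.
    val blockId = TestBlockId("demo-block")
    val stored = blockManager.putSingle(blockId, "hello block manager", StorageLevel.MEMORY_AND_DISK)
    println(s"stored = $stored")

    // Read it back; get() tries local storage first, then remote block managers.
    blockManager.getSingle(blockId).foreach(v => println(s"read back: $v"))

    // Drop the block from both memory and disk and tell the BlockManagerMaster.
    blockManager.removeBlock(blockId)
    sc.stop()
  }
}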
2. Source Code
private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {

  // Whether the external shuffle service (spark.shuffle.service.enabled) is enabled;
  // offloading shuffle files to it can improve performance.
  private[spark] val externalShuffleServiceEnabled =
    conf.getBoolean("spark.shuffle.service.enabled", false)

  val diskBlockManager = {
    // Delete local files on stop only on the driver, or on executors that do not use the
    // external shuffle service.
    val deleteFilesOnStop =
      !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
    new DiskBlockManager(conf, deleteFilesOnStop)
  }

  // Visible for testing
  private[storage] val blockInfoManager = new BlockInfoManager

  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))

  // Where block data is actually stored: the memoryStore and the diskStore.
  private[spark] val memoryStore =
    new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
  private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
  memoryManager.setMemoryStore(memoryStore)

  // Maximum on-heap storage memory as reported by the memory manager
  // (it may vary over time depending on the memory manager).
  private val maxMemory = memoryManager.maxOnHeapStorageMemory

  // Port used by the external shuffle service; when running on YARN this may be set through
  // the Hadoop configuration.
  private val externalShuffleServicePort = {
    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
    if (tmpPort == 0) {
      // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
      // an open port. But we still need to tell our spark apps the right port to use. So
      // only if the yarn config has the port set to 0, we prefer the value in the spark config
      conf.get("spark.shuffle.service.port").toInt
    } else {
      tmpPort
    }
  }

  var blockManagerId: BlockManagerId = _

  // Address of the server that serves this executor's shuffle files.
  private[spark] var shuffleServerId: BlockManagerId = _

  // Client used to read shuffle files from other executors: either the external shuffle
  // service, or the standard BlockTransferService that connects to other executors directly.
  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
      securityManager.isSaslEncryptionEnabled())
  } else {
    blockTransferService
  }

  // Maximum number of fetch failures before the block manager refreshes the block locations
  // obtained from the driver.
  private val maxFailuresBeforeLocationRefresh =
    conf.getInt("spark.block.failures.beforeLocationRefresh", 5)

  private val slaveEndpoint = rpcEnv.setupEndpoint(
    "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

  // A pending re-registration task, executed asynchronously;
  // access to it is synchronized through asyncReregisterLock.
  private var asyncReregisterTask: Future[Unit] = null
  private val asyncReregisterLock = new Object

  // Field related to peer block managers that are necessary for block replication
  @volatile private var cachedPeers: Seq[BlockManagerId] = _
  private val peerFetchLock = new Object
  private var lastPeerFetchTime = 0L

  /**
   * Initialize the BlockManager with the given app ID. This is not done in the constructor
   * because the app ID may not be known when the BlockManager is instantiated (in particular,
   * on the driver it is only known after the TaskScheduler has registered).
   * This method initializes the BlockTransferService and the ShuffleClient, registers with the
   * BlockManagerMaster, starts the BlockManagerWorker endpoint, and registers with a local
   * external shuffle service if one is configured.
   */
  def initialize(appId: String): Unit = {
    blockTransferService.init(this)
    shuffleClient.init(appId)

    blockManagerId = BlockManagerId(
      executorId, blockTransferService.hostName, blockTransferService.port)

    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }

    master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

    // Register Executors' configuration with the local shuffle service, if one should exist.
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }
  }

  private def registerWithExternalShuffleServer() {
    logInfo("Registering executor with local external shuffle service.")
    val shuffleConfig = new ExecutorShuffleInfo(
      diskBlockManager.localDirs.map(_.toString),
      diskBlockManager.subDirsPerLocalDir,
      shuffleManager.getClass.getName)

    val MAX_ATTEMPTS = 3
    val SLEEP_TIME_SECS = 5

    for (i <- 1 to MAX_ATTEMPTS) {
      try {
        // Synchronous and will throw an exception if we cannot connect.
        shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
          shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
        return
      } catch {
        case e: Exception if i < MAX_ATTEMPTS =>
          logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}" +
            s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
          Thread.sleep(SLEEP_TIME_SECS * 1000)
      }
    }
  }

  /**
   * Report all blocks to the BlockManagerMaster. This is important, for example, when
   * recovering blocks after an executor crash.
   * This method fails silently if the master returns false (indicating that the slave needs to
   * re-register); the error condition will be detected again by the next heartbeat or block
   * status report, which will then attempt to re-register and report all blocks.
   */
  private def reportAllBlocks(): Unit = {
    logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
    for ((blockId, info) <- blockInfoManager.entries) {
      // Get the block's current status
      val status = getCurrentBlockStatus(blockId, info)
      // Report it to the master
      if (!tryToReportBlockStatus(blockId, info, status)) {
        logError(s"Failed to report $blockId to master; giving up.")
        return
      }
    }
  }

  /**
   * Re-register with the master and report all blocks. Called by the heartbeat thread when the
   * heartbeat response indicates that this block manager is not registered.
   */
  def reregister(): Unit = {
    // TODO: We might need to rate limit re-registering.
    logInfo(s"BlockManager $blockManagerId re-registering with master")
    master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
    reportAllBlocks()
  }

  /**
   * Re-register with the master asynchronously; at most one re-registration is pending at a time.
   */
  private def asyncReregister(): Unit = {
    asyncReregisterLock.synchronized {
      if (asyncReregisterTask == null) {
        asyncReregisterTask = Future[Unit] {
          // This is a blocking action and should run in futureExecutionContext which is a cached
          // thread pool
          reregister()
          asyncReregisterLock.synchronized {
            asyncReregisterTask = null
          }
        }(futureExecutionContext)
      }
    }
  }

  /**
   * For testing. Wait for any pending asynchronous re-registration; otherwise, do nothing.
   */
  def waitForAsyncReregister(): Unit = {
    val task = asyncReregisterTask
    if (task != null) {
      try {
        Await.ready(task, Duration.Inf)
      } catch {
        case NonFatal(t) =>
          throw new Exception("Error occurred while waiting for async. reregistration", t)
      }
    }
  }

  /**
   * Interface for getting local block data. Throws an exception if the block cannot be found.
   */
  override def getBlockData(blockId: BlockId): ManagedBuffer = {
    if (blockId.isShuffle) {
      // Shuffle blocks are resolved by the shuffle manager (e.g. IndexShuffleBlockResolver).
      shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
    } else {
      getLocalBytes(blockId) match {
        case Some(buffer) => new BlockManagerManagedBuffer(blockInfoManager, blockId, buffer)
        case None => throw new BlockNotFoundException(blockId.toString)
      }
    }
  }

  /**
   * Put the block's data locally, using the given storage level.
   */
  override def putBlockData(
      blockId: BlockId,
      data: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Boolean = {
    putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
  }

  /**
   * Get the BlockStatus of the block with the given id, if it exists.
   */
  def getStatus(blockId: BlockId): Option[BlockStatus] = {
    blockInfoManager.get(blockId).map { info =>
      val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
      val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
      BlockStatus(info.level, memSize = memSize, diskSize = diskSize)
    }
  }

  /**
   * Get the ids of blocks matching the given filter. This also queries blocks known only to the
   * DiskBlockManager.
   */
  def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
    // The `toArray` is necessary here in order to force the list to be materialized so that we
    // don't try to serialize a lazy iterator when responding to client requests.
    (blockInfoManager.entries.map(_._1) ++ diskBlockManager.getAllBlocks())
      .filter(filter)
      .toArray
      .toSeq
  }

  /**
   * Report the block's storage status to the BlockManagerMaster. The update message reflects the
   * block's current status rather than the storage level requested in its BlockInfo: for
   * example, a block put with MEMORY_AND_DISK may by now live only on disk.
   * droppedMemorySize is passed when a block is dropped from memory to disk, so that the
   * master's memory accounting stays consistent.
   */
  private def reportBlockStatus(
      blockId: BlockId,
      info: BlockInfo,
      status: BlockStatus,
      droppedMemorySize: Long = 0L): Unit = {
    val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
    if (needReregister) {
      logInfo(s"Got told to re-register updating block $blockId")
      // Re-registering will report our new block for free.
      asyncReregister()
    }
    logDebug(s"Told master about block $blockId")
  }

  /**
   * Actually sends the UpdateBlockInfo message to the BlockManagerMaster and waits for its
   * reply. Returns true if the master recorded the update, or false if this block manager needs
   * to re-register.
   */
  private def tryToReportBlockStatus(
      blockId: BlockId,
      info: BlockInfo,
      status: BlockStatus,
      droppedMemorySize: Long = 0L): Boolean = {
    if (info.tellMaster) {
      val storageLevel = status.storageLevel
      val inMemSize = Math.max(status.memSize, droppedMemorySize)
      val onDiskSize = status.diskSize
      master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
    } else {
      true
    }
  }

  /**
   * Return the block's current storage status. If the block has, for example, been dropped to
   * disk, the returned storage level and the in-memory / on-disk sizes reflect that actual
   * state.
   */
  private def getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus = {
    info.synchronized {
      info.level match {
        case null =>
          BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
        case level =>
          val inMem = level.useMemory && memoryStore.contains(blockId)
          val onDisk = level.useDisk && diskStore.contains(blockId)
          val deserialized = if (inMem) level.deserialized else false
          val replication = if (inMem || onDisk) level.replication else 1
          val storageLevel = StorageLevel(
            useDisk = onDisk,
            useMemory = inMem,
            useOffHeap = level.useOffHeap,
            deserialized = deserialized,
            replication = replication)
          val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
          val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
          BlockStatus(storageLevel, memSize, diskSize)
      }
    }
  }

  /**
   * Get the locations of a set of blocks from the master.
   */
  private def getLocationBlockIds(blockIds: Array[BlockId]): Array[Seq[BlockManagerId]] = {
    val startTimeMs = System.currentTimeMillis
    val locations = master.getLocations(blockIds).toArray
    logDebug("Got multiple block location in %s".format(Utils.getUsedTimeMs(startTimeMs)))
    locations
  }

  /**
   * Cleanup code run when a local read fails.
   * The caller must hold a read lock on the block when calling this method.
   */
  private def handleLocalReadFailure(blockId: BlockId): Nothing = {
    // Release the read lock
    releaseLock(blockId)
    // Remove the missing block
    removeBlock(blockId)
    throw new SparkException(s"Block $blockId was not found even though it's read-locked")
  }

  /**
   * Get the block from the local block manager as deserialized values.
   */
  def getLocalValues(blockId: BlockId): Option[BlockResult] = {
    logDebug(s"Getting local block $blockId")
    blockInfoManager.lockForReading(blockId) match {
      case None =>
        logDebug(s"Block $blockId was not found")
        None
      case Some(info) =>
        val level = info.level
        logDebug(s"Level for block $blockId is $level")
        // The block is still in memory
        if (level.useMemory && memoryStore.contains(blockId)) {
          val iter: Iterator[Any] = if (level.deserialized) {
            memoryStore.getValues(blockId).get
          } else {
            serializerManager.dataDeserializeStream(
              blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
          }
          val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
          Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
        } else if (level.useDisk && diskStore.contains(blockId)) {
          // The block has been dropped to disk
          val iterToReturn: Iterator[Any] = {
            val diskBytes = diskStore.getBytes(blockId)
            if (level.deserialized) {
              val diskValues = serializerManager.dataDeserializeStream(
                blockId,
                diskBytes.toInputStream(dispose = true))(info.classTag)
              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
            } else {
              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
                .map {_.toInputStream(dispose = false)}
                .getOrElse { diskBytes.toInputStream(dispose = true) }
              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
            }
          }
          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
        } else {
          // Present in neither memory nor disk: treat as a local read failure
          handleLocalReadFailure(blockId)
        }
    }
  }

  /**
   * Get the block from the local block manager as serialized bytes.
   */
  def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    logDebug(s"Getting local block $blockId as bytes")
    // As an optimization for map output fetches, if the block is for a shuffle, return it
    // without acquiring a lock; the disk store never deletes (recent) items so this should work
    if (blockId.isShuffle) {
      val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
      // TODO: This should gracefully handle case where local block is not available. Currently
      // downstream code will throw an exception.
      Option(
        new ChunkedByteBuffer(
          shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()))
    } else {
      blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
    }
  }

  /**
   * Get the block from the local block manager as serialized bytes.
   *
   * Must be called while holding a read lock on the block. The lock is kept if this method
   * succeeds and released if an exception is thrown.
   */
  private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ChunkedByteBuffer = {
    val level = info.level
    logDebug(s"Level for block $blockId is $level")
    // In order: try memory, then disk, then fall back to serializing the in-memory values;
    // throw an exception if the block does not exist.
    if (level.deserialized) {
      // Prefer the serialized copy already on disk to avoid re-serializing the in-memory values
      if (level.useDisk && diskStore.contains(blockId)) {
        diskStore.getBytes(blockId)
      } else if (level.useMemory && memoryStore.contains(blockId)) {
        // Not on disk, so serialize the values held in memory
        serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get)
      } else {
        handleLocalReadFailure(blockId)
      }
    } else { // storage level is serialized
      if (level.useMemory && memoryStore.contains(blockId)) {
        memoryStore.getBytes(blockId).get
      } else if (level.useDisk && diskStore.contains(blockId)) {
        val diskBytes = diskStore.getBytes(blockId)
        maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
      } else {
        handleLocalReadFailure(blockId)
      }
    }
  }

  /**
   * Get the block from a remote block manager as deserialized values.
   */
  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
    getRemoteBytes(blockId).map { data =>
      val values =
        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
      new BlockResult(values, DataReadMethod.Network, data.size)
    }
  }

  /**
   * Return a list of locations for the given block, preferring locations on the same host.
   */
  private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
    val locs = Random.shuffle(master.getLocations(blockId))
    val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
    preferredLocs ++ otherLocs
  }

  /**
   * Get the block from a remote block manager as serialized bytes.
   */
  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    logDebug(s"Getting remote block $blockId")
    require(blockId != null, "BlockId is null")
    var runningFailureCount = 0
    var totalFailureCount = 0
    val locations = getLocations(blockId)
    val maxFetchFailures = locations.size
    var locationIterator = locations.iterator
    while (locationIterator.hasNext) {
      val loc = locationIterator.next()
      logDebug(s"Getting remote block $blockId from $loc")
      val data = try {
        blockTransferService.fetchBlockSync(
          loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
      } catch {
        case NonFatal(e) =>
          runningFailureCount += 1
          totalFailureCount += 1

          if (totalFailureCount >= maxFetchFailures) {
            // Give up trying anymore locations. Either we've tried all of the original locations,
            // or we've refreshed the list of locations from the master, and have still
            // hit failures after trying locations from the refreshed list.
            throw new BlockFetchException(s"Failed to fetch block after" +
              s" ${totalFailureCount} fetch failures. Most recent failure cause:", e)
          }

          logWarning(s"Failed to fetch remote block $blockId " +
            s"from $loc (failed attempt $runningFailureCount)", e)

          // If there is a large number of executors then locations list can contain a
          // large number of stale entries causing a large number of retries that may
          // take a significant amount of time. To get rid of these stale entries
          // we refresh the block locations after a certain number of fetch failures
          if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
            locationIterator = getLocations(blockId).iterator
            logDebug(s"Refreshed locations from the driver " +
              s"after ${runningFailureCount} fetch failures.")
            runningFailureCount = 0
          }

          // This location failed, so we retry fetch from a different one by returning null here
          null
      }

      if (data != null) {
        return Some(new ChunkedByteBuffer(data))
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }

  /**
   * Get the block from this block manager, trying local storage first and then remote block
   * managers. A read lock is acquired if the block is found locally; no lock is held for blocks
   * fetched from a remote block manager.
   */
  def get(blockId: BlockId): Option[BlockResult] = {
    val local = getLocalValues(blockId)
    if (local.isDefined) {
      logInfo(s"Found block $blockId locally")
      return local
    }
    val remote = getRemoteValues(blockId)
    if (remote.isDefined) {
      logInfo(s"Found block $blockId remotely")
      return remote
    }
    None
  }

  /**
   * Downgrades an exclusive write lock to a shared read lock.
   */
  def downgradeLock(blockId: BlockId): Unit = {
    blockInfoManager.downgradeLock(blockId)
  }

  /**
   * Release a lock on the given block.
   */
  def releaseLock(blockId: BlockId): Unit = {
    blockInfoManager.unlock(blockId)
  }

  /**
   * Register the given task attempt with this block manager.
   */
  def registerTask(taskAttemptId: Long): Unit = {
    blockInfoManager.registerTask(taskAttemptId)
  }

  /**
   * Release all locks for the given task.
   *
   * @return the blocks whose locks were released.
   */
  def releaseAllLocksForTask(taskAttemptId: Long): Seq[BlockId] = {
    blockInfoManager.releaseAllLocksForTask(taskAttemptId)
  }

  /**
   * Retrieve the given block if it exists; otherwise call makeIterator to compute the block,
   * persist it, and return its values.
   */
  def getOrElseUpdate[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    // Attempt to read the block from local or remote storage. If it's present, then we don't need
    // to go through the local-get-or-put path.
    get(blockId) match {
      case Some(block) =>
        return Left(block)
      case _ =>
        // Need to compute the block.
    }
    // Initially we hold no locks on this block.
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
      case None =>
        // doPut() didn't hand work back to us, so the block already existed or was successfully
        // stored. Therefore, we now hold a read lock on the block.
        val blockResult = getLocalValues(blockId).getOrElse {
          // Since we held a read lock between the doPut() and get() calls, the block should not
          // have been evicted, so get() not returning the block indicates some internal error.
          releaseLock(blockId)
          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
        }
        // We already hold a read lock on the block from the doPut() call and getLocalValues()
        // acquires the lock again, so we need to call releaseLock() here so that the net number
        // of lock acquisitions is 1 (since the caller will only call release() once).
        releaseLock(blockId)
        Left(blockResult)
      case Some(iter) =>
        // The put failed, likely because the data was too large to fit in memory and could not be
        // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
        // that they can decide what to do with the values (e.g. process them without caching).
        Right(iter)
    }
  }

  /**
   * @return true if the block was stored or false if an error occurred.
   */
  def putIterator[T: ClassTag](
      blockId: BlockId,
      values: Iterator[T],
      level: StorageLevel,
      tellMaster: Boolean = true): Boolean = {
    require(values != null, "Values is null")
    doPutIterator(blockId, () => values, level, implicitly[ClassTag[T]], tellMaster) match {
      case None =>
        true
      case Some(iter) =>
        // Caller doesn't care about the iterator values, so we can close the iterator here
        // to free resources earlier
        iter.close()
        false
    }
  }

  /**
   * Get a block writer that writes data directly to a file on disk.
   */
  def getDiskWriter(
      blockId: BlockId,
      file: File,
      serializerInstance: SerializerInstance,
      bufferSize: Int,
      writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = {
    val compressStream: OutputStream => OutputStream =
      serializerManager.wrapForCompression(blockId, _)
    val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
    new DiskBlockObjectWriter(file, serializerInstance, bufferSize, compressStream,
      syncWrites, writeMetrics, blockId)
  }

  /**
   * Put serialized bytes into the block manager under the given storage level.
   */
  def putBytes[T: ClassTag](
      blockId: BlockId,
      bytes: ChunkedByteBuffer,
      level: StorageLevel,
      tellMaster: Boolean = true): Boolean = {
    require(bytes != null, "Bytes is null")
    doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster)
  }

  /**
   * Put the given bytes into one of the block stores according to the given level. If the block
   * already exists, it is not overwritten.
   */
  private def doPutBytes[T](
      blockId: BlockId,
      bytes: ChunkedByteBuffer,
      level: StorageLevel,
      classTag: ClassTag[T],
      tellMaster: Boolean = true,
      keepReadLock: Boolean = false): Boolean = {
    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
      val startTimeMs = System.currentTimeMillis
      // Since we are storing bytes, replication can be kicked off before the local put completes.
      // This is faster as data is already serialized and ready to send.
      val replicationFuture = if (level.replication > 1) {
        Future {
          // This is a blocking action and should run in futureExecutionContext which is a cached
          // thread pool
          replicate(blockId, bytes, level, classTag)
        }(futureExecutionContext)
      } else {
        null
      }

      val size = bytes.size

      if (level.useMemory) {
        // Put it in memory first, even if the level also allows disk
        val putSucceeded = if (level.deserialized) {
          val values =
            serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
          memoryStore.putIteratorAsValues(blockId, values, classTag) match {
            case Right(_) => true
            case Left(iter) =>
              // If putting deserialized values in memory failed, we will put the bytes directly to
              // disk, so we don't need this iterator and can close it to free resources earlier.
              iter.close()
              false
          }
        } else {
          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
        }
        if (!putSucceeded && level.useDisk) {
          logWarning(s"Persisting block $blockId to disk instead.")
          diskStore.putBytes(blockId, bytes)
        }
      } else if (level.useDisk) {
        diskStore.putBytes(blockId, bytes)
      }

      val putBlockStatus = getCurrentBlockStatus(blockId, info)
      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
      if (blockWasSuccessfullyStored) {
        // Now that the block is in either the memory, externalBlockStore, or disk store,
        // tell the master about it.
        info.size = size
        if (tellMaster) {
          reportBlockStatus(blockId, info, putBlockStatus)
        }
        Option(TaskContext.get()).foreach { c =>
          c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
        }
      }
      logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
      if (level.replication > 1) {
        // Wait for asynchronous replication to finish
        try {
          Await.ready(replicationFuture, Duration.Inf)
        } catch {
          case NonFatal(t) =>
            throw new Exception("Error occurred while waiting for replication to finish", t)
        }
      }
      if (blockWasSuccessfullyStored) {
        None
      } else {
        Some(bytes)
      }
    }.isEmpty
  }

  /**
   * Helper method used to abstract common code from [[doPutBytes()]] and [[doPutIterator()]].
   *
   * @param putBody a function which attempts the actual put() and returns None on success
   *                or Some on failure.
   */
  private def doPut[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[_],
      tellMaster: Boolean,
      keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {

    require(blockId != null, "BlockId is null")
    require(level != null && level.isValid, "StorageLevel is null or invalid")

    val putBlockInfo = {
      val newInfo = new BlockInfo(level, classTag, tellMaster)
      if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
        newInfo
      } else {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        if (!keepReadLock) {
          // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
          releaseLock(blockId)
        }
        return None
      }
    }

    val startTimeMs = System.currentTimeMillis
    var blockWasSuccessfullyStored: Boolean = false
    val result: Option[T] = try {
      val res = putBody(putBlockInfo)
      blockWasSuccessfullyStored = res.isEmpty
      res
    } finally {
      if (blockWasSuccessfullyStored) {
        if (keepReadLock) {
          blockInfoManager.downgradeLock(blockId)
        } else {
          blockInfoManager.unlock(blockId)
        }
      } else {
        blockInfoManager.removeBlock(blockId)
        logWarning(s"Putting block $blockId failed")
      }
    }
    if (level.replication > 1) {
      logDebug("Putting block %s with replication took %s"
        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    } else {
      logDebug("Putting block %s without replication took %s"
        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    }
    result
  }

  /**
   * Put the given block according to the given level in one of the block stores, replicating
   * the values if necessary.
   *
   * If the block already exists, this method will not overwrite it.
   *
   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
   *                     block already exists). If false, this method will hold no locks when it
   *                     returns.
   * @return None if the block was already present or if the put succeeded, or Some(iterator)
   *         if the put failed.
   */
  private def doPutIterator[T](
      blockId: BlockId,
      iterator: () => Iterator[T],
      level: StorageLevel,
      classTag: ClassTag[T],
      tellMaster: Boolean = true,
      keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
      val startTimeMs = System.currentTimeMillis
      var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
      // Size of the block in bytes
      var size = 0L
      if (level.useMemory) {
        // Put it in memory first, even if it also has useDisk set to true;
        // We will drop it to disk later if the memory store can't hold it.
        if (level.deserialized) {
          memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
            case Right(s) =>
              size = s
            case Left(iter) =>
              // Not enough space to unroll this block; drop to disk if applicable
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { fileOutputStream =>
                  serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
                }
                size = diskStore.getSize(blockId)
              } else {
                iteratorFromFailedMemoryStorePut = Some(iter)
              }
          }
        } else { // !level.deserialized
          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
            case Right(s) =>
              size = s
            case Left(partiallySerializedValues) =>
              // Not enough space to unroll this block; drop to disk if applicable
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { fileOutputStream =>
                  partiallySerializedValues.finishWritingToStream(fileOutputStream)
                }
                size = diskStore.getSize(blockId)
              } else {
                iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
              }
          }
        }
      } else if (level.useDisk) {
        diskStore.put(blockId) { fileOutputStream =>
          serializerManager.dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
        }
        size = diskStore.getSize(blockId)
      }

      val putBlockStatus = getCurrentBlockStatus(blockId, info)
      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
      if (blockWasSuccessfullyStored) {
        // Now that the block is in either the memory, externalBlockStore, or disk store,
        // tell the master about it.
        info.size = size
        if (tellMaster) {
          reportBlockStatus(blockId, info, putBlockStatus)
        }
        Option(TaskContext.get()).foreach { c =>
          c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
        }
        logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
        if (level.replication > 1) {
          val remoteStartTime = System.currentTimeMillis
          val bytesToReplicate = doGetLocalBytes(blockId, info)
          try {
            replicate(blockId, bytesToReplicate, level, classTag)
          } finally {
            bytesToReplicate.dispose()
          }
          logDebug("Put block %s remotely took %s"
            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
        }
      }
      assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
      iteratorFromFailedMemoryStorePut
    }
  }

  /**
   * Attempts to cache spilled bytes read from disk into the MemoryStore in order to speed up
   * subsequent reads. This method requires the caller to hold a read lock on the block.
   *
   * @return a copy of the bytes from the memory store if the put succeeded, otherwise None.
   *         If this returns bytes from the memory store then the original disk store bytes will
   *         automatically be disposed and the caller should not continue to use them. Otherwise,
   *         if this returns None then the original disk store bytes will be unaffected.
   */
  private def maybeCacheDiskBytesInMemory(
      blockInfo: BlockInfo,
      blockId: BlockId,
      level: StorageLevel,
      diskBytes: ChunkedByteBuffer): Option[ChunkedByteBuffer] = {
    require(!level.deserialized)
    if (level.useMemory) {
      // Synchronize on blockInfo to guard against a race condition where two readers both try to
      // put values read from disk into the MemoryStore.
      blockInfo.synchronized {
        if (memoryStore.contains(blockId)) {
          diskBytes.dispose()
          Some(memoryStore.getBytes(blockId).get)
        } else {
          val allocator = level.memoryMode match {
            case MemoryMode.ON_HEAP => ByteBuffer.allocate _
            case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
          }
          val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, level.memoryMode, () => {
            // https://issues.apache.org/jira/browse/SPARK-6076
            // If the file size is bigger than the free memory, OOM will happen. So if we
            // cannot put it into MemoryStore, copyForMemory should not be created. That's why
            // this action is put into a `() => ChunkedByteBuffer` and created lazily.
            diskBytes.copy(allocator)
          })
          if (putSucceeded) {
            diskBytes.dispose()
            Some(memoryStore.getBytes(blockId).get)
          } else {
            None
          }
        }
      }
    } else {
      None
    }
  }

  /**
   * Attempts to cache spilled values read from disk into the MemoryStore in order to speed up
   * subsequent reads. This method requires the caller to hold a read lock on the block.
   *
   * @return a copy of the iterator. The original iterator passed this method should no longer
   *         be used after this method returns.
   */
  private def maybeCacheDiskValuesInMemory[T](
      blockInfo: BlockInfo,
      blockId: BlockId,
      level: StorageLevel,
      diskIterator: Iterator[T]): Iterator[T] = {
    require(level.deserialized)
    val classTag = blockInfo.classTag.asInstanceOf[ClassTag[T]]
    if (level.useMemory) {
      // Synchronize on blockInfo to guard against a race condition where two readers both try to
      // put values read from disk into the MemoryStore.
      blockInfo.synchronized {
        if (memoryStore.contains(blockId)) {
          // Note: if we had a means to discard the disk iterator, we would do that here.
          memoryStore.getValues(blockId).get
        } else {
          memoryStore.putIteratorAsValues(blockId, diskIterator, classTag) match {
            case Left(iter) =>
              // The memory store put() failed, so it returned the iterator back to us:
              iter
            case Right(_) =>
              // The put() succeeded, so we can read the values back:
              memoryStore.getValues(blockId).get
          }
        }
      }.asInstanceOf[Iterator[T]]
    } else {
      diskIterator
    }
  }

  /**
   * Get peer block managers in the system.
   */
  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
    peerFetchLock.synchronized {
      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
      if (cachedPeers == null || forceFetch || timeout) {
        cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
        lastPeerFetchTime = System.currentTimeMillis
        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
      }
      cachedPeers
    }
  }

  /**
   * Replicate block to another node. Note that this is a blocking call that returns after
   * the block has been replicated.
   */
  private def replicate(
      blockId: BlockId,
      data: ChunkedByteBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Unit = {
    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    val numPeersToReplicateTo = level.replication - 1
    val peersForReplication = new ArrayBuffer[BlockManagerId]
    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
    val tLevel = StorageLevel(
      useDisk = level.useDisk,
      useMemory = level.useMemory,
      useOffHeap = level.useOffHeap,
      deserialized = level.deserialized,
      replication = 1)
    val startTime = System.currentTimeMillis
    val random = new Random(blockId.hashCode)

    var replicationFailed = false
    var failures = 0
    var done = false

    // Get cached list of peers
    peersForReplication ++= getPeers(forceFetch = false)

    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    // So assuming the list of peers does not change and no replication failures,
    // if there are multiple attempts in the same node to replicate the same block,
    // the same set of peers will be selected.
    def getRandomPeer(): Option[BlockManagerId] = {
      // If replication had failed, then force update the cached list of peers and remove the peers
      // that have been already used
      if (replicationFailed) {
        peersForReplication.clear()
        peersForReplication ++= getPeers(forceFetch = true)
        peersForReplication --= peersReplicatedTo
        peersForReplication --= peersFailedToReplicateTo
      }
      if (!peersForReplication.isEmpty) {
        Some(peersForReplication(random.nextInt(peersForReplication.size)))
      } else {
        None
      }
    }

    // One by one choose a random peer and try uploading the block to it
    // If replication fails (e.g., target peer is down), force the list of cached peers
    // to be re-fetched from driver and then pick another random peer for replication. Also
    // temporarily black list the peer for which replication failed.
    //
    // This selection of a peer and replication is continued in a loop until one of the
    // following 3 conditions is fulfilled:
    // (i) specified number of peers have been replicated to
    // (ii) too many failures in replicating to peers
    // (iii) no peer left to replicate to
    //
    while (!done) {
      getRandomPeer() match {
        case Some(peer) =>
          try {
            val onePeerStartTime = System.currentTimeMillis
            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
            blockTransferService.uploadBlockSync(
              peer.host, peer.port, peer.executorId, blockId, new NettyManagedBuffer(data.toNetty),
              tLevel, classTag)
            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
              .format(System.currentTimeMillis - onePeerStartTime))
            peersReplicatedTo += peer
            peersForReplication -= peer
            replicationFailed = false
            if (peersReplicatedTo.size == numPeersToReplicateTo) {
              done = true // specified number of peers have been replicated to
            }
          } catch {
            case e: Exception =>
              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
              failures += 1
              replicationFailed = true
              peersFailedToReplicateTo += peer
              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
                done = true
              }
          }
        case None => // no peer left to replicate to
          done = true
      }
    }
    val timeTakeMs = (System.currentTimeMillis - startTime)
    logDebug(s"Replicating $blockId of ${data.size} bytes to " +
      s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
    if (peersReplicatedTo.size < numPeersToReplicateTo) {
      logWarning(s"Block $blockId replicated to only " +
        s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
    }
  }

  /**
   * Read a block consisting of a single object.
   */
  def getSingle(blockId: BlockId): Option[Any] = {
    get(blockId).map(_.data.next())
  }

  /**
   * Write a block consisting of a single object.
   *
   * @return true if the block was stored or false if the block was already stored or an
   *         error occurred.
   */
  def putSingle[T: ClassTag](
      blockId: BlockId,
      value: T,
      level: StorageLevel,
      tellMaster: Boolean = true): Boolean = {
    putIterator(blockId, Iterator(value), level, tellMaster)
  }

  /**
   * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
   * store reaches its limit and needs to free up space.
   *
   * If `data` is not put on disk, it won't be created.
   *
   * The caller of this method must hold a write lock on the block before calling this method.
   * This method does not release the write lock.
   *
   * @return the block's new effective StorageLevel.
   */
  private[storage] override def dropFromMemory[T: ClassTag](
      blockId: BlockId,
      data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
    logInfo(s"Dropping block $blockId from memory")
    val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
    var blockIsUpdated = false
    val level = info.level

    // Drop to disk, if storage level requires
    if (level.useDisk && !diskStore.contains(blockId)) {
      logInfo(s"Writing block $blockId to disk")
      data() match {
        case Left(elements) =>
          diskStore.put(blockId) { fileOutputStream =>
            serializerManager.dataSerializeStream(
              blockId,
              fileOutputStream,
              elements.toIterator)(info.classTag.asInstanceOf[ClassTag[T]])
          }
        case Right(bytes) =>
          diskStore.putBytes(blockId, bytes)
      }
      blockIsUpdated = true
    }

    // Actually drop from memory store
    val droppedMemorySize =
      if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
    val blockIsRemoved = memoryStore.remove(blockId)
    if (blockIsRemoved) {
      blockIsUpdated = true
    } else {
      logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
    }

    val status = getCurrentBlockStatus(blockId, info)
    if (info.tellMaster) {
      reportBlockStatus(blockId, info, status, droppedMemorySize)
    }
    if (blockIsUpdated) {
      Option(TaskContext.get()).foreach { c =>
        c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
      }
    }
    status.storageLevel
  }

  /**
   * Remove all blocks belonging to the given RDD.
   *
   * @return The number of blocks removed.
   */
  def removeRdd(rddId: Int): Int = {
    // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
    logInfo(s"Removing RDD $rddId")
    val blocksToRemove = blockInfoManager.entries.flatMap(_._1.asRDDId).filter(_.rddId == rddId)
    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
    blocksToRemove.size
  }

  /**
   * Remove all blocks belonging to the given broadcast.
   */
  def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
    logDebug(s"Removing broadcast $broadcastId")
    val blocksToRemove = blockInfoManager.entries.map(_._1).collect {
      case bid @ BroadcastBlockId(`broadcastId`, _) => bid
    }
    blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
    blocksToRemove.size
  }

  /**
   * Remove a block from both memory and disk.
   */
  def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
    logDebug(s"Removing block $blockId")
    blockInfoManager.lockForWriting(blockId) match {
      case None =>
        // The block has already been removed; do nothing.
        logWarning(s"Asked to remove block $blockId, which does not exist")
      case Some(info) =>
        // Removals are idempotent in disk store and memory store. At worst, we get a warning.
        val removedFromMemory = memoryStore.remove(blockId)
        val removedFromDisk = diskStore.remove(blockId)
        if (!removedFromMemory && !removedFromDisk) {
          logWarning(s"Block $blockId could not be removed as it was not found in either " +
            "the disk, memory, or external block store")
        }
        blockInfoManager.removeBlock(blockId)
        val removeBlockStatus = getCurrentBlockStatus(blockId, info)
        if (tellMaster && info.tellMaster) {
          reportBlockStatus(blockId, info, removeBlockStatus)
        }
        Option(TaskContext.get()).foreach { c =>
          c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
        }
    }
  }

  def stop(): Unit = {
    blockTransferService.close()
    if (shuffleClient ne blockTransferService) {
      // Closing should be idempotent, but maybe not for the NioBlockTransferService.
      shuffleClient.close()
    }
    diskBlockManager.stop()
    rpcEnv.stop(slaveEndpoint)
    blockInfoManager.clear()
    memoryStore.clear()
    futureExecutionContext.shutdownNow()
    logInfo("BlockManager stopped")
  }
}

private[spark] object BlockManager {
  private val ID_GENERATOR = new IdGenerator

  def blockIdsToHosts(
      blockIds: Array[BlockId],
      env: SparkEnv,
      blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = {

    // blockManagerMaster != null is used in tests
    assert(env != null || blockManagerMaster != null)
    val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
      env.blockManager.getLocationBlockIds(blockIds)
    } else {
      blockManagerMaster.getLocations(blockIds)
    }

    val blockManagers = new HashMap[BlockId, Seq[String]]
    for (i <- 0 until blockIds.length) {
      blockManagers(blockIds(i)) = blockLocations(i).map(_.host)
    }
    blockManagers.toMap
  }
}
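For quick reference, the sketch below collects the configuration keys read by the class above into a single SparkConf. Only the key names and the defaults noted in the comments come from the source; the object name and the values being set are illustrative assumptions.

import org.apache.spark.{SparkConf, SparkContext}

object BlockManagerConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]").setAppName("bm-conf-sketch")
      .set("spark.shuffle.service.enabled", "false")           // default false: serve shuffle files via BlockTransferService instead of the external shuffle service
      .set("spark.shuffle.service.port", "7337")               // default 7337: external shuffle service port; on YARN the Hadoop config may supply it
      .set("spark.block.failures.beforeLocationRefresh", "5")  // default 5: fetch failures before block locations are refreshed from the driver
      .set("spark.storage.cachedPeersTtl", "60000")            // default 60000 ms: how long getPeers() reuses the cached peer list
      .set("spark.storage.maxReplicationFailures", "1")        // default 1: failures tolerated while replicating a block
      .set("spark.shuffle.sync", "false")                      // default false: whether DiskBlockObjectWriter syncs writes to disk

    val sc = new SparkContext(conf)
    // Every BlockManager created for this application (driver and executors) reads these keys.
    sc.stop()
  }
}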