Spark BlockManager: Principles and Source Code Analysis
1. BlockManager Architecture Overview
① The BlockManagerMaster on the Driver manages the metadata of every node's BlockManager and maintains block status information.
② Each node's BlockManager has the following components:
DiskStore: reads and writes data on disk
MemoryStore: reads and writes data in memory
BlockManagerWorker: reads and writes data on remote nodes
ConnectionManager: establishes network connections to remote BlockManagers
③ When a BlockManager reads or writes data, e.g. when persist() is called during RDD execution or intermediate data is produced, the data is stored in memory first; whatever does not fit in memory is spilled to disk (see the sketch after this list).
④ A shuffle read first looks for the data in local memory (MemoryStore) and on local disk (DiskStore); if it is not there, the ConnectionManager opens a connection to another node and the BlockManagerWorker downloads the data.
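The memory-first, disk-fallback behavior in ③ can be pictured with a minimal, self-contained Scala sketch; plain maps stand in for MemoryStore/DiskStore, and nothing here is Spark's actual API:
import scala.collection.mutable

object StoragePrioritySketch {
  // Hypothetical capacity in bytes; real sizing comes from the MemoryManager
  val memoryCapacity = 4L
  var memoryUsed = 0L
  val memoryStore = mutable.Map[String, Array[Byte]]() // stand-in for MemoryStore
  val diskStore = mutable.Map[String, Array[Byte]]()   // stand-in for DiskStore

  def putBlock(blockId: String, data: Array[Byte]): String = {
    if (memoryUsed + data.length <= memoryCapacity) {
      memoryStore(blockId) = data // memory first
      memoryUsed += data.length
      "memory"
    } else {
      diskStore(blockId) = data   // spill to disk when memory cannot hold it
      "disk"
    }
  }

  def main(args: Array[String]): Unit = {
    println(putBlock("rdd_0_0", Array[Byte](1, 2, 3))) // memory
    println(putBlock("rdd_0_1", Array[Byte](4, 5)))    // disk: capacity exceeded
  }
}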
2. Source Code Analysis
① Registering and maintaining BlockManagers
BlockManagerMaster delegates the metadata management of executors and their BlockManagers to a BlockManagerMasterEndpoint (an RPC endpoint; an Actor in older versions).
BlockManagerMasterEndpoint.scala
/**
 * Maintains the metadata for each executor and its BlockManager:
 * BlockManagerInfo and BlockStatus.
 */
private[spark] class BlockManagerMasterEndpoint(
    override val rpcEnv: RpcEnv,
    val isLocal: Boolean,
    conf: SparkConf,
    listenerBus: LiveListenerBus)
  extends ThreadSafeRpcEndpoint with Logging {

  // Mapping from block manager id to the block manager's information.
  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

  // Mapping from executor ID to block manager ID.
  // Each executor is associated with exactly one BlockManager
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

  // Mapping from block id to the set of block managers that have the block.
  private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

  ...
}
Registering a BlockManagerInfo
// Register a BlockManager
private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
  val time = System.currentTimeMillis()
  // Check whether this BlockManager is already registered
  if (!blockManagerInfo.contains(id)) {
    // Look up the BlockManagerId registered for this executorId
    blockManagerIdByExecutor.get(id.executorId) match {
      // Safety invariant: if blockManagerInfo has no entry for this id,
      // then blockManagerIdByExecutor must not have one for the executor either
      case Some(oldId) =>
        // A block manager of the same executor already exists, so remove it (assumed dead)
        logError("Got two different block manager registrations on same executor - " +
          s" will replace old one $oldId with new one $id")
        // So clean up here: remove everything associated with this executorId
        removeExecutor(id.executorId)
      case None =>
    }
    logInfo("Registering block manager %s with %s RAM, %s".format(
      id.hostPort, Utils.bytesToString(maxMemSize), id))
    // Save the executorId -> BlockManagerId mapping
    blockManagerIdByExecutor(id.executorId) = id
    // Create a BlockManagerInfo for this BlockManagerId
    // and save the BlockManagerId -> BlockManagerInfo mapping
    blockManagerInfo(id) = new BlockManagerInfo(
      id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
  }
  listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}
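The two maps above are easiest to see in isolation. A minimal sketch of the same bookkeeping in plain Scala (the Info class is a hypothetical stand-in for BlockManagerInfo; no RPC involved):
import scala.collection.mutable

object RegisterSketch {
  case class Info(registeredAtMs: Long, maxMem: Long) // stand-in for BlockManagerInfo

  val blockManagerInfo = mutable.Map[String, Info]()           // managerId -> Info
  val blockManagerIdByExecutor = mutable.Map[String, String]() // executorId -> managerId

  def register(executorId: String, managerId: String, maxMem: Long): Unit = {
    if (!blockManagerInfo.contains(managerId)) {
      // Same safety check as register() above: a stale manager registered for
      // this executor is removed first (the real code calls removeExecutor)
      blockManagerIdByExecutor.get(executorId).foreach(blockManagerInfo.remove)
      blockManagerIdByExecutor(executorId) = managerId
      blockManagerInfo(managerId) = Info(System.currentTimeMillis(), maxMem)
    }
  }
}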
Updating a BlockManagerInfo
// Update block info: whenever a block on some BlockManager changes,
// an updateBlockInfo request is sent to the BlockManagerMaster to update the BlockInfo
private def updateBlockInfo(
blockManagerId: BlockManagerId,
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
externalBlockStoreSize: Long): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
return true
} else {
return false
}
}
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
return true
}
blockManagerInfo(blockManagerId).updateBlockInfo(
blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
// A block may be stored on multiple BlockManagers:
// with a _2 storage level, the block is replicated to another BlockManager.
// The blockLocations map holds the set of BlockManagerIds for each blockId;
// since a Set is used, duplicates are removed automatically
var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
locations = blockLocations.get(blockId)
} else {
locations = new mutable.HashSet[BlockManagerId]
blockLocations.put(blockId, locations)
}
if (storageLevel.isValid) {
locations.add(blockManagerId)
} else {
locations.remove(blockManagerId)
}
// Remove the block from master tracking if it has been removed on all slaves.
if (locations.size == 0) {
blockLocations.remove(blockId)
}
true
}
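The blockLocations bookkeeping at the end of updateBlockInfo boils down to one set per block. A minimal sketch in plain Scala (String ids instead of BlockId/BlockManagerId):
import scala.collection.mutable

object BlockLocationsSketch {
  val blockLocations = mutable.Map[String, mutable.HashSet[String]]()

  def update(blockId: String, managerId: String, stillStored: Boolean): Unit = {
    val locations = blockLocations.getOrElseUpdate(blockId, mutable.HashSet[String]())
    // The HashSet deduplicates automatically, so repeated updates are harmless
    if (stillStored) locations.add(managerId) else locations.remove(managerId)
    // Once no manager holds the block, stop tracking it on the master
    if (locations.isEmpty) blockLocations.remove(blockId)
  }
}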
private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
val maxMem: Long,
val slaveEndpoint: RpcEndpointRef)
extends Logging {
...
// Mapping from block id to its status.
// blockId -> BlockStatus
private val _blocks = new JHashMap[BlockId, BlockStatus]
...
def updateBlockInfo(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
externalBlockStoreSize: Long) {
updateLastSeenMs()
// Check whether this BlockManagerInfo already tracks the block
if (_blocks.containsKey(blockId)) {
// The block exists on the slave already.
val blockStatus: BlockStatus = _blocks.get(blockId)
val originalLevel: StorageLevel = blockStatus.storageLevel
val originalMemSize: Long = blockStatus.memSize
// If the old storage level used memory, add the old size back to the remaining memory
if (originalLevel.useMemory) {
_remainingMem += originalMemSize
}
}
// Create a BlockStatus for the block, then account for memory/disk resources according to the storage level
if (storageLevel.isValid) {
/* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
* The memSize here indicates the data size in or dropped from memory,
* externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
* and the diskSize here indicates the data size in or dropped to disk.
* They can be both larger than 0, when a block is dropped from memory to disk.
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
var blockStatus: BlockStatus = null
if (storageLevel.useMemory) {
blockStatus = BlockStatus(storageLevel, memSize, 0, 0)
_blocks.put(blockId, blockStatus)
_remainingMem -= memSize
logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
Utils.bytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
blockStatus = BlockStatus(storageLevel, 0, diskSize, 0)
_blocks.put(blockId, blockStatus)
logInfo("Added %s on disk on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
if (storageLevel.useOffHeap) {
blockStatus = BlockStatus(storageLevel, 0, 0, externalBlockStoreSize)
_blocks.put(blockId, blockStatus)
logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
}
if (!blockId.isBroadcast && blockStatus.isCached) {
_cachedBlocks += blockId
}
}
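From the user's side, the _2 storage levels referred to above are what set replication > 1 and drive this metadata accounting. A typical usage sketch (assumes a running SparkContext named sc):
import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 100)
rdd.persist(StorageLevel.MEMORY_AND_DISK_2) // replication = 2: one extra copy
rdd.count() // materializes the blocks; each BlockManager then reports updateBlockInfo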
② Data transfer between BlockManagers
BlockManager.scala
Component initialization
private[spark] class BlockManager(
executorId: String,
rpcEnv: RpcEnv,
val master: BlockManagerMaster,
defaultSerializer: Serializer,
val conf: SparkConf,
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,
securityManager: SecurityManager,
numUsableCores: Int)
extends BlockDataManager with Logging {
val diskBlockManager = new DiskBlockManager(this, conf)
// Each BlockManager maintains its own blockId -> BlockInfo map.
// A BlockInfo represents one block; above all it serves as the
// synchronization monitor when multiple threads access the same block
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
private[spark] val memoryStore = new MemoryStore(this, memoryManager)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
def initialize(appId: String): Unit = {
// Initialize blockTransferService, which handles remote block data transfer
blockTransferService.init(this)
shuffleClient.init(appId)
// Create the BlockManagerId for this BlockManager.
// A BlockManager is uniquely identified by the executor on its node
blockManagerId = BlockManagerId(
executorId, blockTransferService.hostName, blockTransferService.port)
shuffleServerId = if (externalShuffleServiceEnabled) {
logInfo(s"external shuffle service port = $externalShuffleServicePort")
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
blockManagerId
}
// Register this BlockManager through the reference to the BlockManagerMasterEndpoint,
// i.e. send a registration message to the BlockManagerMasterEndpoint
master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
}
}
...
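The identity created in initialize() is essentially the triple (executorId, host, port). A stand-in sketch (the ManagerId name is hypothetical, not Spark's BlockManagerId):
case class ManagerId(executorId: String, host: String, port: Int) {
  def hostPort: String = s"$host:$port"
}

// Values are made up for illustration
val id = ManagerId("executor-1", "worker-node-1", 43215)
println(id.hostPort) // worker-node-1:43215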
(1) Reading local data
BlockManager.scala
// Get a block from the local BlockManager
private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
// Look up the BlockInfo for this block (it is used below as the lock)
val info = blockInfo.get(blockId).orNull
if (info != null) {
// All access to a BlockInfo is synchronized:
// the BlockInfo serves as the monitor for concurrent access to its block
info.synchronized {
// Double check to make sure the block is still there. There is a small chance that the
// block has been removed by removeBlock (which also synchronizes on the blockInfo object).
// Note that this only checks metadata tracking. If user intentionally deleted the block
// on disk or from off heap storage without using removeBlock, this conditional check will
// still pass but eventually we will get an exception because we can't find the block.
if (blockInfo.get(blockId).isEmpty) {
logWarning(s"Block $blockId had been removed")
return None
}
// If another thread is writing the block, wait for it to become ready.
// If another thread is writing the block we need, wait on the BlockInfo lock;
// if the block never becomes ready, return None
if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure.")
return None
}
val level = info.level
logDebug(s"Level for block $blockId is $level")
// Look for the block in memory
if (level.useMemory) {
logDebug(s"Getting block $blockId from memory")
val result = if (asBlockResult) {
memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
memoryStore.getBytes(blockId)
}
result match {
case Some(values) =>
return result
case None =>
logDebug(s"Block $blockId not found in memory")
}
}
// Look for the block in external block store
if (level.useOffHeap) {
logDebug(s"Getting block $blockId from ExternalBlockStore")
if (externalBlockStore.contains(blockId)) {
val result = if (asBlockResult) {
externalBlockStore.getValues(blockId)
.map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
externalBlockStore.getBytes(blockId)
}
result match {
case Some(values) =>
return result
case None =>
logDebug(s"Block $blockId not found in ExternalBlockStore")
}
}
}
// Look for block on disk, potentially storing it back in memory if required
if (level.useDisk) {
logDebug(s"Getting block $blockId from disk")
val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
case Some(b) => b
case None =>
throw new BlockException(
blockId, s"Block $blockId not found on disk, though it should be")
}
assert(0 == bytes.position())
if (!level.useMemory) {
// If the block shouldn't be stored in memory, we can just return it
if (asBlockResult) {
// Deserialize
return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
info.size))
} else {
return Some(bytes)
}
} else {
// Otherwise, we also have to store something in the memory store
if (!level.deserialized || !asBlockResult) {
/* We'll store the bytes in memory if the block's storage level includes
* "memory serialized", or if it should be cached as objects in memory
* but we only requested its serialized bytes. */
memoryStore.putBytes(blockId, bytes.limit, () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we cannot
// put it into MemoryStore, copyForMemory should not be created. That's why this
// action is put into a `() => ByteBuffer` and created lazily.
// If the level uses both disk and memory, then after reading from disk
// try to put the data into memory as well
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
})
bytes.rewind()
}
if (!asBlockResult) {
return Some(bytes)
} else {
val values = dataDeserialize(blockId, bytes)
if (level.deserialized) {
// Cache the values before returning them
val putResult = memoryStore.putIterator(
blockId, values, level, returnValues = true, allowPersistToDisk = false)
// The put may or may not have succeeded, depending on whether there was enough
// space to unroll the block. Either way, the put here should return an iterator.
putResult.data match {
case Left(it) =>
return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
case _ =>
// This only happens if we dropped the values back to disk (which is never)
throw new SparkException("Memory store did not return an iterator!")
}
} else {
return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
}
}
}
}
}
} else {
logDebug(s"Block $blockId not registered locally")
}
None
}
MemoryStore.scala
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
// Synchronized for concurrent multi-threaded access
val entry = entries.synchronized {
// Try to fetch the entry from memory
entries.get(blockId)
}
if (entry == null) {
// Not found: return None
None
} else if (entry.deserialized) {
// The entry holds deserialized data, so serialize it before returning
Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
} else {
// Already serialized: return it directly
Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
}
}
getValues() is the counterpart of getBytes(): instead of raw bytes it returns the block as deserialized values (an iterator).
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
None
} else if (entry.deserialized) {
Some(entry.value.asInstanceOf[Array[Any]].iterator)
} else {
// The data is serialized; deserialize it
val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
Some(blockManager.dataDeserialize(blockId, buffer))
}
}
DiskStore.scala
private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
// Use Java NIO for the file read
val channel = new RandomAccessFile(file, "r").getChannel
Utils.tryWithSafeFinally {
// For small files, directly read rather than memory map
if (length < minMemoryMapBytes) {
val buf = ByteBuffer.allocate(length.toInt)
channel.position(offset)
while (buf.remaining() != 0) {
if (channel.read(buf) == -1) {
throw new IOException("Reached EOF before filling buffer\n" +
s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
}
}
buf.flip()
Some(buf)
} else {
Some(channel.map(MapMode.READ_ONLY, offset, length))
}
} {
channel.close()
}
}
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
}
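The small-file/large-file split in getBytes mirrors the spark.storage.memoryMapThreshold setting (2 MB by default in this Spark version, if I read the config correctly): small files are read straight into a buffer, large ones are memory-mapped. A self-contained sketch of the same NIO pattern:
import java.io.{File, IOException, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

def readFile(file: File, threshold: Long = 2L * 1024 * 1024): ByteBuffer = {
  val channel = new RandomAccessFile(file, "r").getChannel
  try {
    if (channel.size() < threshold) {
      // Small file: read it directly into a heap buffer
      val buf = ByteBuffer.allocate(channel.size().toInt)
      while (buf.remaining() > 0) {
        if (channel.read(buf) == -1) throw new IOException("unexpected EOF")
      }
      buf.flip()
      buf
    } else {
      // Large file: memory-map instead of copying
      channel.map(MapMode.READ_ONLY, 0, channel.size())
    }
  } finally {
    channel.close()
  }
}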
Summary:
① Data is read from memory first, then from disk.
② If the storage level uses both memory and disk, data read from disk is written back into memory.
③ Reads are synchronized across threads, so concurrent access to a block is safe.
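Points ① and ② condense into a few lines. A minimal sketch of the local read priority (plain Scala maps stand in for the stores):
import scala.collection.mutable

object LocalReadSketch {
  val memoryStore = mutable.Map[String, Array[Byte]]()
  val diskStore = mutable.Map[String, Array[Byte]]()

  def getLocal(blockId: String, levelUsesMemory: Boolean): Option[Array[Byte]] =
    memoryStore.get(blockId).orElse {        // ① memory first
      diskStore.get(blockId).map { bytes =>  // then disk
        if (levelUsesMemory) memoryStore(blockId) = bytes // ② cache back into memory
        bytes
      }
    }
}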
(2) Reading remote data
BlockManager.scala
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
// Get the locations (BlockManagerIds) of this blockId from the BlockManagerMaster,
// then shuffle them randomly
val locations = Random.shuffle(master.getLocations(blockId))
var numFetchFailures = 0
// Iterate over the candidate BlockManagers
for (loc <- locations) {
logDebug(s"Getting remote block $blockId from $loc")
// Use blockTransferService to fetch the block over the network;
// fetchBlockSync blocks until the data arrives
val data = try {
blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
} catch {
case NonFatal(e) =>
numFetchFailures += 1
if (numFetchFailures == locations.size) {
// An exception is thrown while fetching this block from all locations
throw new BlockFetchException(s"Failed to fetch block from" +
s" ${locations.size} locations. Most recent failure cause:", e)
} else {
// This location failed, so we retry fetch from a different one by returning null here
logWarning(s"Failed to fetch remote block $blockId " +
s"from $loc (failed attempt $numFetchFailures)", e)
null
}
}
if (data != null) {
if (asBlockResult) {
return Some(new BlockResult(
// Deserialize
dataDeserialize(blockId, data),
DataReadMethod.Network,
data.limit()))
} else {
return Some(data)
}
}
logDebug(s"The value of block $blockId is null")
}
logDebug(s"Block $blockId not found")
None
}
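Stripped of Spark's types, the retry policy of doGetRemote is: shuffle the locations, try each in turn, and fail only once every location has failed. A sketch (fetch is a hypothetical stand-in for blockTransferService.fetchBlockSync):
import scala.util.Random

def getRemote(locations: Seq[String], fetch: String => Array[Byte]): Option[Array[Byte]] = {
  val shuffled = Random.shuffle(locations)
  var failures = 0
  for (loc <- shuffled) {
    try {
      return Some(fetch(loc)) // first successful location wins
    } catch {
      case e: Exception =>
        failures += 1
        if (failures == shuffled.size) {
          // Every location failed: surface the most recent cause
          throw new RuntimeException(s"Failed to fetch block from $failures locations", e)
        }
        // Otherwise fall through and try the next location
    }
  }
  None
}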
(3) Writing data
BlockManager.scala
private def doPut(
blockId: BlockId,
data: BlockValues,
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None)
: Seq[(BlockId, BlockStatus)] = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
effectiveStorageLevel.foreach { level =>
require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
}
// Return value
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
/* Remember the block's storage level so that we can correctly drop it to disk if it needs
* to be dropped right after it got put into memory. Note, however, that other threads will
* not be able to get() this block until we call markReady on its BlockInfo. */
// Create a BlockInfo for the block being written and put it into the blockInfo map
val putBlockInfo = {
val tinfo = new BlockInfo(level, tellMaster)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
if (oldBlockOpt.isDefined) {
if (oldBlockOpt.get.waitForReady()) {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
return updatedBlocks
}
// TODO: So the block info exists - but previous attempt to load it (?) failed.
// What do we do now ? Retry on it ?
oldBlockOpt.get
} else {
tinfo
}
}
val startTimeMs = System.currentTimeMillis
/* If we're storing values and we need to replicate the data, we'll want access to the values,
* but because our put will read the whole iterator, there will be no values left. For the
* case where the put serializes data, we'll remember the bytes, above; but for the case where
* it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */
var valuesAfterPut: Iterator[Any] = null
// Ditto for the bytes after the put
var bytesAfterPut: ByteBuffer = null
// Size of the block in bytes
var size = 0L
// The level we actually use to put the block
val putLevel = effectiveStorageLevel.getOrElse(level)
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
case b: ByteBufferValues if putLevel.replication > 1 =>
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = b.buffer.duplicate()
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
replicate(blockId, bufferView, putLevel)
}(futureExecutionContext)
case _ => null
}
// Lock the BlockInfo to synchronize concurrent access
putBlockInfo.synchronized {
logTrace("Put for block %s took %s to get into synchronized block"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
var marked = false
try {
// returnValues - Whether to return the values put
// blockStore - The type of storage to put these values into
// Choose a BlockStore (MemoryStore, DiskStore, ...) according to the storage level
val (returnValues, blockStore: BlockStore) = {
if (putLevel.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
} else if (putLevel.useOffHeap) {
// Use external block store
(false, externalBlockStore)
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(putLevel.replication > 1, diskStore)
} else {
assert(putLevel == StorageLevel.NONE)
throw new BlockException(
blockId, s"Attempted to put block $blockId without specifying storage level!")
}
}
// Actually put the values
// Put the data into the chosen store, dispatching on the type of the data
val result = data match {
case IteratorValues(iterator) =>
blockStore.putIterator(blockId, iterator, putLevel, returnValues)
case ArrayValues(array) =>
blockStore.putArray(blockId, array, putLevel, returnValues)
case ByteBufferValues(bytes) =>
bytes.rewind()
blockStore.putBytes(blockId, bytes, putLevel)
}
size = result.size
result.data match {
case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
case Right (newBytes) => bytesAfterPut = newBytes
case _ =>
}
// Keep track of which blocks are dropped from memory
if (putLevel.useMemory) {
result.droppedBlocks.foreach { updatedBlocks += _ }
}
// Get the current BlockStatus for the block
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// let other threads read it, and tell the master about it.
marked = true
putBlockInfo.markReady(size)
if (tellMaster) {
// Report the newly written block to the BlockManagerMasterEndpoint
// so that the block metadata there stays in sync
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
updatedBlocks += ((blockId, putBlockStatus))
}
} finally {
// If we failed in putting the block to memory/disk, notify other possible readers
// that it has failed, and then remove it from the block info map.
if (!marked) {
// Note that the remove must happen before markFailure otherwise another thread
// could've inserted a new BlockInfo before we remove it.
blockInfo.remove(blockId)
putBlockInfo.markFailure()
logWarning(s"Putting block $blockId failed")
}
}
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
// Either we're storing bytes and we asynchronously started replication, or we're storing
// values and need to serialize and replicate them now:
// A _2 storage level means the block must also be replicated to other nodes
if (putLevel.replication > 1) {
data match {
case ByteBufferValues(bytes) =>
if (replicationFuture != null) {
Await.ready(replicationFuture, Duration.Inf)
}
case _ =>
val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
throw new SparkException(
"Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
// Replicate the data
replicate(blockId, bytesAfterPut, putLevel)
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}
BlockManager.dispose(bytesAfterPut)
if (putLevel.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
} else {
logDebug("Putting block %s without replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
updatedBlocks
}
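The store-selection branch at the top of doPut is worth isolating. A sketch of just that decision (plain Scala, with hypothetical Level/Store types):
sealed trait Store
case object Memory extends Store
case object OffHeap extends Store
case object Disk extends Store

case class Level(useMemory: Boolean, useOffHeap: Boolean, useDisk: Boolean, replication: Int)

// Returns (whether the put should hand the values back, which store to use)
def selectStore(level: Level): (Boolean, Store) =
  if (level.useMemory) (true, Memory)      // memory first, even if useDisk is also set
  else if (level.useOffHeap) (false, OffHeap)
  else if (level.useDisk) (level.replication > 1, Disk) // only return bytes when replicating
  else throw new IllegalArgumentException("no valid storage level specified")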
MemoryStore.scala
// Write to memory first; if memory is insufficient, evict some older block data from memory, then store this block
private def tryToPut(
blockId: BlockId,
value: () => Any,
size: Long,
deserialized: Boolean,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
/* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
* to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
* been released, it must be ensured that those to-be-dropped blocks are not double counted
* for freeing up more space for another block that needs to be put. Only then the actually
* dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
memoryManager.synchronized {
// Note: if we have previously unrolled this block successfully, then pending unroll
// memory should be non-zero. This is the amount that we already reserved during the
// unrolling process. In this case, we can just reuse this space to cache our block.
// The synchronization on `memoryManager` here guarantees that the release and acquire
// happen atomically. This relies on the assumption that all memory acquisitions are
// synchronized on the same lock.
releasePendingUnrollMemoryForThisTask()
// Check whether enough storage memory can be acquired for the data
val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
if (enoughMemory) {
// Enough memory is available
// We acquired enough memory for the block, so go ahead and put it
val entry = new MemoryEntry(value(), size, deserialized)
// Synchronize on entries
entries.synchronized {
entries.put(blockId, entry)
}
val valuesOrBytes = if (deserialized) "values" else "bytes"
logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
} else {
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
lazy val data = if (deserialized) {
Left(value().asInstanceOf[Array[Any]])
} else {
Right(value().asInstanceOf[ByteBuffer].duplicate())
}
// If the block allows disk storage, drop it from memory to disk; if it must not be persisted to disk, the data is lost
val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
enoughMemory
}
}
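Reduced to its decision, tryToPut is "reserve memory, else hand the block to a drop callback". A minimal sketch (plain Scala; dropToDisk is a hypothetical stand-in for blockManager.dropFromMemory):
import scala.collection.mutable

class MemoryPutSketch(capacity: Long, dropToDisk: (String, Array[Byte]) => Unit) {
  private var used = 0L
  private val entries = mutable.Map[String, Array[Byte]]()

  def tryToPut(blockId: String, data: Array[Byte], allowDisk: Boolean): Boolean =
    synchronized {
      val enoughMemory = used + data.length <= capacity
      if (enoughMemory) {
        entries(blockId) = data
        used += data.length
      } else if (allowDisk) {
        dropToDisk(blockId, data) // spill instead of caching
      } // else: the block is simply not cached and the data is dropped
      enoughMemory
    }
}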
DiskStore.scala
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
// So that we do not modify the input offsets !
// duplicate does not copy buffer, so inexpensive
val bytes = _bytes.duplicate()
logDebug(s"Attempting to put block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val channel = new FileOutputStream(file).getChannel
Utils.tryWithSafeFinally {
while (bytes.remaining > 0) {
channel.write(bytes)
}
} {
channel.close()
}
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
Replicating data to BlockManagers on other nodes
BlockManager.scala
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
val numPeersToReplicateTo = level.replication - 1
val peersForReplication = new ArrayBuffer[BlockManagerId]
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
val startTime = System.currentTimeMillis
val random = new Random(blockId.hashCode)
var replicationFailed = false
var failures = 0
var done = false
// Get cached list of peers
peersForReplication ++= getPeers(forceFetch = false)
// Get a random peer. Note that this selection of a peer is deterministic on the block id.
// So assuming the list of peers does not change and no replication failures,
// if there are multiple attempts in the same node to replicate the same block,
// the same set of peers will be selected.
// Pick a random BlockManager
def getRandomPeer(): Option[BlockManagerId] = {
// If replication had failed, then force update the cached list of peers and remove the peers
// that have been already used
if (replicationFailed) {
peersForReplication.clear()
peersForReplication ++= getPeers(forceFetch = true)
peersForReplication --= peersReplicatedTo
peersForReplication --= peersFailedToReplicateTo
}
if (!peersForReplication.isEmpty) {
Some(peersForReplication(random.nextInt(peersForReplication.size)))
} else {
None
}
}
// One by one choose a random peer and try uploading the block to it
// If replication fails (e.g., target peer is down), force the list of cached peers
// to be re-fetched from driver and then pick another random peer for replication. Also
// temporarily black list the peer for which replication failed.
//
// This selection of a peer and replication is continued in a loop until one of the
// following 3 conditions is fulfilled:
// (i) specified number of peers have been replicated to
// (ii) too many failures in replicating to peers
// (iii) no peer left to replicate to
//
while (!done) {
getRandomPeer() match {
case Some(peer) =>
try {
val onePeerStartTime = System.currentTimeMillis
data.rewind()
logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
// Use blockTransferService to upload the data to the other BlockManager;
// uploadBlockSync blocks until the upload completes
blockTransferService.uploadBlockSync(
peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
.format(System.currentTimeMillis - onePeerStartTime))
peersReplicatedTo += peer
peersForReplication -= peer
replicationFailed = false
if (peersReplicatedTo.size == numPeersToReplicateTo) {
done = true // specified number of peers have been replicated to
}
} catch {
case e: Exception =>
logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
failures += 1
replicationFailed = true
peersFailedToReplicateTo += peer
if (failures > maxReplicationFailures) { // too many failures in replicating to peers
done = true
}
}
case None => // no peer left to replicate to
done = true
}
}
val timeTakeMs = (System.currentTimeMillis - startTime)
logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
}
}
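One detail worth highlighting: the RNG is seeded with blockId.hashCode, so peer selection is deterministic per block, and repeated attempts on the same node pick the same peers as long as nothing fails. A sketch of that selection:
import scala.collection.mutable
import scala.util.Random

def pickPeers(blockId: String, peers: Seq[String], numReplicas: Int): Seq[String] = {
  val random = new Random(blockId.hashCode) // deterministic on the block id
  val pool = mutable.ArrayBuffer(peers: _*)
  (1 to math.min(numReplicas, pool.size)).map { _ =>
    pool.remove(random.nextInt(pool.size)) // choose a peer and exclude it from the pool
  }
}

// pickPeers("rdd_0_0", Seq("bm-1", "bm-2", "bm-3"), 1) always picks the same peer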
Summary:
① If memory is insufficient during a write, the block is dropped to disk when its level allows it; otherwise the data is discarded.
② After a write, the block status is reported to the BlockManagerMaster.
③ If replication is required, a random BlockManager is picked and the data is transferred to it via blockTransferService.