Spark 1.6 Source Code Walkthrough: MemoryStore, a Component of BlockManager
阿新 • Published: 2018-12-16
MemoryStore stores blocks in memory, either as arrays of deserialized Java objects or as serialized ByteBuffers.
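MemoryStore memory model
maxUnrollMemory: the maximum amount of memory that blocks on the current Driver or Executor may reserve in advance for unrolling. Each task thread can hold its own reservation. (Think of saving a seat in class: the person has not arrived yet, but the seat is already taken.)
maxMemory: the maximum amount of memory the current Driver or Executor can use for storage.
currentMemory: the memory already used by the current Driver or Executor.
freeMemory: the memory not yet used by the current Driver or Executor.
currentUnrollMemory: the memory that blocks on the current Driver or Executor have already reserved in advance, i.e. the sum of the reservations held by all task threads.
unrollMemoryMap: the map that holds these reservations. The key is the task attempt ID (taskAttemptId) and the value is the unroll memory held by that task.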
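To make the relationships between these fields concrete, here is a minimal toy model in Scala. It is not Spark's code: the class `ToyUnrollAccounting`, its methods, and the assumption that outstanding unroll reservations count against free memory are all illustrative. It only shows per-task reservations kept in a map keyed by task attempt ID, and how the totals are derived from it.

```scala
import scala.collection.mutable

// Illustrative toy, not Spark's MemoryStore. Assumption: reserved unroll memory
// counts as "used", so it reduces freeMemory just like stored blocks do.
class ToyUnrollAccounting(val maxMemory: Long) {
  // taskAttemptId -> bytes of unroll memory that task has reserved
  private val unrollMemoryMap = mutable.HashMap[Long, Long]()
  // Bytes occupied by blocks that have actually been stored
  private var blocksMemoryUsed = 0L

  // Sum of the reservations held by all tasks
  def currentUnrollMemory: Long = synchronized { unrollMemoryMap.values.sum }

  def currentMemory: Long = synchronized { blocksMemoryUsed + currentUnrollMemory }

  def freeMemory: Long = maxMemory - currentMemory

  // "Save a seat": succeeds only if the requested bytes are currently free
  def reserveUnrollMemory(taskAttemptId: Long, bytes: Long): Boolean = synchronized {
    if (bytes <= freeMemory) {
      unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + bytes
      true
    } else {
      false
    }
  }

  // Give back up to `bytes` of a task's reservation; drop the key when it reaches zero
  def releaseUnrollMemory(taskAttemptId: Long, bytes: Long): Unit = synchronized {
    val remaining = unrollMemoryMap.getOrElse(taskAttemptId, 0L) - bytes
    if (remaining > 0L) {
      unrollMemoryMap(taskAttemptId) = remaining
    } else {
      unrollMemoryMap.remove(taskAttemptId)
    }
  }

  def storeBlock(bytes: Long): Unit = synchronized { blocksMemoryUsed += bytes }
}
```

In this toy a reservation succeeds only while it fits in the remaining free memory, and releasing it makes that space available to other tasks again.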
private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
  extends BlockStore(blockManager) {

  // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
  // acquiring or releasing unroll memory, must be synchronized on `memoryManager`!

  private val conf = blockManager.conf
  private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)

  // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
  // All accesses of this map are assumed to have manually synchronized on `memoryManager`
  private val unrollMemoryMap = mutable.HashMap[Long, Long]()
  // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
  // Pending unroll memory refers to the intermediate memory occupied by a task
  // after the unroll but before the actual putting of the block in the cache.
  // This chunk of memory is expected to be released *as soon as* we finish
  // caching the corresponding block as opposed to until after the task finishes.
  // This is only used if a block is successfully unrolled in its entirety in
  // memory (SPARK-4777).
  private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()

  // Initial memory to request before unrolling any block
  private val unrollMemoryThreshold: Long =
    conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)

  /** Total amount of memory available for storage, in bytes. */
  private def maxMemory: Long = memoryManager.maxStorageMemory

  // ... (rest of the class omitted)
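The third constructor argument of `entries`, `true`, puts the `java.util.LinkedHashMap` into access order: every `get` or `put` moves the entry to the tail, so iteration starts at the least recently used block. The short sketch below shows the effect; string keys and `Long` values stand in for `BlockId` and `MemoryEntry`, and `AccessOrderDemo` is a hypothetical name. That this ordering lets the store pick eviction candidates oldest-first is a reasonable reading of the design rather than something this excerpt shows.

```scala
import java.util.LinkedHashMap

object AccessOrderDemo {
  // Collects the keys in the map's iteration order
  def keysInOrder(m: LinkedHashMap[String, Long]): List[String] = {
    val it = m.keySet().iterator()
    val keys = List.newBuilder[String]
    while (it.hasNext) keys += it.next()
    keys.result()
  }

  def demo(): List[String] = {
    // Same constructor arguments as `entries`: capacity 32, load factor 0.75, accessOrder = true
    val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
    entries.put("rdd_0_0", 10L)
    entries.put("rdd_0_1", 20L)
    entries.put("rdd_0_2", 30L)
    // Reading a key counts as an access and moves it to the tail
    entries.get("rdd_0_0")
    keysInOrder(entries)
  }
}
```

Calling `AccessOrderDemo.demo()` returns `List(rdd_0_1, rdd_0_2, rdd_0_0)`: the block that was just read has moved to the end, and the head is now the least recently used one.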
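MemoryStore extends BlockStore and implements methods such as getBytes, putBytes, putArray, putIterator, and getValues.
Storing data: putBytes
If the block's storage level is deserialized (level.deserialized is true, meaning the block should be kept in memory as Java objects), putBytes first deserializes the bytes into an iterator of values and then calls putIterator. Otherwise it keeps the data in serialized form and calls tryToPut on the bytes directly.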
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
  // Work on a duplicate - since the original input might be used elsewhere.
  val bytes = _bytes.duplicate()
  bytes.rewind()
  if (level.deserialized) {
    val values = blockManager.dataDeserialize(blockId, bytes)
    putIterator(blockId, values, level, returnValues = true)
  } else {
    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
    PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
  }
}
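putBytes starts by calling `duplicate()` and `rewind()`. A duplicate shares the underlying bytes but has its own position, limit, and mark, so rewinding and reading the copy leaves the caller's buffer position untouched. It is not a deep copy, though: writes through either buffer are visible in both. A small sketch using only `java.nio.ByteBuffer` (the `DuplicateDemo` object is hypothetical):

```scala
import java.nio.ByteBuffer

object DuplicateDemo {
  // Mirrors the start of putBytes: take an independent view, then reset that view to position 0
  def prepare(original: ByteBuffer): ByteBuffer = {
    val bytes = original.duplicate() // same content, independent position/limit/mark
    bytes.rewind()                   // affects only the copy
    bytes
  }
}
```

For a buffer whose position is 3, `prepare` returns a view at position 0 covering the whole content, while the caller's buffer stays at position 3.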
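Writing an iterator: putIterator
putIterator calls unrollSafely to check whether the block can be unrolled within the memory it is able to reserve. If the result is Left(arrayValues), the whole block fits in memory and putArray writes it to memory.
If the result is Right(iteratorValues), there is not enough memory. The block is then written to disk if its storage level uses disk and allowPersistToDisk is true; otherwise it is not cached at all, and the iterator is handed back to the caller with a size of 0.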
/**
 * Attempt to put the given block in memory store.
 *
 * There may not be enough space to fully unroll the iterator in memory, in which case we
 * optionally drop the values to disk if
 *   (1) the block's storage level specifies useDisk, and
 *   (2) `allowPersistToDisk` is true.
 *
 * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
 * back from disk and attempts to cache it in memory. In this case, we should not persist the
 * block back on disk again, as it is already in disk store.
 */
private[storage] def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    returnValues: Boolean,
    allowPersistToDisk: Boolean): PutResult = {
  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
  unrolledValues match {
    case Left(arrayValues) =>
      // Values are fully unrolled in memory, so store them as an array
      val res = putArray(blockId, arrayValues, level, returnValues)
      droppedBlocks ++= res.droppedBlocks
      PutResult(res.size, res.data, droppedBlocks)
    case Right(iteratorValues) =>
      // Not enough space to unroll this block; drop to disk if applicable
      if (level.useDisk && allowPersistToDisk) {
        logWarning(s"Persisting block $blockId to disk instead.")
        val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
        PutResult(res.size, res.data, droppedBlocks)
      } else {
        PutResult(0, Left(iteratorValues), droppedBlocks)
      }
  }
}
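putIterator relies on unrollSafely, which is not shown here. Below is a simplified sketch of the idea, offered as an assumption rather than Spark's implementation: unroll the iterator into a buffer, check the accumulated size only every `checkPeriod` elements, and request more unroll memory (growing the reservation by `growthFactor`) once the size passes the current threshold. The `reserve` callback, the `sizeOf` estimator, the default constants, and the final top-up check are all illustrative. It returns `Left(array)` when the whole iterator fits, and otherwise `Right(iterator)` that replays the already-unrolled elements followed by the rest, matching the two cases putIterator handles.

```scala
import scala.collection.mutable.ArrayBuffer

object UnrollSketch {
  // `reserve(bytes)` asks for more unroll memory and returns false if it is refused;
  // `sizeOf` estimates the in-memory size of one element. Both are hypothetical hooks.
  def unrollSafely(
      values: Iterator[Any],
      initialThreshold: Long,
      reserve: Long => Boolean,
      sizeOf: Any => Long,
      checkPeriod: Int = 16,
      growthFactor: Double = 1.5): Either[Array[Any], Iterator[Any]] = {
    val buffer = new ArrayBuffer[Any]
    var bufferSize = 0L
    var elementsUnrolled = 0
    // Reserve an initial chunk before unrolling anything
    var keepUnrolling = reserve(initialThreshold)
    var threshold = if (keepUnrolling) initialThreshold else 0L
    while (values.hasNext && keepUnrolling) {
      val v = values.next()
      buffer += v
      bufferSize += sizeOf(v)
      elementsUnrolled += 1
      // Only check the size periodically, to keep the overhead low
      if (elementsUnrolled % checkPeriod == 0 && bufferSize >= threshold) {
        val request = (bufferSize * growthFactor - threshold).toLong
        keepUnrolling = reserve(request)
        if (keepUnrolling) threshold += request
      }
    }
    // The last few elements may have been added after the final periodic check
    if (keepUnrolling && bufferSize > threshold) {
      keepUnrolling = reserve(bufferSize - threshold)
    }
    if (keepUnrolling) {
      Left(buffer.toArray)
    } else {
      // Replay the unrolled prefix, then continue with the untouched rest of the input
      Right(buffer.iterator ++ values)
    }
  }
}
```

Returning `buffer.iterator ++ values` means no element is lost when unrolling stops partway: the caller can still write the full sequence to disk or hand it back uncached.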
Writing to memory: putArray
override def putArray(
    blockId: BlockId,
    values: Array[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult = {
  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  if (level.deserialized) {
    // Estimate the in-memory size of the objects
    val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
    // Try to put the values into memory
    tryToPut(blockId, values, sizeEstimate, deserialized = true, droppedBlocks)
    PutResult(sizeEstimate, Left(values.iterator), droppedBlocks)
  } else {
    // Serialize the values and try to put the resulting bytes into memory
    val bytes = blockManager.dataSerialize(blockId, values.iterator)
    tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
    PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
  }
}
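The two branches of putArray measure size differently: deserialized values only get an estimate (SizeEstimator.estimate), while serialized bytes have an exact size, the buffer's `limit`. The sketch below mirrors that split. `ToyPutResult`, the `estimate` parameter (a stand-in for SizeEstimator), and plain Java serialization (a stand-in for blockManager.dataSerialize, which uses Spark's configured serializer) are assumptions for illustration.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object PutArraySketch {
  // Hypothetical stand-in for PutResult: a size plus either the values or the bytes
  final case class ToyPutResult(size: Long, data: Either[Iterator[Any], ByteBuffer])

  // Stand-in for blockManager.dataSerialize: plain Java serialization of the array
  def javaSerialize(values: Array[Any]): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(values) finally oos.close()
    ByteBuffer.wrap(bos.toByteArray)
  }

  def putArray(values: Array[Any], deserialized: Boolean, estimate: AnyRef => Long): ToyPutResult =
    if (deserialized) {
      // Objects are kept as they are; only an estimate of their in-memory size is available
      val sizeEstimate = estimate(values.asInstanceOf[AnyRef])
      ToyPutResult(sizeEstimate, Left(values.iterator))
    } else {
      // Serialized form: the exact size is the number of bytes in the buffer
      val bytes = javaSerialize(values)
      ToyPutResult(bytes.limit().toLong, Right(bytes.duplicate()))
    }
}
```

The serialized branch is usually smaller in memory but must be deserialized on every read; the deserialized branch trades space for faster access, which is why the size it reports can only be an estimate.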
Trying to put a block in memory: tryToPut
private def tryToPut(
    blockId: BlockId,
    value: () => Any,
    size: Long,
    deserialized: Boolean,
    droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {

  /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
   * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
   * been released, it must be ensured that those to-be-dropped blocks are not double counted
   * for freeing up more space for another block that needs to be put. Only then the actually
   * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */

  // Several threads may put blocks at the same time, so all of this must run under the lock
  memoryManager.synchronized {
    // Note: if we have previously unrolled this block successfully, then pending unroll
    // memory should be non-zero. This is the amount that we already reserved during the
    // unrolling process. In this case, we can just reuse this space to cache our block.
    // The synchronization on `memoryManager` here guarantees that the release and acquire
    // happen atomically. This relies on the assumption that all memory acquisitions are
    // synchronized on the same lock.
    releasePendingUnrollMemoryForThisTask()
    // Check again whether the block still fits in memory: with concurrent threads,
    // a condition that held a moment ago may no longer hold
    val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
    if (enoughMemory) {
      // We acquired enough memory for the block, so go ahead and put it
      val entry = new MemoryEntry(value(), size, deserialized)
      entries.synchronized {
        // There is room, so store the entry in memory
        entries.put(blockId, entry)
      }
      val valuesOrBytes = if (deserialized) "values" else "bytes"
      logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
        blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
    } else {
      // Tell the block manager that we couldn't put it in memory so that it can drop it to
      // disk if the block allows disk storage.
      lazy val data = if (deserialized) {
        Left(value().asInstanceOf[Array[Any]])
      } else {
        Right(value().asInstanceOf[ByteBuffer].duplicate())
      }
      val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
      droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
    }
    enoughMemory
  }
}
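The core pattern of tryToPut is: take one lock, release this task's pending unroll memory, try to obtain `size` bytes (evicting least recently used blocks if needed), and only materialize the value, the `() => Any` thunk, when the block is actually stored. The toy below sketches that pattern under simplifying assumptions: a single `lock` object plays the role of `memoryManager`, eviction is done inline rather than through `acquireStorageMemory`, evicted blocks are simply removed instead of being passed to `dropFromMemory`, nothing is evicted when even evicting every other block would not make room, and all names (`ToyStore`, `ToyMemoryEntry`, `reservePending`) are hypothetical.

```scala
import java.util.LinkedHashMap
import scala.collection.mutable

// Hypothetical stand-in for MemoryEntry(value, size, deserialized)
final case class ToyMemoryEntry(value: Any, size: Long, deserialized: Boolean)

class ToyStore(maxMemory: Long) {
  // Plays the role of `memoryManager`: one lock for every memory change
  private val lock = new Object
  // Access-ordered, so iteration starts at the least recently used block
  private val entries = new LinkedHashMap[String, ToyMemoryEntry](32, 0.75f, true)
  // taskAttemptId -> bytes reserved while unrolling but not yet released
  private val pendingUnroll = mutable.HashMap[Long, Long]()
  private var used = 0L // bytes held by cached blocks

  def reservePending(task: Long, bytes: Long): Unit = lock.synchronized {
    pendingUnroll(task) = pendingUnroll.getOrElse(task, 0L) + bytes
  }
  def contains(id: String): Boolean = lock.synchronized(entries.containsKey(id))
  def get(id: String): Option[Any] = lock.synchronized(Option(entries.get(id)).map(_.value))
  def memoryUsed: Long = lock.synchronized(used)

  def tryToPut(
      task: Long,
      id: String,
      value: () => Any,
      size: Long,
      deserialized: Boolean,
      dropped: mutable.Buffer[String]): Boolean = lock.synchronized {
    require(!entries.containsKey(id), s"Block $id is already present")
    // Reuse the space reserved while unrolling this block: release it under the same lock
    pendingUnroll.remove(task)
    // Walk blocks from least to most recently used until enough space would be free
    var available = maxMemory - used - pendingUnroll.values.sum
    val victims = mutable.ListBuffer[String]()
    val it = entries.entrySet().iterator()
    while (available < size && it.hasNext) {
      val e = it.next()
      victims += e.getKey
      available += e.getValue.size
    }
    val enoughMemory = available >= size
    if (enoughMemory) {
      // Evict only once we know the new block will fit
      victims.foreach { v => used -= entries.remove(v).size; dropped += v }
      entries.put(id, ToyMemoryEntry(value(), size, deserialized)) // value() runs only here
      used += size
    } else {
      // Report the block itself as not stored (Spark hands it to dropFromMemory here)
      dropped += id
    }
    enoughMemory
  }
}
```

Because the release of pending unroll memory and the new acquisition happen under the same lock, no other task can grab the released space in between, which is the guarantee the Spark comment in tryToPut describes.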