spark儲存模組之記憶體儲存--MemeoryStore
MemeoryStore
上一節,我們對BlockManager的主要寫入方法做了一個整理,知道了BlockMananger的主要寫入邏輯,以及對於塊資訊的管理。但是,由於spark的整個儲存模組是在是很龐大,而且很多細節的邏輯錯綜複雜,如果對於每個細節都刨根問底,一來精力有限,二來感覺也沒有太大的必要,當然如果時間允許肯定是越詳細越好,在這裡,我的分析的主要目的是理清儲存模組的重點邏輯,希望能夠提綱契領地把各個模組的脈絡領出來,建立起對spark-core中各模組的整體認知,這樣我們在遇到一些問題的時候就能夠很快地知道應該從何處下手,從哪個具體的模組去找問題。
好了廢話不多說,本節接著上一節。上一篇,我們分析了BlockManager的幾個主要的儲存方法,發現BlockManager主要依靠內部的兩個元件MemoryStore和DiskStore來進行實際的資料寫入和塊的管理。
不過,我還是延續我一貫的風格,從外部對一個類的方法呼叫為切入點分析這個類的作用和邏輯。
所以,我們先來看一下上一節對於MemoryStore的主要的方法呼叫的總結:
memoryStore.putIteratorAsValues
memoryStore.putIteratorAsBytes
memoryStore.putBytes
memoryStore.putIteratorAsValues
這個方法主要是用於儲存級別是非序列化的情況,即直接以java物件的形式將資料存放在jvm堆記憶體上。我們都知道,在jvm堆記憶體上存放大量的物件並不是什麼好事,gc壓力大,擠佔記憶體,可能引起頻繁的gc,但是也有明顯的好處,就是省去了序列化和反序列化耗時,而且直接從堆記憶體取資料顯然比任何其他方式(磁碟和直接記憶體)都要快很多,所以對於記憶體充足且要快取的資料量本省不是很大的情況,這種方式也不失為一種不錯的選擇。
private[storage] def putIteratorAsValues[T]( blockId: BlockId, values: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { // 用於儲存java物件的容器 val valuesHolder = new DeserializedValuesHolder[T](classTag) putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match { // 儲存成功 case Right(storedSize) => Right(storedSize) // 儲存失敗的情況 case Left(unrollMemoryUsedByThisBlock) => // ValuesHolder內部的陣列和vector會相互轉換 // 資料寫入完成後會將vector中的資料轉移到陣列中 val unrolledIterator = if (valuesHolder.vector != null) { valuesHolder.vector.iterator } else { valuesHolder.arrayValues.toIterator } // 返回寫入一半的迭代器、 // 外部呼叫者一半會選擇關閉這個迭代器以釋放被使用的記憶體 Left(new PartiallyUnrolledIterator( this, MemoryMode.ON_HEAP, unrollMemoryUsedByThisBlock, unrolled = unrolledIterator, rest = values)) } }
這個方法的邏輯很簡單,作用也比較單一,主要是對實際儲存方法putIterator的返回結果做處理,如果失敗的話,就封裝一個PartiallyUnrolledIterator返回給外部呼叫這個,呼叫這個一般需要將這個寫入一半的迭代器關閉。
MemoryStore.putIterator
這個方法看似很長,其實邏輯相對簡單,主要做的事就是把資料一條一條往ValuesHolder中寫,並週期性地檢查記憶體,如果記憶體不夠就通過記憶體管理器MemoryManager申請記憶體,每次申請當前記憶體量的1.5倍。
最後,將ValuesHolder中的資料轉移到一個數組中(其實資料在SizeTrackingVector中也是以陣列的形式儲存,只不過SizeTrackingVector物件內部處理陣列還有一些其他的簿記量,更為關鍵的是我們需要將儲存的資料以同一的介面進行包裝,以利於MemoryStore進行同一管理)。最後還有關鍵的一步,就是釋放展開記憶體,重新申請儲存記憶體。
此外,這個過程中有使用到memoryManager,具體的方法呼叫是:
memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
------------------------------分割線------------------------------
private def putIterator[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T],
memoryMode: MemoryMode,
valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Number of elements unrolled so far
var elementsUnrolled = 0
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Initial per-task memory to request for unrolling blocks (bytes).
// 用於資料在記憶體展開的初始的記憶體使用量
val initialMemoryThreshold = unrollMemoryThreshold
// How often to check whether we need to request more memory
// 檢查記憶體的頻率,每寫這麼多條資料就會檢查一次是否需要申請額外的記憶體
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
// Memory currently reserved by this task for this particular unrolling operation
// 記憶體閾值,開始時等於初始閾值
var memoryThreshold = initialMemoryThreshold
// Memory to request as a multiple of current vector size
// 記憶體增長因子,每次申請的記憶體是當前記憶體的這個倍數
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Keep track of unroll memory used by this particular block / putIterator() operation
// 當前的塊使用的記憶體大小
var unrollMemoryUsedByThisBlock = 0L
// Request enough memory to begin unrolling
// 首先進行初始的記憶體申請,向MemoryManager申請記憶體
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
} else {
// 如果成功申請到記憶體,則累加記錄
unrollMemoryUsedByThisBlock += initialMemoryThreshold
}
// Unroll this block safely, checking whether we have exceeded our threshold periodically
// 迴圈將每條資料寫入容器中valuesHolder
while (values.hasNext && keepUnrolling) {
valuesHolder.storeValue(values.next())
// 如果寫入資料的條數達到一個週期,那麼就檢查一下是否需要申請額外的記憶體
if (elementsUnrolled % memoryCheckPeriod == 0) {
// 通過valuesHolder獲取已經寫入的資料的評估大小
// 注意,這裡的資料大小隻是估計值,並不是十分準確
// 具體如何進行估算的可以看valuesHolder內部實現
val currentSize = valuesHolder.estimatedSize()
// If our vector's size has exceeded the threshold, request more memory
// 如果已寫入的資料大小超過了當前閾值
if (currentSize >= memoryThreshold) {
// 這裡每次申請的記憶體量都是不一樣的
// 每次申請的記憶體是當前已使用記憶體的1.5倍(預設)
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
// 記錄累積申請的記憶體量
unrollMemoryUsedByThisBlock += amountToRequest
}
// New threshold is currentSize * memoryGrowthFactor
// 目前已經向記憶體管理器申請的記憶體量
memoryThreshold += amountToRequest
}
}
// 記錄插入的資料條數
elementsUnrolled += 1
}
// Make sure that we have enough memory to store the block. By this point, it is possible that
// the block's actual memory usage has exceeded the unroll memory by a small amount, so we
// perform one final call to attempt to allocate additional memory if necessary.
// 如果keepUnrolling為true,說明順利地將所有資料插入,
// 並未遇到申請記憶體失敗的情況
if (keepUnrolling) {
// 將內部的資料轉移到一個數組中
val entryBuilder = valuesHolder.getBuilder()
// 資料在記憶體中的精確大小
val size = entryBuilder.preciseSize
// 實際的大小可能大於申請的記憶體量
// 因此根據實際大小還要再申請額外的記憶體
if (size > unrollMemoryUsedByThisBlock) {
val amountToRequest = size - unrollMemoryUsedByThisBlock
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
}
if (keepUnrolling) {
// 獲取MemoryEntry物件,該物件是對插入資料的包裝
val entry = entryBuilder.build()
// Synchronize so that transfer is atomic
memoryManager.synchronized {
// 這一步主要是釋放申請的展開記憶體
// 然後申請儲存記憶體
// 這裡需要弄清楚展開記憶體的概念
// 展開狀態指的是物件在記憶體中處於一種比較鬆散的狀態,這樣的狀態方便做一些管理如統計大小等
// 而隨後將物件轉移到陣列中,處於一種比較緊實的狀態,陣列相對來說佔用的額外記憶體是比較小的
// 一個數組只是一個物件,只有一個物件頭,可以用來管理大量的物件
releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
// 申請儲存記憶體
val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
assert(success, "transferring unroll memory to storage memory failed")
}
// 放入map中管理起來
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(entry.size)
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
// 如果失敗,返回已經申請的展開記憶體
Left(unrollMemoryUsedByThisBlock)
}
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
Left(unrollMemoryUsedByThisBlock)
}
}
memoryStore.putIteratorAsBytes
我們再看另一個方法。套路基本和putIteratorAsValues是一樣一樣的。
最大的區別在於ValuesHolder型別不同。非序列化形式儲存使用的是DeserializedMemoryEntry,而序列化形式儲存使用的是SerializedMemoryEntry。
private[storage] def putIteratorAsBytes[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T],
memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
// 位元組陣列的塊大小,預設是1m
val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
s"is too large to be set as chunk size. Chunk size has been capped to " +
s"${Utils.bytesToString(Int.MaxValue)}")
Int.MaxValue
} else {
initialMemoryThreshold.toInt
}
// 位元組陣列的容器
val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
memoryMode, serializerManager)
putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {
case Right(storedSize) => Right(storedSize)
case Left(unrollMemoryUsedByThisBlock) =>
// 部分展開,部分以序列化形式儲存的block
Left(new PartiallySerializedBlock(
this,
serializerManager,
blockId,
valuesHolder.serializationStream,
valuesHolder.redirectableStream,
unrollMemoryUsedByThisBlock,
memoryMode,
valuesHolder.bbos,
values,
classTag))
}
}
memoryStore.putBytes
我們再來看另一個被外部呼叫用來插入資料的方法。很簡單,不說了。
def putBytes[T: ClassTag](
blockId: BlockId,
size: Long,
memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// 首先向記憶體管理器申請記憶體
// 這裡申請的是儲存記憶體,因為要插入的位元組陣列,
// 所以不需要再展開,也就不需要申請展開記憶體
if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
// 這裡直接構建了一個SerializedMemoryEntry
// 並放到map中管理起來
val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
true
} else {
false
}
}
小結
通過對上面的三個方法,其實主要是前兩個方法的分析,我們發現,除了對記憶體進行簿記管理之外,以及通過記憶體管理器申請記憶體之外,插入資料最主要的工作其實都是有ValuesHolder物件來完成的。
ValuesHolder特質有兩個實現類:DeserializedValuesHolder和SerializedValuesHolder。
DeserializedValuesHolder
DeserializedValuesHolder物件內部有兩個成員:vector,是一個SizeTrackingVector;arrayValues,是一個存放值的陣列,用於在所有資料插入後,將主句轉移到一個數組中,方便包裝成一個MemoryEntry物件。大部分工作是有SizeTrackingVector完成的。
private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[T]()(classTag)
var arrayValues: Array[T] = null
override def storeValue(value: T): Unit = {
vector += value
}
override def estimatedSize(): Long = {
vector.estimateSize()
}
override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
// We successfully unrolled the entirety of this block
arrayValues = vector.toArray
vector = null
override val preciseSize: Long = SizeEstimator.estimate(arrayValues)
override def build(): MemoryEntry[T] =
DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
}
}
SizeTracker
上面提到的SizeTrackingVector繼承了這個特質,除了這個特質,還集成了PrimitiveVector類,但是PrimitiveVector類基本上就是對一個數組的簡單包裝。
SizeTrackingVector最重要的功能:追蹤物件的大小,就是在SizeTracker特之中實現的。
我大致說一下這個特質是如何實現物件大小跟蹤和估算的,程式碼實現也並不複雜,感興趣的可以看一看,限於篇幅這裡就不貼了。
- 每插入一定數量的資料(姑且稱之為週期),就會對當前的物件進行一次取樣,而這個取樣的週期會越來越長,以1.1倍的速率增長;
- 取樣就是計算物件大小,並與前一次取樣作比較,而且只會保留最近兩次的取樣資料;
- 每次取樣其實就是獲取兩個資料,當前物件大小,當前插入的資料條數;
- 這樣與上一次取樣一比較,就能夠計算出每條資料的大小了;
- 最後,在返回整個物件大小時,是拿最近一次取樣時記錄下的物件大小,以及根據最近的情況估算的每條資料的大小乘以自從上次取樣以來新插入的資料量,二者相加作為物件大小的估算值,
可見這麼做並不是什麼精確,但是由於是抽樣,而且抽樣週期越往後面越長,所以對於資料插入的效率影響很小,而且這種不精確性其實在後續的記憶體檢查過程中是有考慮到的。在所有資料插入完的收尾工作中,會對物件大小做一次精確計算。此外,熟悉spark記憶體管理的同學應該知道,其實spark一般會配置一個安全因子(一般是0.9),也就是說只是用配置的記憶體大小的90%,就是為了儘可能地減少這種不精確的記憶體估算造成OOM的可能性。
SerializedValuesHolder
private class SerializedValuesHolder[T](
blockId: BlockId,
chunkSize: Int,
classTag: ClassTag[T],
memoryMode: MemoryMode,
serializerManager: SerializerManager) extends ValuesHolder[T] {
val allocator = memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
// 呼叫unsafe的本地方法申請直接記憶體
// 這個方法之所以沒有呼叫ByteBuffer.allocateDirect方法
// 是因為這個方法分配的直接記憶體大小收到引數MaxDirectMemorySize限制
// 所以這裡繞過ByteBuffer.allocateDirect方法,通過反射和unsafe類建立直接記憶體物件
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
val redirectableStream = new RedirectableOutputStream
val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
redirectableStream.setOutputStream(bbos)
val serializationStream: SerializationStream = {
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
// 包裝壓縮流和序列化流
ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
}
// 寫入方法,寫入的物件經過序列化,壓縮,
// 然後經過ChunkedByteBufferOutputStream被分割成一個個的位元組陣列塊
override def storeValue(value: T): Unit = {
serializationStream.writeObject(value)(classTag)
}
override def estimatedSize(): Long = {
bbos.size
}
override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
// We successfully unrolled the entirety of this block
serializationStream.close()
override def preciseSize(): Long = bbos.size
override def build(): MemoryEntry[T] =
SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
}
}
大概看一下,主要的邏輯很簡單,這裡面有幾個注意點:
- 對於直接記憶體分配,spark並沒有使用jdk的高階api,而是反射配合unsafe類分配直接記憶體,這樣可以繞過jvm引數MaxDirectMemorySize的限制,這也體現了spark的作者儘可能的降低使用者使用難度
- 另外,我們看到序列化流其實經過了層層包裝(典型的裝飾器模式),序列化和壓縮以及分塊是比較重要的幾個點,感興趣的話可以深究,序列化和壓縮如果深入瞭解都是很大的課題,所以這裡也僅僅是蜻蜓點水,不深究了。
總結
MemoryStore.scala這個檔案中乍看程式碼有八百多行,但是其實很大部分程式碼是一些輔助類,比較核心的寫入邏輯也就是前面提到的幾個方法,再加上核心的兩個類DeserializedValuesHolder和SerializedValuesHolder實現了以物件或位元組陣列的形式儲存資料