Spark SQL (5-2): CacheManager and InMemoryRelation
阿新 · Published 2020-07-30
This section was not in my original plan, but while reading through Spark SQL's memory management module I suddenly wondered: how is the data that Spark SQL caches in memory actually organized? I searched a few blog posts, then traced through the source myself, and this summary is the result.
Picking up from the CacheManager discussed earlier, it exposes a cacheQuery method:
```scala
def cacheQuery(
    query: Dataset[_],
    tableName: Option[String] = None,
    storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
  val planToCache = query.logicalPlan
  if (lookupCachedData(planToCache).nonEmpty) {
    logWarning("Asked to cache already cached data.")
  } else {
    val sparkSession = query.sparkSession
    val inMemoryRelation = InMemoryRelation(
      sparkSession.sessionState.conf.useCompression,
      sparkSession.sessionState.conf.columnBatchSize,
      storageLevel,
      sparkSession.sessionState.executePlan(AnalysisBarrier(planToCache)).executedPlan,
      tableName,
      planToCache.stats)
    cachedData.add(CachedData(planToCache, inMemoryRelation))
  }
}
```
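For context, this is the path a plain Dataset.cache() or persist() call ends up on: Dataset.persist delegates to sharedState.cacheManager.cacheQuery. A minimal sketch showing the trigger (the DataFrame here is made up for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("cache-demo").master("local[*]").getOrCreate()
val df = spark.range(0, 100000).selectExpr("id", "id % 10 AS bucket")

// persist() routes to CacheManager.cacheQuery, which registers the
// InMemoryRelation above but does not materialize anything yet.
df.persist(StorageLevel.MEMORY_AND_DISK)

// The first action actually runs buildBuffers (shown below) and fills the column batches.
df.count()
```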
In this method you can see that each cache entry is organized as a CachedData:
```scala
/** Holds a cached logical plan and its data */
case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
```
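The CacheManager keeps these CachedData entries in an internal list and matches incoming plans against it; lookupCachedData, called at the top of cacheQuery, does roughly the following (a simplified sketch, so treat the details as approximate):

```scala
// Simplified: cachedData is the CacheManager's list of entries, and plans are
// compared structurally with LogicalPlan.sameResult rather than by reference.
def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
  cachedData.asScala.find(cd => plan.sameResult(cd.plan))
}
```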
Its comment tells us it holds a cached logical plan and its data, so the data itself must live inside the InMemoryRelation. Next, look at InMemoryRelation's apply method:
```scala
object InMemoryRelation {
  def apply(
      useCompression: Boolean,
      batchSize: Int,
      storageLevel: StorageLevel,
      child: SparkPlan,
      tableName: Option[String],
      statsOfPlanToCache: Statistics): InMemoryRelation =
    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)(
      statsOfPlanToCache = statsOfPlanToCache)
}
```
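The useCompression and batchSize arguments passed in from cacheQuery map to two SQLConf settings, so both the batch granularity and the compression behavior are tunable:

```scala
// Defaults are true and 10000; larger batches compress better but hold
// more memory while a batch is being built.
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "10000")
```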
Inside InMemoryRelation, the method that does the real work is buildBuffers:
```scala
private def buildBuffers(): Unit = {
  val output = child.output
  val cached = child.execute().mapPartitionsInternal { rowIterator =>
    new Iterator[CachedBatch] {
      def next(): CachedBatch = {
        val columnBuilders = output.map { attribute =>
          ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
        }.toArray

        var rowCount = 0
        var totalSize = 0L
        while (rowIterator.hasNext && rowCount < batchSize
          && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
          val row = rowIterator.next()

          // Added for SPARK-6082. This assertion can be useful for scenarios when something
          // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
          // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
          // hard to decipher.
          assert(
            row.numFields == columnBuilders.length,
            s"Row column number mismatch, expected ${output.size} columns, " +
              s"but got ${row.numFields}." +
              s"\nRow content: $row")

          var i = 0
          totalSize = 0
          while (i < row.numFields) {
            columnBuilders(i).appendFrom(row, i)
            totalSize += columnBuilders(i).columnStats.sizeInBytes
            i += 1
          }
          rowCount += 1
        }
        sizeInBytesStats.add(totalSize)

        val stats = InternalRow.fromSeq(
          columnBuilders.flatMap(_.columnStats.collectedStatistics))
        CachedBatch(rowCount, columnBuilders.map { builder =>
          JavaUtils.bufferToArray(builder.build())
        }, stats)
      }

      def hasNext: Boolean = rowIterator.hasNext
    }
  }.persist(storageLevel)

  cached.setName(
    tableName.map(n => s"In-memory table $n")
      .getOrElse(StringUtils.abbreviate(child.toString, 1024)))
  _cachedColumnBuffers = cached
}
```
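Note that persist(storageLevel) at the end only marks the batch RDD for caching; nothing is built until the first action runs, and the setName call is what makes the entry show up with a readable name on the Spark UI's Storage tab. A quick way to observe this (the table name t is just an example):

```scala
spark.range(0, 50000).selectExpr("id", "id * 2 AS doubled").createOrReplaceTempView("t")
spark.catalog.cacheTable("t") // registers the InMemoryRelation, still lazy
spark.table("t").count()      // first action: buildBuffers runs, batches get persisted

// The persisted RDD of CachedBatch now shows up, named "In-memory table t".
spark.sparkContext.getPersistentRDDs.values.foreach(rdd => println(rdd.name))
```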
Here you can see that each partition's rows are wrapped in an iterator of CachedBatch objects. As for CachedBatch itself:
```scala
/**
 * CachedBatch is a cached batch of rows.
 *
 * @param numRows The total number of rows in this batch
 * @param buffers The buffers for serialized columns
 * @param stats The stat of columns
 */
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)
```
The key field here is buffers, which is filled in by the buildBuffers method shown above:
```scala
val stats = InternalRow.fromSeq(
  columnBuilders.flatMap(_.columnStats.collectedStatistics))
CachedBatch(rowCount, columnBuilders.map { builder =>
  JavaUtils.bufferToArray(builder.build())
}, stats)
```
This code calls JavaUtils.bufferToArray(builder.build()) to turn each column's builder into a byte array; since columnBuilders is itself an array, mapping over it yields the two-dimensional buffers array of CachedBatch mentioned above. The first dimension indexes the columns (one ColumnBuilder per output attribute, each carrying that column's type and stats), and each entry holds the serialized values of that single column. So to read a specific column, you only need to pick the matching slot in the first dimension and can skip every other column's bytes entirely; this is essentially in-memory columnar storage. I am not sure this understanding is completely correct, but I think it most likely is.
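To make that layout concrete, here is a minimal, self-contained sketch of the same idea, independent of Spark's real ColumnBuilder classes (all names and the fixed Long-only schema are made up for illustration): rows come in, each column gets its own byte buffer, and reading one column never touches the bytes of the others.

```scala
import java.nio.ByteBuffer

// A toy batch: one Array[Byte] per column, mirroring CachedBatch.buffers.
case class ToyBatch(numRows: Int, buffers: Array[Array[Byte]])

// Build a batch from rows of Longs (each row is an Array[Long] for simplicity).
def buildToyBatch(rows: Seq[Array[Long]], numCols: Int): ToyBatch = {
  val builders = Array.fill(numCols)(ByteBuffer.allocate(rows.length * java.lang.Long.BYTES))
  for (row <- rows; i <- 0 until numCols) builders(i).putLong(row(i)) // append column-wise
  ToyBatch(rows.length, builders.map(_.array()))
}

// Column pruning: decode only the requested column's buffer, skip all others.
def readColumn(batch: ToyBatch, col: Int): Seq[Long] = {
  val buf = ByteBuffer.wrap(batch.buffers(col))
  (0 until batch.numRows).map(_ => buf.getLong)
}

val batch = buildToyBatch(Seq(Array(1L, 10L), Array(2L, 20L), Array(3L, 30L)), numCols = 2)
println(readColumn(batch, 1)) // Vector(10, 20, 30): only column 1's bytes were read
```

This mirrors what the scan side does: when a query only needs some attributes, only those columns' buffers have to be deserialized.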
That sums up the logic of the in-memory storage. I am still not fully certain the columnar-storage description is right, so if you know better, please leave a comment and we can discuss.