1. 程式人生 > 實用技巧 >Spark SQL(5-2) CacheManage之InMemoryRelation

Spark SQL(5-2) CacheManage之InMemoryRelation

Spark SQL(5-2) CacheManage之InMemoryRelation

本來計劃中是沒有這節的,但是中午在看spark sql 記憶體管理模組的時候,腦子裡面突然問到,spark sql 快取到記憶體的資料是怎麼組織的;上網查了下部落格;然後自己也跟了下程式碼,就形成了這篇總結。

接著之前提到的CacheManage中有個CacheQuery的方法:

def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
    val planToCache = query.logicalPlan
    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
    } else {
      val sparkSession = query.sparkSession
      val inMemoryRelation = InMemoryRelation(
        sparkSession.sessionState.conf.useCompression,
        sparkSession.sessionState.conf.columnBatchSize, storageLevel,
        sparkSession.sessionState.executePlan(AnalysisBarrier(planToCache)).executedPlan,
        tableName,
        planToCache.stats)
      cachedData.add(CachedData(planToCache, inMemoryRelation))
    }
  }

  這個方法裡面可以看到快取的資料是以CacheData的形式組織的:

/** Holds a cached logical plan and its data */
case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)

  閱讀他的註釋可以瞭解到,儲存邏輯計劃和他的資料,那麼資料應該就在InMemoryRelation裡面儲存了;那麼再看InMemoryRelation的apply方法:

object InMemoryRelation {
  def apply(
      useCompression: Boolean,
      batchSize: Int,
      storageLevel: StorageLevel,
      child: SparkPlan,
      tableName: Option[String],
      statsOfPlanToCache: Statistics): InMemoryRelation =
    new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, child, tableName)(
      statsOfPlanToCache = statsOfPlanToCache)
}

  在這裡主要的方法是:

private def buildBuffers(): Unit = {
    val output = child.output
    val cached = child.execute().mapPartitionsInternal { rowIterator =>
      new Iterator[CachedBatch] {
        def next(): CachedBatch = {
          val columnBuilders = output.map { attribute =>
            ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
          }.toArray

          var rowCount = 0
          var totalSize = 0L
          while (rowIterator.hasNext && rowCount < batchSize
            && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
            val row = rowIterator.next()

            // Added for SPARK-6082. This assertion can be useful for scenarios when something
            // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
            // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
            // hard to decipher.
            assert(
              row.numFields == columnBuilders.length,
              s"Row column number mismatch, expected ${output.size} columns, " +
                s"but got ${row.numFields}." +
                s"\nRow content: $row")

            var i = 0
            totalSize = 0
            while (i < row.numFields) {
              columnBuilders(i).appendFrom(row, i)
              totalSize += columnBuilders(i).columnStats.sizeInBytes
              i += 1
            }
            rowCount += 1
          }

          sizeInBytesStats.add(totalSize)

          val stats = InternalRow.fromSeq(
            columnBuilders.flatMap(_.columnStats.collectedStatistics))
          CachedBatch(rowCount, columnBuilders.map { builder =>
            JavaUtils.bufferToArray(builder.build())
          }, stats)
        }

        def hasNext: Boolean = rowIterator.hasNext
      }
    }.persist(storageLevel)

    cached.setName(
      tableName.map(n => s"In-memory table $n")
        .getOrElse(StringUtils.abbreviate(child.toString, 1024)))
    _cachedColumnBuffers = cached
  }

  這裡面可以看到這裡面組織了一個CachedBatch的迭代器,對於CachedBatch:

/**
 * CachedBatch is a cached batch of rows.
 *
 * @param numRows The total number of rows in this batch
 * @param buffers The buffers for serialized columns
 * @param stats The stat of columns
 */
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)

  這裡面主要的就是buffers了,他是在上面提到的buildBuffers方法裡面:

 val stats = InternalRow.fromSeq(
            columnBuilders.flatMap(_.columnStats.collectedStatistics))
          CachedBatch(rowCount, columnBuilders.map { builder =>
            JavaUtils.bufferToArray(builder.build())
          }, stats)

  這段是呼叫了JavaUtils.bufferToArray(builder.build())方法返回了一個buye陣列,columnBuilders也是陣列的形式,所以就是之前提到的CachedBatch的buffers;其實這個二維數組裡面第一維儲存了ColumnBuilder陣列,這個陣列相當於是每一列的資訊;之後在每一列的資訊裡面儲存了JavaUtils.bufferToArray(builder.build())的資料(值),至此如果想拿某個列的資料,其實拿到陣列的第一維度的描述,如果匹配獲取,不匹配直接跳過;這樣好似記憶體的列儲存,這塊理解不知道對不對,我覺得大概率是對的。

關於整個記憶體儲存的邏輯總結如上,但是關於列儲存的描述不確定對不對,有懂的可以留言討論。