Spark SQL (5) CacheManager

After Spark SQL produces the analyzed plan, it goes through a withCachedData step: the CacheManager keeps a registry of cached logical plans, and any fragment of the current plan that would produce the same result as a cached plan is replaced with the cached version. This happens in QueryExecution.withCachedData:

lazy val withCachedData: LogicalPlan = {
    assertAnalyzed()
    assertSupported()
    sparkSession.sharedState.cacheManager.useCachedData(analyzed)
  }
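This step can be observed directly, since every Dataset exposes its QueryExecution. Below is a minimal sketch, assuming a local SparkSession named spark (hypothetical setup, not from the original source):

import spark.implicits._

val df = spark.range(10).filter($"id" > 3)
df.cache()                                 // registers the plan with the CacheManager
println(df.queryExecution.withCachedData)  // the root is now an InMemoryRelation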

withCachedData in turn delegates to CacheManager.useCachedData:

/** Replaces segments of the given logical plan with cached versions where possible. */
def useCachedData(plan: LogicalPlan): LogicalPlan = {
  val newPlan = plan transformDown {
    // Do not lookup the cache by hint node. Hint node is special, we should ignore it when
    // canonicalizing plans, so that plans which are same except hint can hit the same cache.
    // However, we also want to keep the hint info after cache lookup. Here we skip the hint
    // node, so that the returned caching plan won't replace the hint node and drop the hint info
    // from the original plan.
    case hint: ResolvedHint => hint

    case currentFragment =>
      lookupCachedData(currentFragment)
        .map(_.cachedRepresentation.withOutput(currentFragment.output))
        .getOrElse(currentFragment)
  }

  newPlan transformAllExpressions {
    case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
  }
}
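Because transformDown visits every fragment of the plan, even a larger query that merely contains a cached plan as a subtree benefits from the substitution. A sketch, using the same hypothetical spark session as above:

val cached = spark.range(100).filter($"id" % 2 === 0)
cached.cache()

// The new query contains the cached fragment as a subtree; useCachedData
// swaps that subtree out, which shows up as InMemoryTableScan in the plan.
val derived = cached.select($"id" + 1)
derived.explain()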

The core of this is the CacheManager.lookupCachedData method, together with the cachedData list it searches:

/** Optionally returns cached data for the given [[LogicalPlan]]. */
def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
  cachedData.asScala.find(cd => plan.sameResult(cd.plan))
}

@transient
private val cachedData = new java.util.LinkedList[CachedData]

/** Holds a cached logical plan and its data */
case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)

From the above we can see that the CacheManager keeps a linked list of CachedData entries, each pairing a LogicalPlan with its InMemoryRelation (a leaf node), so that at execution time a matching plan fragment can be replaced directly with the cached result.
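The match in lookupCachedData is decided by LogicalPlan.sameResult, which compares canonicalized plans, so cosmetic differences such as expression ids do not prevent a cache hit. A minimal sketch of this behavior (hypothetical spark session again):

// Two independently built but semantically identical plans:
val p1 = spark.range(10).filter($"id" > 3).queryExecution.analyzed
val p2 = spark.range(10).filter($"id" > 3).queryExecution.analyzed

// sameResult compares canonicalized forms, ignoring expression ids,
// so this holds even though p1 and p2 are distinct plan objects.
assert(p1.sameResult(p2))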

This raises a question: when are entries added to this linked list? It happens when the Dataset's persist method is called, which routes to CacheManager.cacheQuery:

/**
   * Caches the data produced by the logical representation of the given [[Dataset]].
   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
   * recomputing the in-memory columnar representation of the underlying table is expensive.
   */
  def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
    val planToCache = query.logicalPlan
    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
    } else {
      val sparkSession = query.sparkSession
      val inMemoryRelation = InMemoryRelation(
        sparkSession.sessionState.conf.useCompression,
        sparkSession.sessionState.conf.columnBatchSize, storageLevel,
        sparkSession.sessionState.executePlan(AnalysisBarrier(planToCache)).executedPlan,
        tableName,
        planToCache.stats)
      cachedData.add(CachedData(planToCache, inMemoryRelation))
    }
  }



def persist(): this.type = {
  sparkSession.sharedState.cacheManager.cacheQuery(this)
  this
}
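A short usage sketch with the public Dataset API (the DataFrame here is hypothetical): persist also has an overload that takes an explicit StorageLevel, which is forwarded to cacheQuery, and unpersist removes the entry from the CacheManager again.

import org.apache.spark.storage.StorageLevel

val df2 = spark.range(100).toDF("id")
df2.persist(StorageLevel.MEMORY_ONLY)  // registers CachedData with MEMORY_ONLY
df2.count()                            // first action materializes the InMemoryRelation
df2.unpersist()                        // asks the CacheManager to drop the entry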

To summarize: the substitution is a top-down traversal (transformDown) of the analyzed plan that compares each fragment against the logical plans stored in cachedData; on a match, the entire subtree is replaced with the cached InMemoryRelation, and since InMemoryRelation is a leaf node the traversal naturally stops there.