1. 程式人生 > >給 Spark2.2 加上自定義 Hint 優化

給 Spark2.2 加上自定義 Hint 優化

由於我們底層資料存的型別是String,所以在做比較或排序時,資料計算錯誤。比如 “9” 比 “10” 大。為了只對比較的欄位 cast 成 Double,老大突發奇想:在 Hint 傳入需要轉換欄位名,然後 Analyzer 做計算的判斷,將指定資料 cast 成 Double。

在Spark2.2中,增加了對Hint的解析,支援使用者broadcast hint。本文的目的是在 Analyzer 裡增加一個 function,支援將指定的 UnresolvedAttribute 加上 Cast。

首先,Analyzer.scala 中和 Hint 相關的程式碼

lazy val batches: Seq[Batch] = Seq(
  Batch("Hints", fixedPoint,
    new ResolveHints.ResolveBroadcastHints(conf),
    ResolveHints.RemoveAllHints),
    ……
  )

這裡有一個 Object 叫 ResolveHints,和 Hint 處理相關的都在這裡面。ResolveHints 裡有一個 class 叫ResolveBroadcastHints,用來處理和 Broadcast 相關的 Hint;有一個 object 叫 RemoveAllHints 只有一個功能:將 UnresolvedHint 節點從解析樹上刪掉

case h: UnresolvedHint => h.child

在 ResolveBroadcastHints 裡,是解析一棵 LogicalPlan 樹,這裡 transformUp 接受的是偏函式物件的引數(什麼叫偏函式?請自行百度),自頂向下遞迴判斷當前節點是否為 UnresolvedHint,再在applyBroadcastHint 中遞迴標記可以廣播的資訊。

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
  case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
    if
(h.parameters.isEmpty) { // If there is no table alias specified, turn the entire subtree into a BroadcastHint. ResolvedHint(h.child, HintInfo(isBroadcastable = Option(true))) } else { // Otherwise, find within the subtree query plans that should be broadcasted. applyBroadcastHint(h.child, h.parameters.map { case tableName: String => tableName case tableId: UnresolvedAttribute => tableId.name case unsupported => throw new AnalysisException("Broadcast hint parameter should be " + s"an identifier or string but was $unsupported (${unsupported.getClass}") }.toSet) } }

在 UnresolvedHint 中,有一個 name 屬性,標記這個函式名;有一個 parameters 屬性,標記這個函式接受的引數。

OK,以上關於已有程式碼的一些理解。也許理解上仍有偏差,同時也不夠接地氣,接下去接地氣一些。既然是要修改 Spark 原始碼,測試當然少不了。關於 Spark 的單元測試,功能還是相當豐富的。本文由於是個人學習的一些記錄,所以我會記一些單元測試中遇到的坑,以及一些技巧,也許對於讀者無用,請讀者跳過。

關於單元測試,首先當然是記錄老大牛逼閃閃的blog

我們應該先關注一下當前 Parser 結束之後的 plan,在 catalyst 裡,有一個 parsePlan 方法,可以實現這個目的。(當然,這是在一些 parser 的測試裡看到的)

explain extended select /*+ CAST_AS_DOUBLE(`key`) */ if(`key` > `value`, `key`, `value`) from src order by `key`

得到這樣的結果

== Parsed Logical Plan ==
'Sort ['key ASC NULLS FIRST], true
+- 'UnresolvedHint CAST_AS_DOUBLE, ['key]
   +- 'Project [unresolvedalias('if(('key > 'value), 'key, 'value), None)]
      +- 'UnresolvedRelation `src`

可以發現 UnresolvedHint 確實是像 Spark 原始碼註釋裡說的那樣

A broadcast hint plan node will be inserted on top of any relation (that is not aliased differently), subquery, or common table expression that match the specified name.

但是我希望它是 Sort 的父節點,這樣就可以和 applyBroadcastHint 方法走類似的邏輯;很遺憾,只能重新遍歷一次樹了。解決方案是,記錄下所有需要做 Cast 操作的 UnresolvedAttribute,然後在 RemoveAllHints 的時候做一次節點替換。

這邊再記錄踩的一個坑吧。在做節點替換的時候,我一開始是這樣寫的:

case a => a transformExpressions {
  case sortOrder: SortOrder => sortOrder transformUp {
    case child: UnresolvedAttribute => Cast(child, DoubleType)
  }
}

這裡面會出現一個死迴圈,或者說無限遞迴呼叫,因為原來的 UnresolvedAttribute 是 Cast 的子節點,然後遍歷子節點再次遍歷到 UnresolvedAttribute 時,又會再加一層 Cast,無限迴圈到堆疊溢位。

最後是這樣寫的:

case a => a transformExpressions {
  case SortOrder(child, direction, nullOrdering, sameOrderExpressions)
    if child.isInstanceOf[UnresolvedAttribute] &&
      CAST_FIELD_ID.contains(child.asInstanceOf[UnresolvedAttribute].name) =>
    SortOrder(Cast(child, DoubleType), direction, nullOrdering, sameOrderExpressions)

對於希望用SQL去做單元測試,在 catalyst 裡,建立一張表會比較麻煩(可以執行SQL,並且使用斷點除錯達到目的,但是最終是會報錯,終究還是很不爽的),建議在 sql core 裡寫單元測試。

路漫漫其修遠兮,吾將上下而求索。加油加油!