1. 程式人生 > >Enzyme SQL引擎的實現與優化

Enzyme SQL引擎的實現與優化

Enzyme是挖財資料團隊自研的SQL執行引擎,適用於小規模或者中型資料集的快速計算。基於Spark Catalyst實現,Enzyme SQL在查詢層面 和Spark SQL完全相容。至於Dataframe,在Enzyme中有對應的Protein。在API的層次上,Protein和Spark Dataframe幾乎完全一致。

應用

Enzyme SQL目前應用於信貸風控體系中的變數中心。變數,也就是指標或者特徵,是描述一個使用者的一個值。最初,變數的加工邏輯由負責風控的資料分析師提供,需要通過資料團隊的工程師用Java程式碼實現。這種方式比較原始,研發的鏈路和週期也相對冗長。故而,我們使用SQL作為一種加工變數的DSL,提供在離線和實時兩個平臺上的一致語義。

為什麼要使用SQL呢?首先,自研DSL需要做很多設計,包括易用性、實現層面的效能等等;其次,自研的DSL最終被接受被高效使用,不可避免會有一個相對較長的磨合週期;最後,SQL作為資料分析師的看家本領,沒有使用的障礙和語義上的歧義,其實現也已經有大量現有的程式碼可供參考。

Enzyme SQL引擎極致的效能表現和非常低的CPU佔用與記憶體消耗,有效地支撐了變數中心龐大的計算量(一個使用者就會觸發數以千計的變數計算)。

實踐

Enzyme設計之初就是以相容Spark SQL為目標的,故而在使用上,和Spark SQL的API大體是一致的。EnzymeSession即SparkSession,Protein即Dataframe。

我們從構建一個Protein資料集開始:

// a session for computing
val conf = new EnzymeConf
val session = new EnzymeSession(conf)

// construct a protein from rows and schemas
val schema = StructType(Seq(
  StructField("x", LongType),
  StructField("y", StringType),
  StructField("z", DoubleType),
  StructField("in", IntegerType
) )) val rows = Seq(Row(1L, "234L", 1.1, 12), Row(2L, "23L", 23.4, 4245), Row(2L, "65L", 5244.2, 234), Row(null, "7L", 245.234, 5245), Row(4L, "7L", 245.234, 5245)) val df = new Protein(session, rows, schema) 複製程式碼

這樣的一個數據集可以直接展示:

> df.show()

+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|null|  7L|245.234|5245|
|   4|  7L|245.234|5245|
+----+----+-------+----+
複製程式碼

如果要使用SQL,首先我們要把這個資料集和一個表名關聯起來:

> session.register(tableName = "a", df)
> session.sql("select * from a").show()
+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|null|  7L|245.234|5245|
|   4|  7L|245.234|5245|
+----+----+-------+----+
複製程式碼

上面的程式碼中session.sql()的結果還是一個Protein。除了使用SQL,我們還可以使用Protein裡面豐富的API:

> session.sql("select * from a order by x asc").show()
+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|null|  7L|245.234|5245|
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|   4|  7L|245.234|5245|
+----+----+-------+----+

> df.sort("x").show()
+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|null|  7L|245.234|5245|
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|   4|  7L|245.234|5245|
+----+----+-------+----+
複製程式碼

更多用法的細節可以檢視Spark SQL的文件,也可以檢視Enzyme的文件。

實現

Enzyme基於Spark Catalyst實現,而Catalyst對標的開源專案是Apache Calcite。Apache Phoenix和Apache Hive等眾多專案都在使用Calcite。因為我們的目標是相容Spark SQL,自然而然選擇了Catalyst,作為SQL的解析器、邏輯計劃的執行器和優化器。

Spark Catalyst概覽

  • SQL Text
    • (parse): Unresolved Logical Plan
      • (analyze): Resolved Logical Plan
        • (optimize): Optimized Logical Plan(s) ----- RBO
          • (planning): Physical Planning ------ CBO

上面的層次結構簡明地概括了一個SQL從最原始的SQL文字,到最後執行的各個階段。其中加粗的部分是Enzyme中所實現的,未加粗的部分是Catalyst所提供的功能。

解析,就是用Antlr4將SQL文字變成一棵AST樹,這個AST樹經過轉換,變成了最原始的邏輯計劃。在這樣的邏輯計劃中,我們是不知道*所表示的欄位究竟是哪些。

分析,就是結合Catalog中的元資料資訊,將原始的邏輯計劃中各個未確定的部分(比如*)和元資料匹配確定下來。如果發現型別無法滿足或者所引用的欄位根本不存在,就直接丟擲AnalysisException。

優化,即通過邏輯計劃的等價變換,轉換得到最優的邏輯計劃。Catalyst中內建了一系列既有的優化規則,比如謂詞下推和列剪裁。我們也可以通過Catalyst提供的介面,將自己研發的優化規則加入其中。這裡的優化就是RBO,基於規則的優化。

最後是物理計劃的生成,一個優化過後的邏輯計劃其實可以生成多種等效的物理計劃,資料最終決定了其中一個物理計劃是最優的。在沒有時光機的當下,我們無法將所有物理計劃都執行一遍,再選擇最優的那個。所以通常的做法是,收集一些關於底層表的統計資訊,依據這些資訊,預判出執行效率最高的物理計劃。這就是所謂的CBO,基於代價的優化。

一個SQL的一生

SELECT *
FROM employee
INNER JOIN department
ON employee.DepartmentID = department.DepartmentID
複製程式碼

我們用上面這個SQL來詳細瞭解一下上述各個階段。

分析階段

Project [*]
+- 'Join Inner, ('employee.DepartmentID = 'department.DepartmentID)
   :- 'UnresolvedRelation `employee`
   +- 'UnresolvedRelation `department`


Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
   :- SubqueryAlias employee
   :  +- LocalRelation [LastName#6, DepartmentID#7L]
   +- SubqueryAlias department
      +- LocalRelation [DepartmentID#0L, DepartmentName#1]
複製程式碼

我們看到*已經被展開成了四個明確的欄位,而且每個欄位都有明確的ID標誌,從而可以明確判定這個欄位來自於哪一個表。當我們需要對Spark SQL做精確到欄位級別的許可權控制的時候,我們所需要的其實就是經過分析的邏輯計劃。

優化

Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
   :- SubqueryAlias employee
   :  +- LocalRelation [LastName#6, DepartmentID#7L]
   +- SubqueryAlias department
      +- LocalRelation [DepartmentID#0L, DepartmentName#1]

Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- Filter isnotnull(DepartmentID#7L)
:  +- LocalRelation [LastName#6, DepartmentID#7L]
+- Filter isnotnull(DepartmentID#0L)
   +- LocalRelation [DepartmentID#0L, DepartmentName#1]
複製程式碼

因為這是一個inner join,所以這裡的一個優化點其實是在做join之前,把join key為null的行過濾掉。

物理計劃的生成

我們模仿Spark SQL中SparkPlan的實現,提供了簡化的EnzymePlan:

abstract class EnzymePlan extends QueryPlan[EnzymePlan] {
  def iterator: Iterator[InternalRow]
  override def output: Seq[Attribute]
  ...
}

trait LeafExecNode extends EnzymePlan {
  override final def children: Seq[EnzymePlan] = Nil
}

trait UnaryExecNode extends EnzymePlan {
  def child: EnzymePlan

  override final def children: Seq[EnzymePlan] = child :: Nil
}

trait BinaryExecNode extends EnzymePlan {
  def left: EnzymePlan

  def right: EnzymePlan

  override final def children: Seq[EnzymePlan] = Seq(left, right)
}
複製程式碼

在這個程式碼片段中,EnzymePlan是核心,其中output表示一個物理計劃的節點上結果集的元資料資訊,而iterator則是呼叫這個物理計劃節點的入口。我們看到有三類物理計劃:

  • LeafExecNode: LocalTableScanExec, LazyLocalTableScanExec
  • UnaryExecNode: ProjectExec, LimitExec, FilterExec
  • BinaryExecNode: HashJoinExec, NestedLoopExec

Enzyme中的部分物理計劃實現分類之後,如上所示。物理計劃整體上是一棵樹,資料實際上是從葉節點(Leaf)開始,經過過濾或者轉換(Unary)或者合流(Binary),最終匯聚到根節點,得到計算結果。葉節點就是我們的資料來源。有兩個輸入源的是Union或者Join,而只有一個輸入源的就是Projection,Filter,Sort等運算元。

上一節中優化之後的邏輯計劃可以生成這樣的物理計劃:

HashJoinExec [DepartmentID#11L], [DepartmentID#4L]
         , BuildRight, Inner
:- FilterExec isnotnull(DepartmentID#11L)
:  +- LazyLocalTableScan [LastName#10, DepartmentID#11L],
                         employee, [email protected]
 +- FilterExec isnotnull(DepartmentID#4L)
   +- LazyLocalTableScan [DepartmentID#4L, DepartmentName#5],
                         department, [email protected]
複製程式碼

計算通過在根節點呼叫iterator方法,層層回溯:

HashJoinExec.iterator
 + FilterExec.iterator
   + LazyLocalTableScan(employee).iterator
 + FilterExec.iterator
   + LazyLocalTableScan(department).iterator
複製程式碼

效能調優

首先,我們需要定位效能瓶頸。JVM生態中有很多做Profiling的工具。Enzyme在優化過程中,使用的是JDK中自帶的jmc命令和FlightRecord。通過jmc的分析,可以定位到熱點的方法,耗時的方法等有幫助的資訊。我們有兩種優化的策略。

  • 其一,直接替換掉慢的部分
  • 其二,對無法優化的部分做必要的快取
  • 其三,邏輯計劃優化

優化點一:動態程式碼生成調優

Spark的鎢絲計劃引入了動態程式碼生成的技術,比較有效地解決了三方面的問題(詳見參考資料2):

  • 大量虛擬函式呼叫,生成的實際程式碼不再需要執行表示式系統中統一定義的虛擬函式
  • 判斷資料型別和操作運算元等內容的大型分支選擇語句
  • 常數傳播限制,生成的程式碼能夠確定性地摺疊常量

對於Enzyme的使用場景,動態程式碼生成並不一定有效能優化的效果,我們使用JMH做基準測試,將一部分使得效能變差的程式碼生成關閉掉。

數以千計的SQL會生成大量Java類,在引擎中編譯並快取,會帶來一些記憶體上的佔用和CPU的消耗,也是我們做取捨的其中一個原因。

優化點二:快取

我們做的最主要的快取就是從Unresolve Logical Plan到Physical Plan的生成。為什麼不是直接從SQL到Physical Plan呢?因為SQL解析的開銷實際上很小,而且略有差異的SQL所生成的Unresolved Logical Plan可能是一模一樣的。

在物理計劃的快取中,還有兩點需要注意:

  • 其一,物理計劃必須和資料隔離
  • 其二,物理計劃的計算不能有副作用

只有這樣,我們的快取才是有效的、正確無誤的。另外,在表的schema發生改變的時候,我們還需要讓所快取的相關物理計劃失效。

優化點三:新增邏輯計劃優化規則

Catalyst中的優化器提供了可擴充套件的介面,使得我們可以自定義邏輯計劃優化的規則。Databricks在Spark Summit上做過一個題為A Deep Dive into Spark SQL's Catalyst Optimizer的講座,其中有細節的介紹。

具體的介面如下:

spark.experimental.extraStrategies = CustomizedExtraStrategy :: Nil
複製程式碼

我們利用這個介面,針對我們的業務資料,專門定製了一系列額外的優化規則,極大地提升了引擎的效能。

Enzyme的未來

  1. 開源
  2. 做更多針對小資料集的優化,進一步改善效能
  3. 基於Enzyme,做一些上層生態的擴充套件

對於第三點,我們想做的實際上是讓Enzyme和其他生態更好地結合。比如如何將Enzyme運用到Spark Streaming或者Flink Streaming中,如何在Spring Boot中更加方便地使用Enzyme,如何在機器學習中使用Enzyme。

參考資料

  1. Spear: A playground for experimenting ideas that may apply to Spark SQL/Catalyst
  2. Scala Benchmark Starter
  3. 《Spark SQL核心剖析》
  4. 《高效能Scala》

作者簡介

忍冬,挖財資料研發工程師,負責Spark SQL在挖財的落地,自研了相容Spark SQL適用於單機小資料集的Enzyme SQL引擎。譯有《Scala實用指南》,業餘時間是GNU TeXmacs專案的維護者之一。