Enzyme SQL引擎的實現與優化
Enzyme是挖財資料團隊自研的SQL執行引擎,適用於小規模或者中型資料集的快速計算。基於Spark Catalyst實現,Enzyme SQL在查詢層面 和Spark SQL完全相容。至於Dataframe,在Enzyme中有對應的Protein。在API的層次上,Protein和Spark Dataframe幾乎完全一致。
應用
Enzyme SQL目前應用於信貸風控體系中的變數中心。變數,也就是指標或者特徵,是描述一個使用者的一個值。最初,變數的加工邏輯由負責風控的資料分析師提供,需要通過資料團隊的工程師用Java程式碼實現。這種方式比較原始,研發的鏈路和週期也相對冗長。故而,我們使用SQL作為一種加工變數的DSL,提供在離線和實時兩個平臺上的一致語義。
為什麼要使用SQL呢?首先,自研DSL需要做很多設計,包括易用性、實現層面的效能等等;其次,自研的DSL最終被接受被高效使用,不可避免會有一個相對較長的磨合週期;最後,SQL作為資料分析師的看家本領,沒有使用的障礙和語義上的歧義,其實現也已經有大量現有的程式碼可供參考。
Enzyme SQL引擎極致的效能表現和非常低的CPU佔用與記憶體消耗,有效地支撐了變數中心龐大的計算量(一個使用者就會觸發數以千計的變數計算)。
實踐
Enzyme設計之初就是以相容Spark SQL為目標的,故而在使用上,和Spark SQL的API大體是一致的。EnzymeSession即SparkSession,Protein即Dataframe。
我們從構建一個Protein資料集開始:
// a session for computing
val conf = new EnzymeConf
val session = new EnzymeSession(conf)
// construct a protein from rows and schemas
val schema = StructType(Seq(
StructField("x", LongType),
StructField("y", StringType),
StructField("z", DoubleType),
StructField("in", IntegerType )
))
val rows = Seq(Row(1L, "234L", 1.1, 12),
Row(2L, "23L", 23.4, 4245),
Row(2L, "65L", 5244.2, 234),
Row(null, "7L", 245.234, 5245),
Row(4L, "7L", 245.234, 5245))
val df = new Protein(session, rows, schema)
複製程式碼
這樣的一個數據集可以直接展示:
> df.show()
+----+----+-------+----+
| x| y| z| in|
+----+----+-------+----+
| 1|234L| 1.1| 12|
| 2| 23L| 23.4|4245|
| 2| 65L| 5244.2| 234|
|null| 7L|245.234|5245|
| 4| 7L|245.234|5245|
+----+----+-------+----+
複製程式碼
如果要使用SQL,首先我們要把這個資料集和一個表名關聯起來:
> session.register(tableName = "a", df)
> session.sql("select * from a").show()
+----+----+-------+----+
| x| y| z| in|
+----+----+-------+----+
| 1|234L| 1.1| 12|
| 2| 23L| 23.4|4245|
| 2| 65L| 5244.2| 234|
|null| 7L|245.234|5245|
| 4| 7L|245.234|5245|
+----+----+-------+----+
複製程式碼
上面的程式碼中session.sql()
的結果還是一個Protein。除了使用SQL,我們還可以使用Protein裡面豐富的API:
> session.sql("select * from a order by x asc").show()
+----+----+-------+----+
| x| y| z| in|
+----+----+-------+----+
|null| 7L|245.234|5245|
| 1|234L| 1.1| 12|
| 2| 23L| 23.4|4245|
| 2| 65L| 5244.2| 234|
| 4| 7L|245.234|5245|
+----+----+-------+----+
> df.sort("x").show()
+----+----+-------+----+
| x| y| z| in|
+----+----+-------+----+
|null| 7L|245.234|5245|
| 1|234L| 1.1| 12|
| 2| 23L| 23.4|4245|
| 2| 65L| 5244.2| 234|
| 4| 7L|245.234|5245|
+----+----+-------+----+
複製程式碼
更多用法的細節可以檢視Spark SQL的文件,也可以檢視Enzyme的文件。
實現
Enzyme基於Spark Catalyst實現,而Catalyst對標的開源專案是Apache Calcite。Apache Phoenix和Apache Hive等眾多專案都在使用Calcite。因為我們的目標是相容Spark SQL,自然而然選擇了Catalyst,作為SQL的解析器、邏輯計劃的執行器和優化器。
Spark Catalyst概覽
- SQL Text
- (parse): Unresolved Logical Plan
- (analyze): Resolved Logical Plan
- (optimize): Optimized Logical Plan(s) ----- RBO
- (planning): Physical Planning ------ CBO
- (optimize): Optimized Logical Plan(s) ----- RBO
- (analyze): Resolved Logical Plan
- (parse): Unresolved Logical Plan
上面的層次結構簡明地概括了一個SQL從最原始的SQL文字,到最後執行的各個階段。其中加粗的部分是Enzyme中所實現的,未加粗的部分是Catalyst所提供的功能。
解析,就是用Antlr4將SQL文字變成一棵AST樹,這個AST樹經過轉換,變成了最原始的邏輯計劃。在這樣的邏輯計劃中,我們是不知道*
所表示的欄位究竟是哪些。
分析,就是結合Catalog中的元資料資訊,將原始的邏輯計劃中各個未確定的部分(比如*
)和元資料匹配確定下來。如果發現型別無法滿足或者所引用的欄位根本不存在,就直接丟擲AnalysisException。
優化,即通過邏輯計劃的等價變換,轉換得到最優的邏輯計劃。Catalyst中內建了一系列既有的優化規則,比如謂詞下推和列剪裁。我們也可以通過Catalyst提供的介面,將自己研發的優化規則加入其中。這裡的優化就是RBO,基於規則的優化。
最後是物理計劃的生成,一個優化過後的邏輯計劃其實可以生成多種等效的物理計劃,資料最終決定了其中一個物理計劃是最優的。在沒有時光機的當下,我們無法將所有物理計劃都執行一遍,再選擇最優的那個。所以通常的做法是,收集一些關於底層表的統計資訊,依據這些資訊,預判出執行效率最高的物理計劃。這就是所謂的CBO,基於代價的優化。
一個SQL的一生
SELECT *
FROM employee
INNER JOIN department
ON employee.DepartmentID = department.DepartmentID
複製程式碼
我們用上面這個SQL來詳細瞭解一下上述各個階段。
分析階段
Project [*]
+- 'Join Inner, ('employee.DepartmentID = 'department.DepartmentID)
:- 'UnresolvedRelation `employee`
+- 'UnresolvedRelation `department`
Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- SubqueryAlias employee
: +- LocalRelation [LastName#6, DepartmentID#7L]
+- SubqueryAlias department
+- LocalRelation [DepartmentID#0L, DepartmentName#1]
複製程式碼
我們看到*
已經被展開成了四個明確的欄位,而且每個欄位都有明確的ID標誌,從而可以明確判定這個欄位來自於哪一個表。當我們需要對Spark SQL做精確到欄位級別的許可權控制的時候,我們所需要的其實就是經過分析的邏輯計劃。
優化
Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- SubqueryAlias employee
: +- LocalRelation [LastName#6, DepartmentID#7L]
+- SubqueryAlias department
+- LocalRelation [DepartmentID#0L, DepartmentName#1]
Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- Filter isnotnull(DepartmentID#7L)
: +- LocalRelation [LastName#6, DepartmentID#7L]
+- Filter isnotnull(DepartmentID#0L)
+- LocalRelation [DepartmentID#0L, DepartmentName#1]
複製程式碼
因為這是一個inner join,所以這裡的一個優化點其實是在做join之前,把join key為null的行過濾掉。
物理計劃的生成
我們模仿Spark SQL中SparkPlan的實現,提供了簡化的EnzymePlan:
abstract class EnzymePlan extends QueryPlan[EnzymePlan] {
def iterator: Iterator[InternalRow]
override def output: Seq[Attribute]
...
}
trait LeafExecNode extends EnzymePlan {
override final def children: Seq[EnzymePlan] = Nil
}
trait UnaryExecNode extends EnzymePlan {
def child: EnzymePlan
override final def children: Seq[EnzymePlan] = child :: Nil
}
trait BinaryExecNode extends EnzymePlan {
def left: EnzymePlan
def right: EnzymePlan
override final def children: Seq[EnzymePlan] = Seq(left, right)
}
複製程式碼
在這個程式碼片段中,EnzymePlan是核心,其中output表示一個物理計劃的節點上結果集的元資料資訊,而iterator則是呼叫這個物理計劃節點的入口。我們看到有三類物理計劃:
- LeafExecNode: LocalTableScanExec, LazyLocalTableScanExec
- UnaryExecNode: ProjectExec, LimitExec, FilterExec
- BinaryExecNode: HashJoinExec, NestedLoopExec
Enzyme中的部分物理計劃實現分類之後,如上所示。物理計劃整體上是一棵樹,資料實際上是從葉節點(Leaf)開始,經過過濾或者轉換(Unary)或者合流(Binary),最終匯聚到根節點,得到計算結果。葉節點就是我們的資料來源。有兩個輸入源的是Union或者Join,而只有一個輸入源的就是Projection,Filter,Sort等運算元。
上一節中優化之後的邏輯計劃可以生成這樣的物理計劃:
HashJoinExec [DepartmentID#11L], [DepartmentID#4L]
, BuildRight, Inner
:- FilterExec isnotnull(DepartmentID#11L)
: +- LazyLocalTableScan [LastName#10, DepartmentID#11L],
employee, [email protected]
+- FilterExec isnotnull(DepartmentID#4L)
+- LazyLocalTableScan [DepartmentID#4L, DepartmentName#5],
department, [email protected]
複製程式碼
計算通過在根節點呼叫iterator方法,層層回溯:
HashJoinExec.iterator
+ FilterExec.iterator
+ LazyLocalTableScan(employee).iterator
+ FilterExec.iterator
+ LazyLocalTableScan(department).iterator
複製程式碼
效能調優
首先,我們需要定位效能瓶頸。JVM生態中有很多做Profiling的工具。Enzyme在優化過程中,使用的是JDK中自帶的jmc命令和FlightRecord。通過jmc的分析,可以定位到熱點的方法,耗時的方法等有幫助的資訊。我們有兩種優化的策略。
- 其一,直接替換掉慢的部分
- 其二,對無法優化的部分做必要的快取
- 其三,邏輯計劃優化
優化點一:動態程式碼生成調優
Spark的鎢絲計劃引入了動態程式碼生成的技術,比較有效地解決了三方面的問題(詳見參考資料2):
- 大量虛擬函式呼叫,生成的實際程式碼不再需要執行表示式系統中統一定義的虛擬函式
- 判斷資料型別和操作運算元等內容的大型分支選擇語句
- 常數傳播限制,生成的程式碼能夠確定性地摺疊常量
對於Enzyme的使用場景,動態程式碼生成並不一定有效能優化的效果,我們使用JMH做基準測試,將一部分使得效能變差的程式碼生成關閉掉。
數以千計的SQL會生成大量Java類,在引擎中編譯並快取,會帶來一些記憶體上的佔用和CPU的消耗,也是我們做取捨的其中一個原因。
優化點二:快取
我們做的最主要的快取就是從Unresolve Logical Plan到Physical Plan的生成。為什麼不是直接從SQL到Physical Plan呢?因為SQL解析的開銷實際上很小,而且略有差異的SQL所生成的Unresolved Logical Plan可能是一模一樣的。
在物理計劃的快取中,還有兩點需要注意:
- 其一,物理計劃必須和資料隔離
- 其二,物理計劃的計算不能有副作用
只有這樣,我們的快取才是有效的、正確無誤的。另外,在表的schema發生改變的時候,我們還需要讓所快取的相關物理計劃失效。
優化點三:新增邏輯計劃優化規則
Catalyst中的優化器提供了可擴充套件的介面,使得我們可以自定義邏輯計劃優化的規則。Databricks在Spark Summit上做過一個題為A Deep Dive into Spark SQL's Catalyst Optimizer的講座,其中有細節的介紹。
具體的介面如下:
spark.experimental.extraStrategies = CustomizedExtraStrategy :: Nil
複製程式碼
我們利用這個介面,針對我們的業務資料,專門定製了一系列額外的優化規則,極大地提升了引擎的效能。
Enzyme的未來
- 開源
- 做更多針對小資料集的優化,進一步改善效能
- 基於Enzyme,做一些上層生態的擴充套件
對於第三點,我們想做的實際上是讓Enzyme和其他生態更好地結合。比如如何將Enzyme運用到Spark Streaming或者Flink Streaming中,如何在Spring Boot中更加方便地使用Enzyme,如何在機器學習中使用Enzyme。
參考資料
- Spear: A playground for experimenting ideas that may apply to Spark SQL/Catalyst
- Scala Benchmark Starter
- 《Spark SQL核心剖析》
- 《高效能Scala》
作者簡介
忍冬,挖財資料研發工程師,負責Spark SQL在挖財的落地,自研了相容Spark SQL適用於單機小資料集的Enzyme SQL引擎。譯有《Scala實用指南》,業餘時間是GNU TeXmacs專案的維護者之一。