[原始碼分析] 帶你梳理 Flink SQL / Table API內部執行流程
阿新 • • 發佈:2020-04-25
# [原始碼分析] 帶你梳理 Flink SQL / Table API內部執行流程
[Toc]
## 0x00 摘要
本文將簡述Flink SQL / Table API的內部實現,為大家把 "從SQL語句到具體執行" 這個流程串起來。並且儘量多提供呼叫棧,這樣大家在遇到問題時就知道應該從什麼地方設定斷點,對整體架構理解也能更加深入。
SQL流程中涉及到幾個重要的節點舉例如下:
```java
// NOTE : 執行順序是從上至下, " -----> " 表示生成的例項型別
*
* +-----> "left outer JOIN" (SQL statement)
* |
* |
* SqlParser.parseQuery // SQL 解析階段,生成AST(抽象語法樹),作用是SQL–>SqlNode
* |
* |
* +-----> SqlJoin (SqlNode)
* |
* |
* SqlToRelConverter.convertQuery // 語義分析,生成邏輯計劃,作用是SqlNode–>RelNode
* |
* |
* +-----> LogicalProject (RelNode) // Abstract Syntax Tree,未優化的RelNode
* |
* |
* FlinkLogicalJoinConverter (RelOptRule) // Flink定製的優化rules
* VolcanoRuleCall.onMatch // 基於Flink定製的一些優化rules去優化 Logical Plan
* |
* |
* +-----> FlinkLogicalJoin (RelNode) // Optimized Logical Plan,邏輯執行計劃
* |
* |
* StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
* VolcanoRuleCall.onMatch // 基於Flink rules將optimized LogicalPlan轉成Flink物理執行計劃
* |
* |
* +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode,物理執行計劃
* |
* |
* StreamExecJoin.translateToPlanInternal // 作用是生成 StreamOperator, 即Flink運算元
* |
* |
* +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask
* |
* |
* StreamTwoInputProcessor.processRecord1// 在TwoInputStreamTask呼叫StreamingJoinOperator,真實的執行
* |
* |
```
後續我們會以這個圖為脈絡進行講解。
## 0x01 Apache Calcite
Flink Table API&SQL 為流式資料和靜態資料的關係查詢保留統一的介面,而且利用了Apache Calcite的查詢優化框架和SQL parser。
為什麼Flink要使用Table API呢?總結來說,關係型API的好處如下:
- 關係型API是宣告式的
- 查詢能夠被有效的優化
- 查詢可以高效的執行
- “Everybody” knows SQL
Calcite是這裡面的核心成員。Apache Calcite是面向Hadoop新的sql引擎,它提供了標準的SQL語言、多種查詢優化和連線各種資料來源的能力。
### 1. Calcite 概念
下面是 Calcite 概念梳理:
- **關係代數(Relational algebra)**:即關係表示式。它們通常以動詞命名,例如 Sort, Join, Project, Filter, Scan, Sample.
- **表示式有各種特徵(Trait)**:使用 Trait 的 satisfies() 方法來測試某個表示式是否符合某 Trait 或 Convention.
- **規則(Rules)**:用於將一個表示式轉換(Transform)為另一個表示式。它有一個由 RelOptRuleOperand 組成的列表來決定是否可將規則應用於樹的某部分。
- **規劃器(Planner)** :即請求優化器,它可以根據一系列規則和成本模型(例如基於成本的優化模型 VolcanoPlanner、啟發式優化模型 HepPlanner)來將一個表示式轉為語義等價(但效率更優)的另一個表示式。
- **RelNode** :代表了對資料的一個處理操作,常見的操作有 Sort、Join、Project、Filter、Scan 等。它蘊含的是對整個 Relation 的操作,而不是對具體資料的處理邏輯。RelNode 會標識其 input RelNode 資訊,這樣就構成了一棵 RelNode 樹。
- **RexNode** : 行表示式(標量表達式),蘊含的是對一行資料的處理邏輯。每個行表示式都有資料的型別。這是因為在 Valdiation 的過程中,編譯器會推匯出表示式的結果型別。常見的行表示式包括字面量 RexLiteral, 變數 RexVariable,函式或操作符呼叫 RexCall 等。RexNode 通過 RexBuilder 進行構建。
- **RelTrait** : 用來定義邏輯表的物理相關屬性(physical property),三種主要的 trait 型別是:Convention、RelCollation、RelDistribution;
### 2. Calcite 處理流程
Sql 的執行過程一般可以分為四個階段,Calcite 與這個很類似,但Calcite是分成五個階段 :
1. SQL 解析階段,生成AST(抽象語法樹)(SQL–>SqlNode)
2. SqlNode 驗證(SqlNode–>SqlNode)
3. 語義分析,生成邏輯計劃(Logical Plan)(SqlNode–>RelNode/RexNode)
4. 優化階段,按照相應的規則(Rule)進行優化(RelNode–>RelNode)
5. 生成ExecutionPlan,生成物理執行計劃(DataStream Plan)
## 0x02 Flink SQL綜述
### 1. Flink關係型API執行原理
Flink承載了 Table API 和 SQL API 兩套表達方式。它以Apache Calcite這個SQL解析器做SQL語義解析,統一生成為 Calcite Logical Plan(SqlNode 樹);隨後驗證;再利用 Calcite的優化器優化轉換規則和logical plan,根據資料來源的性質(流和批)使用不同的規則進行優化,優化為 RelNode 邏輯執行計劃樹;最終優化後的plan轉成常規的Flink DataSet 或 DataStream 程式。任何對於DataStream API和DataSet API的效能調優提升都能夠自動地提升Table API或者SQL查詢的效率。
### 2. Flink Sql 執行流程
一條stream sql從提交到calcite解析、優化最後到Flink引擎執行,一般分為以下幾個階段:
1. Sql Parser: 將sql語句通過java cc解析成AST(語法樹),在calcite中用SqlNode表示AST;
2. Sql Validator: 結合數字字典(catalog)去驗證sql語法;
3. 生成Logical Plan: 將sqlNode表示的AST轉換成LogicalPlan, 用relNode表示;
4. 生成 optimized LogicalPlan: 先基於calcite rules 去優化logical Plan, 再基於Flink定製的一些優化rules去優化logical Plan;
5. 生成Flink PhysicalPlan: 這裡也是基於Flink裡頭的rules,將optimized LogicalPlan轉成成Flink的物理執行計劃;
6. 將物理執行計劃轉成Flink ExecutionPlan: 就是呼叫相應的tanslateToPlan方法轉換和利用CodeGen超程式設計成Flink的各種運算元。
### 3. Flink Table Api 執行流程
而如果是通過table api來提交任務的話,也會經過calcite優化等階段,基本流程和直接執行sql類似:
1. table api parser: Flink會把table api表達的計算邏輯也表示成一顆樹,用treeNode去表式;
在這棵樹上的每個節點的計算邏輯用Expression來表示。
2. Validate: 會結合數字字典(catalog)將樹的每個節點的Unresolved Expression進行繫結,生成Resolved Expression;
3. 生成Logical Plan: 依次遍歷數的每個節點,呼叫construct方法將原先用treeNode表達的節點轉成成用calcite 內部的資料結構relNode 來表達。即生成了LogicalPlan, 用relNode表示;
4. 生成 optimized LogicalPlan: 先基於calcite rules 去優化logical Plan,
再基於Flink定製的一些優化rules去優化logical Plan;
5. 生成Flink PhysicalPlan: 這裡也是基於Flink裡頭的rules,將optimized LogicalPlan轉成成Flink的物理執行計劃;
6. 將物理執行計劃轉成Flink ExecutionPlan: 就是呼叫相應的tanslateToPlan方法轉換和利用CodeGen超程式設計成Flink的各種運算元。
### 4. Flink Table/SQL 執行流程的異同
可以看出來,Table API 與 SQL 在獲取 RelNode 之後是一樣的流程,只是獲取 RelNode 的方式有所區別:
- Table API :通過使用 RelBuilder來拿到RelNode(LogicalNode與Expression分別轉換成RelNode與RexNode);
- SQL :通過使用Planner。首先通過parse方法將使用者使用的SQL文字轉換成由SqlNode表示的parse tree。接著通過validate方法,使用元資訊來resolve欄位,確定型別,驗證有效性等等。最後通過rel方法將SqlNode轉換成RelNode;
## 0x03 Flink SQL 相關物件
### 1. TableEnvironment物件
TableEnvironment物件是Table API和SQL整合的一個核心,支援以下場景:
- 註冊一個Table。
- 將一個TableSource註冊給TableEnvironment,這裡的TableSource指的是將資料儲存系統的作為Table,例如mysql,hbase,CSV,Kakfa,RabbitMQ等等。
- 註冊一個外部的catalog,可以訪問外部系統的資料或檔案。
- 執行SQL查詢。
- 註冊一個使用者自定義的function。
- 將DataStream或DataSet轉成Table。
一個查詢中只能繫結一個指定的TableEnvironment,TableEnvironment可以通過來配置TableConfig來配置,通過TableConfig可以自定義查詢優化以及translation的程序。
TableEnvironment執行過程如下:
- TableEnvironment.sql()為呼叫入口;
- Flink實現了FlinkPlannerImpl,執行parse(sql),validate(sqlNode),rel(sqlNode)操作;
- 生成Table;
具體程式碼摘要如下
```java
package org.apache.Flink.table.api.internal;
@Internal
public class TableEnvironmentImpl implements TableEnvironment {
private final CatalogManager catalogManager;
private final ModuleManager moduleManager;
private final OperationTreeBuilder operationTreeBuilder;
private fi