1. 程式人生 > >[原始碼分析] 帶你梳理 Flink SQL / Table API內部執行流程

[原始碼分析] 帶你梳理 Flink SQL / Table API內部執行流程

# [原始碼分析] 帶你梳理 Flink SQL / Table API內部執行流程 [Toc] ## 0x00 摘要 本文將簡述Flink SQL / Table API的內部實現,為大家把 "從SQL語句到具體執行" 這個流程串起來。並且儘量多提供呼叫棧,這樣大家在遇到問題時就知道應該從什麼地方設定斷點,對整體架構理解也能更加深入。 SQL流程中涉及到幾個重要的節點舉例如下: ```java // NOTE : 執行順序是從上至下, " -----> " 表示生成的例項型別 * * +-----> "left outer JOIN" (SQL statement) * | * | * SqlParser.parseQuery // SQL 解析階段,生成AST(抽象語法樹),作用是SQL–>SqlNode * | * | * +-----> SqlJoin (SqlNode) * | * | * SqlToRelConverter.convertQuery // 語義分析,生成邏輯計劃,作用是SqlNode–>RelNode * | * | * +-----> LogicalProject (RelNode) // Abstract Syntax Tree,未優化的RelNode * | * | * FlinkLogicalJoinConverter (RelOptRule) // Flink定製的優化rules * VolcanoRuleCall.onMatch // 基於Flink定製的一些優化rules去優化 Logical Plan * | * | * +-----> FlinkLogicalJoin (RelNode) // Optimized Logical Plan,邏輯執行計劃 * | * | * StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin * VolcanoRuleCall.onMatch // 基於Flink rules將optimized LogicalPlan轉成Flink物理執行計劃 * | * | * +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode,物理執行計劃 * | * | * StreamExecJoin.translateToPlanInternal // 作用是生成 StreamOperator, 即Flink運算元 * | * | * +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask * | * | * StreamTwoInputProcessor.processRecord1// 在TwoInputStreamTask呼叫StreamingJoinOperator,真實的執行 * | * | ``` 後續我們會以這個圖為脈絡進行講解。 ## 0x01 Apache Calcite Flink Table API&SQL 為流式資料和靜態資料的關係查詢保留統一的介面,而且利用了Apache Calcite的查詢優化框架和SQL parser。 為什麼Flink要使用Table API呢?總結來說,關係型API的好處如下: - 關係型API是宣告式的 - 查詢能夠被有效的優化 - 查詢可以高效的執行 - “Everybody” knows SQL Calcite是這裡面的核心成員。Apache Calcite是面向Hadoop新的sql引擎,它提供了標準的SQL語言、多種查詢優化和連線各種資料來源的能力。 ### 1. Calcite 概念 下面是 Calcite 概念梳理: - **關係代數(Relational algebra)**:即關係表示式。它們通常以動詞命名,例如 Sort, Join, Project, Filter, Scan, Sample. - **表示式有各種特徵(Trait)**:使用 Trait 的 satisfies() 方法來測試某個表示式是否符合某 Trait 或 Convention. - **規則(Rules)**:用於將一個表示式轉換(Transform)為另一個表示式。它有一個由 RelOptRuleOperand 組成的列表來決定是否可將規則應用於樹的某部分。 - **規劃器(Planner)** :即請求優化器,它可以根據一系列規則和成本模型(例如基於成本的優化模型 VolcanoPlanner、啟發式優化模型 HepPlanner)來將一個表示式轉為語義等價(但效率更優)的另一個表示式。 - **RelNode** :代表了對資料的一個處理操作,常見的操作有 Sort、Join、Project、Filter、Scan 等。它蘊含的是對整個 Relation 的操作,而不是對具體資料的處理邏輯。RelNode 會標識其 input RelNode 資訊,這樣就構成了一棵 RelNode 樹。 - **RexNode** : 行表示式(標量表達式),蘊含的是對一行資料的處理邏輯。每個行表示式都有資料的型別。這是因為在 Valdiation 的過程中,編譯器會推匯出表示式的結果型別。常見的行表示式包括字面量 RexLiteral, 變數 RexVariable,函式或操作符呼叫 RexCall 等。RexNode 通過 RexBuilder 進行構建。 - **RelTrait** : 用來定義邏輯表的物理相關屬性(physical property),三種主要的 trait 型別是:Convention、RelCollation、RelDistribution; ### 2. Calcite 處理流程 Sql 的執行過程一般可以分為四個階段,Calcite 與這個很類似,但Calcite是分成五個階段 : 1. SQL 解析階段,生成AST(抽象語法樹)(SQL–>SqlNode) 2. SqlNode 驗證(SqlNode–>SqlNode) 3. 語義分析,生成邏輯計劃(Logical Plan)(SqlNode–>RelNode/RexNode) 4. 優化階段,按照相應的規則(Rule)進行優化(RelNode–>RelNode) 5. 生成ExecutionPlan,生成物理執行計劃(DataStream Plan) ## 0x02 Flink SQL綜述 ### 1. Flink關係型API執行原理 Flink承載了 Table API 和 SQL API 兩套表達方式。它以Apache Calcite這個SQL解析器做SQL語義解析,統一生成為 Calcite Logical Plan(SqlNode 樹);隨後驗證;再利用 Calcite的優化器優化轉換規則和logical plan,根據資料來源的性質(流和批)使用不同的規則進行優化,優化為 RelNode 邏輯執行計劃樹;最終優化後的plan轉成常規的Flink DataSet 或 DataStream 程式。任何對於DataStream API和DataSet API的效能調優提升都能夠自動地提升Table API或者SQL查詢的效率。 ### 2. Flink Sql 執行流程 一條stream sql從提交到calcite解析、優化最後到Flink引擎執行,一般分為以下幾個階段: 1. Sql Parser: 將sql語句通過java cc解析成AST(語法樹),在calcite中用SqlNode表示AST; 2. Sql Validator: 結合數字字典(catalog)去驗證sql語法; 3. 生成Logical Plan: 將sqlNode表示的AST轉換成LogicalPlan, 用relNode表示; 4. 生成 optimized LogicalPlan: 先基於calcite rules 去優化logical Plan, 再基於Flink定製的一些優化rules去優化logical Plan; 5. 生成Flink PhysicalPlan: 這裡也是基於Flink裡頭的rules,將optimized LogicalPlan轉成成Flink的物理執行計劃; 6. 將物理執行計劃轉成Flink ExecutionPlan: 就是呼叫相應的tanslateToPlan方法轉換和利用CodeGen超程式設計成Flink的各種運算元。 ### 3. Flink Table Api 執行流程 而如果是通過table api來提交任務的話,也會經過calcite優化等階段,基本流程和直接執行sql類似: 1. table api parser: Flink會把table api表達的計算邏輯也表示成一顆樹,用treeNode去表式; 在這棵樹上的每個節點的計算邏輯用Expression來表示。 2. Validate: 會結合數字字典(catalog)將樹的每個節點的Unresolved Expression進行繫結,生成Resolved Expression; 3. 生成Logical Plan: 依次遍歷數的每個節點,呼叫construct方法將原先用treeNode表達的節點轉成成用calcite 內部的資料結構relNode 來表達。即生成了LogicalPlan, 用relNode表示; 4. 生成 optimized LogicalPlan: 先基於calcite rules 去優化logical Plan, 再基於Flink定製的一些優化rules去優化logical Plan; 5. 生成Flink PhysicalPlan: 這裡也是基於Flink裡頭的rules,將optimized LogicalPlan轉成成Flink的物理執行計劃; 6. 將物理執行計劃轉成Flink ExecutionPlan: 就是呼叫相應的tanslateToPlan方法轉換和利用CodeGen超程式設計成Flink的各種運算元。 ### 4. Flink Table/SQL 執行流程的異同 可以看出來,Table API 與 SQL 在獲取 RelNode 之後是一樣的流程,只是獲取 RelNode 的方式有所區別: - Table API :通過使用 RelBuilder來拿到RelNode(LogicalNode與Expression分別轉換成RelNode與RexNode); - SQL :通過使用Planner。首先通過parse方法將使用者使用的SQL文字轉換成由SqlNode表示的parse tree。接著通過validate方法,使用元資訊來resolve欄位,確定型別,驗證有效性等等。最後通過rel方法將SqlNode轉換成RelNode; ## 0x03 Flink SQL 相關物件 ### 1. TableEnvironment物件 TableEnvironment物件是Table API和SQL整合的一個核心,支援以下場景: - 註冊一個Table。 - 將一個TableSource註冊給TableEnvironment,這裡的TableSource指的是將資料儲存系統的作為Table,例如mysql,hbase,CSV,Kakfa,RabbitMQ等等。 - 註冊一個外部的catalog,可以訪問外部系統的資料或檔案。 - 執行SQL查詢。 - 註冊一個使用者自定義的function。 - 將DataStream或DataSet轉成Table。 一個查詢中只能繫結一個指定的TableEnvironment,TableEnvironment可以通過來配置TableConfig來配置,通過TableConfig可以自定義查詢優化以及translation的程序。 TableEnvironment執行過程如下: - TableEnvironment.sql()為呼叫入口; - Flink實現了FlinkPlannerImpl,執行parse(sql),validate(sqlNode),rel(sqlNode)操作; - 生成Table; 具體程式碼摘要如下 ```java package org.apache.Flink.table.api.internal; @Internal public class TableEnvironmentImpl implements TableEnvironment { private final CatalogManager catalogManager; private final ModuleManager moduleManager; private final OperationTreeBuilder operationTreeBuilder; private fi