Flink Basics (33): Flink SQL (9): EXPLAIN Statements

An EXPLAIN statement explains the logical plan and the optimized plan of a query or an INSERT statement.

Run an EXPLAIN statement

An EXPLAIN statement can be executed with the executeSql() method of TableEnvironment, or in the SQL CLI. If the EXPLAIN operation succeeds, executeSql() returns the explain result; otherwise it throws an exception.
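The succeed-or-throw behaviour described above can be handled without letting the exception escape. The following is only a minimal sketch: SafeExplain and explainOrError are hypothetical names, and the explain parameter stands in for whatever call produces the plan (for example sql => tEnv.explainSql(sql) on an existing TableEnvironment), so the helper itself needs no Flink dependency.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of Flink.
object SafeExplain {

  /** Runs `explain` on `statement`; a thrown (non-fatal) exception
    * becomes a Left carrying its message, a returned plan becomes a Right.
    */
  def explainOrError(explain: String => String)(statement: String): Either[String, String] =
    Try(explain(statement)) match {
      case Success(plan) => Right(plan)
      case Failure(e)    => Left(s"EXPLAIN failed: ${e.getMessage}")
    }
}
```

With a real environment this might be called as SafeExplain.explainOrError(sql => tEnv.explainSql(sql))("SELECT word FROM MyTable1"), assuming tEnv is a TableEnvironment in scope.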

The following examples show how to run an EXPLAIN statement in TableEnvironment and in the SQL CLI.

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// register the tables "MyTable1" and "MyTable2"
// (`count` is a reserved keyword in Flink SQL, so it is escaped with backticks)
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH (...)")
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH (...)")

// explain SELECT statement through TableEnvironment.explainSql()
val explanation = tEnv.explainSql(
  "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
  "UNION ALL " +
  "SELECT `count`, word FROM MyTable2")
println(explanation)

// explain SELECT statement through TableEnvironment.executeSql()
val tableResult = tEnv.executeSql(
  "EXPLAIN PLAN FOR " +
  "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
  "UNION ALL " +
  "SELECT `count`, word FROM MyTable2")
tableResult.print()

Flink SQL> CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256));
[INFO] Table has been created.

Flink SQL> CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256));
[INFO] Table has been created.

Flink SQL> EXPLAIN PLAN FOR SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' 
> UNION ALL 
> SELECT `count`, word FROM MyTable2;

The result of executing the EXPLAIN statement is:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
  FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
  

== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
  TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 3 : Operator
        content : from: (count, word)
        ship_strategy : REBALANCE

        Stage 4 : Operator
            content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
            ship_strategy : FORWARD

            Stage 5 : Operator
                content : from: (count, word)
                ship_strategy : REBALANCE
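The result above is plain text divided by "== ... ==" header lines, so a single section (for example only the optimized logical plan) can be pulled out with ordinary string handling. The sketch below assumes the output keeps that header layout; ExplainSections and split are hypothetical names, not Flink APIs.

```scala
// Hypothetical helper, not part of Flink.
object ExplainSections {

  // Matches header lines such as "== Optimized Logical Plan ==".
  private val Header = """^==\s*(.+?)\s*==\s*$""".r

  /** Splits EXPLAIN output into (section title, section body) pairs,
    * in the order they appear. Blank lines are dropped, indentation is
    * kept, and any text before the first header is ignored.
    */
  def split(explain: String): Vector[(String, String)] =
    explain
      .split("\\r?\\n")
      .foldLeft(Vector.empty[(String, Vector[String])]) {
        case (acc, Header(name)) =>
          acc :+ (name -> Vector.empty[String]) // start a new section
        case (acc, line) if acc.nonEmpty =>
          val (name, lines) = acc.last
          acc.init :+ (name -> (lines :+ line)) // extend the current section
        case (acc, _) =>
          acc                                   // nothing to attach the line to
      }
      .map { case (name, lines) =>
        name -> lines.filter(_.trim.nonEmpty).mkString("\n")
      }
}
```

For instance, ExplainSections.split(explanation).toMap.get("Optimized Logical Plan") would return just that part of the plan, where explanation is the string returned by explainSql().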

Syntax

EXPLAIN PLAN FOR <query_statement_or_insert_statement>
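
Since the syntax is just a fixed prefix in front of a query or INSERT statement, the statement text can be assembled programmatically before it is handed to executeSql(). The following is a small sketch only; ExplainStatement and explainPlanFor are hypothetical names, and MySink in the usage note is a made-up sink table.

```scala
// Hypothetical helper, not part of Flink.
object ExplainStatement {

  /** Prefixes a query or INSERT statement with "EXPLAIN PLAN FOR",
    * trimming whitespace and one trailing semicolon first.
    */
  def explainPlanFor(statement: String): String = {
    val body = statement.trim.stripSuffix(";").trim
    require(body.nonEmpty, "statement must not be empty")
    s"EXPLAIN PLAN FOR $body"
  }
}
```

For an INSERT statement, ExplainStatement.explainPlanFor("INSERT INTO MySink SELECT `count`, word FROM MyTable1;") yields a string that starts with EXPLAIN PLAN FOR INSERT INTO MySink and carries no trailing semicolon.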