1. 程式人生 > >Flink sql的實現

Flink sql的實現

SQL Impl in Flink

跟了下Flink Table裡sql的實現,flink sql的實現比較簡單,一句話概述就是:藉助Apache Calcite做了sql解析、邏輯樹生成的過程,得到Calcite的RelRoot類,生成flink的TableTable裡的執行計劃會轉化成DataSet的計算,經歷物理執行計劃優化等步驟。

類比Spark SQL,Calcite代替了大部分Spark SQL Catalyst的工作(Catalyst還包括了Tree/Node的定義,這部分程式碼Flink也’借鑑’來了)。兩者最終是計算一顆邏輯執行計劃樹,翻譯成各自的DataSet(Spark 2.0引入Dataset

並統一DataFrame,隱藏RDD到引擎內部這層,類似於執行層內部的物理執行節點)。

Calcite Usage

最新Flink程式碼裡,在flink-table工程裡,使用1.7版本的calcite-core

大致的執行過程如下:

  1. TableEnvironment.sql()為呼叫入口
  2. 類似Calcite的PlannerImpl,flink實現了個FlinkPlannerImpl,執行parse(sql)validate(sqlNode)rel(sqlNode)操作
  3. 生成Table
  override def sql(query: String): Table = {

    val
planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner) // parse the sql query val parsed = planner.parse(query) // validate the sql query val validated = planner.validate(parsed) // transform to a relational tree val relational = planner.rel(validated) new Table(this, LogicalRelNode(relational.rel)) }

LogicalRelNode是flink執行計算樹裡的葉子節點。其他節點的實現類最終都會轉化成Calcite的RelBuilder生成一個可被Calcite繼續執行計劃優化的plan,邏輯在TableEnv的translate(table)方法裡。

 protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {

    val relNode = table.getRelNode

    // decorrelate
    val decorPlan = RelDecorrelator.decorrelateQuery(relNode)

    // optimize the logical Flink plan
    val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
    val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()

    val dataSetPlan = try {
      optProgram.run(getPlanner, decorPlan, flinkOutputProps)
    }
    catch {
      // ...
    }

    dataSetPlan match {
      case node: DataSetRel =>
        node.translateToPlan(
          this,
          Some(tpe.asInstanceOf[TypeInformation[Any]])
        ).asInstanceOf[DataSet[A]]
      case _ => ???
    }
  }

Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)裡,flink根據Calcite的介面定義了幾個將最終物理計劃轉化為DataSet的Rule:

    // translate to Flink DataSet nodes
    DataSetAggregateRule.INSTANCE,
    DataSetCalcRule.INSTANCE,
    DataSetJoinRule.INSTANCE,
    DataSetScanRule.INSTANCE,
    DataSetUnionRule.INSTANCE,
    DataSetSortRule.INSTANCE,
    DataSetValuesRule.INSTANCE,
    BatchTableSourceScanRule.INSTANCE

每條規則會對應生成一個物理節點,org.apache.flink.api.table.plan.nodes.dataset package下。節點內,根據Calcite生成的sql的執行步驟,會進行codegen出DataSet的執行Function程式碼,在org.apache.flink.api.table.runtime package下,目前生成三種ds操作: FlatMapRunnerFlatJoinRunner, 和MapRunner.

codegen部分與Spark SQL的結構相類似。

Calcite在Flink中的使用也比較基本,單測sql package下的case就可以走通上面的呼叫過程。

整體Flink sql上的功能和實現要比Spark SQL簡單很多。並可能存在許多借鑑之處。

相關推薦

Flink sql實現

SQL Impl in Flink 跟了下Flink Table裡sql的實現,flink sql的實現比較簡單,一句話概述就是:藉助Apache Calcite做了sql解析、邏輯樹生成的過程,得到Calcite的RelRoot類,生成flink的Table

技本功丨用短平快的方式告訴你:Flink-SQL的擴展實現

mps flink 加載 ast 正則表達 快的 網絡 叠代 參數 2019年1月28日,阿裏雲宣布開源“計算王牌”實時計算平臺Blink回饋給ApacheFlink社區。官方稱,計算延遲已經降到毫秒級,也就是你在瀏覽網頁的時候,眨了一下眼睛,淘寶、

sql 實現用戶名、郵箱、手機號登錄

append nes class select logs lec email mobile () StringBuilder strSql = new StringBuilder(); strSql.Append("se

SQL實現小算法(輔助決策)_ 計算商品評分、及時補貨

mysql分別把 計算各自的 1、點擊量/點擊量均值 2、銷售量/銷售量均值 兩者相加,可以得到一個簡單評分 又有問題了,豬肉的評分不應該比五花肉多。 因此我們要加入簡單的權重,譬如點擊量評分占30%。銷售量評分占70%select p_type,p_name, (p_view/view_avg)

Oracle使用SQL實現矩陣轉置

row 多人 遇到 數據 number decode 分享 展示 mode 在使用數據庫使用報表時,往往會遇到矩陣轉置。這個需求在Excel是很容易實現的,但很多人都不知道怎麽用Oracle數據庫實現,下面給大家展示幾種使用SQL實現的方法。 需求:表1轉置成表2 測試數

mybatis復雜sql實現

format 參數 sep date類型 base batis pos eat ring 1、like語句 示例 @Select("select * from t_xx where Fxx =‘A‘ and Fdd like concat(‘%‘,#{dd},‘%‘)"

Flink+kafka實現Wordcount實時計算

lis AS -c 安裝包 pos localhost 行動 private 配置信息 1. Flink Flink介紹: Flink 是一個針對流數據和批數據的分布式處理引擎。它主要是由 Java 代碼實現。目前主要還是依靠開源社區的貢獻而發展。對 Flink 而言,其所

動態sql實現分頁查詢

類對象 span ecp IT clas listitem 結果 創建 sku 1.創建實體類對象需要查詢的條件com.rl.ecps.model.QueryCondition   private Long brandId;   private Short auditSta

oracle 用sql實現密碼的加密,解密

bsp rom oracl div ora 解密 varchar2 base decode select utl_raw.cast_to_varchar2(utl_encode.base64_encode(utl_raw.cast_to_raw(‘123456‘))) f

Mybatis之攔截器--獲取執行SQL實現多客戶端數據同步

gin sign factor 方便 完成後 動態代理 exc batis 包安裝 最近的一個項目是將J2EE環境打包安裝在客戶端(使用 nwjs + NSIS 制作安裝包)運行, 所有的業務操作在客戶端完成, 數據存儲在客戶端數據庫中. 服務器端數據庫匯總各客戶端的數據進

SQL實現沒有這條資料就新增,有這條資料就修改

方法一:insert into on duplicate key update  舉個例子,欄位a被定義為UNIQUE,並且原資料庫表table中已存在記錄(2,2,9)和(3,2,1),如果插入記錄的a值與原有記錄重複,則更新原有記錄,否則插入新行: INSERT INTO

織夢用dede:sql實現列表頁分頁教程方法

將dede:list標籤進行改造,使用SQL標籤實現靜態分頁,在自定義表單呼叫的分頁用他就很方便 例如會員列表的模板標籤寫法 {dede:listsql sql="select * from myblog_member" pagesize="10"} <li><a href="https

用JAVA連線SQL實現查詢資料

顯示所有學生程式碼 <%@ page language="java" contentType="text/html; charset=UTF-8" import="java.sql.*" pageEncoding="UTF-8"%> <!DOCTYPE html>

用JAVA連線SQL實現更新資料

在程式碼中更新資料 <%@ page language="java" contentType="text/html; charset=UTF-8" import="java.sql.*" pageEncoding="UTF-8"%> <!DOCTYPE html>

用JAVA連線SQL實現插入資料

         直接由程式碼來決定插入的資料。 <%@ page language="java" contentType="text/html; charset=UTF-8" import="java.sql

用JAVA連線SQL實現刪除資料

刪除一條資料 <%@ page language="java" contentType="text/html; charset=UTF-8" import="java.sql.*" pageEncoding="UTF-8"%> <!DOCTYPE html> &l

sql實現分組查詢

DROP TABLE IF EXISTS testor;CREATE TABLE testor (id int(11) NOT NULL,name varchar(255) DEFAULT NULL,crdate datetime DEF

踩坑經歷(九)一條雙層迴圈的SQL實現業務需求

業務場景 類目 背景 資料特點 表沒有唯一主鍵,相同id可能有很多條 需求 取每條資料記錄的最新記錄 SQL實現 (1

年薪50萬前阿里工程師分享如何構建flink sql平臺

我們都知道,離線計算有Hive,使用過的知道,需要先定義一個schema,比如針對HDFS這種儲存對標mysql定義一個schema,schema的本質是什麼?主要描述下面這些資訊 1)當前儲存的物理位置的描述 2)資料格式的組成形式 然後Hive可以讓使用者定義一段sql,針對上面定義

如何構建一個flink sql平臺

在本系列前面的文章中,簡單介紹了一下Ignite的機器學習網格,下面會趁熱打鐵,結合一些示例,深入介紹Ignite支援的一些機器學習演算法。 如果要找合適的資料集,會發現可用的有很多,但是對於線性迴歸來說,一個非常好的備選資料集就是房價,可以非常方便地從UCI網站獲取合適的資料。 在本文中會訓練一個線性迴