Flink sql的實現
SQL Impl in Flink
跟了下Flink Table裡sql的實現,flink sql的實現比較簡單,一句話概述就是:藉助Apache Calcite做了sql解析、邏輯樹生成的過程,得到Calcite的RelRoot類,生成flink的Table,Table裡的執行計劃會轉化成DataSet的計算,經歷物理執行計劃優化等步驟。
類比Spark SQL,Calcite代替了大部分Spark SQL Catalyst的工作(Catalyst還包括了Tree/Node的定義,這部分程式碼Flink也’借鑑’來了)。兩者最終是計算一顆邏輯執行計劃樹,翻譯成各自的DataSet(Spark 2.0引入Dataset
Calcite Usage
最新Flink程式碼裡,在flink-table
工程裡,使用1.7版本的calcite-core
。
大致的執行過程如下:
- TableEnvironment.
sql()
為呼叫入口 - 類似Calcite的PlannerImpl,flink實現了個FlinkPlannerImpl,執行
parse(sql)
,validate(sqlNode)
,rel(sqlNode)
操作 - 生成Table
override def sql(query: String): Table = {
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner)
// parse the sql query
val parsed = planner.parse(query)
// validate the sql query
val validated = planner.validate(parsed)
// transform to a relational tree
val relational = planner.rel(validated)
new Table(this, LogicalRelNode(relational.rel))
}
LogicalRelNode是flink執行計算樹裡的葉子節點。其他節點的實現類最終都會轉化成Calcite的RelBuilder生成一個可被Calcite繼續執行計劃優化的plan,邏輯在TableEnv的translate(table)方法裡。
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
val relNode = table.getRelNode
// decorrelate
val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
// optimize the logical Flink plan
val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
val flinkOutputProps = relNode.getTraitSet.replace(DataSetConvention.INSTANCE).simplify()
val dataSetPlan = try {
optProgram.run(getPlanner, decorPlan, flinkOutputProps)
}
catch {
// ...
}
dataSetPlan match {
case node: DataSetRel =>
node.translateToPlan(
this,
Some(tpe.asInstanceOf[TypeInformation[Any]])
).asInstanceOf[DataSet[A]]
case _ => ???
}
}
在Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)裡,flink根據Calcite的介面定義了幾個將最終物理計劃轉化為DataSet的Rule:
// translate to Flink DataSet nodes
DataSetAggregateRule.INSTANCE,
DataSetCalcRule.INSTANCE,
DataSetJoinRule.INSTANCE,
DataSetScanRule.INSTANCE,
DataSetUnionRule.INSTANCE,
DataSetSortRule.INSTANCE,
DataSetValuesRule.INSTANCE,
BatchTableSourceScanRule.INSTANCE
每條規則會對應生成一個物理節點,org.apache.flink.api.table.plan.nodes.dataset
package下。節點內,根據Calcite生成的sql的執行步驟,會進行codegen出DataSet的執行Function程式碼,在org.apache.flink.api.table.runtime
package下,目前生成三種ds操作: FlatMapRunner, FlatJoinRunner, 和MapRunner.
codegen部分與Spark SQL的結構相類似。
Calcite在Flink中的使用也比較基本,單測sql
package下的case就可以走通上面的呼叫過程。
整體Flink sql上的功能和實現要比Spark SQL簡單很多。並可能存在許多借鑑之處。
相關推薦
Flink sql的實現
SQL Impl in Flink 跟了下Flink Table裡sql的實現,flink sql的實現比較簡單,一句話概述就是:藉助Apache Calcite做了sql解析、邏輯樹生成的過程,得到Calcite的RelRoot類,生成flink的Table
技本功丨用短平快的方式告訴你:Flink-SQL的擴展實現
mps flink 加載 ast 正則表達 快的 網絡 叠代 參數 2019年1月28日,阿裏雲宣布開源“計算王牌”實時計算平臺Blink回饋給ApacheFlink社區。官方稱,計算延遲已經降到毫秒級,也就是你在瀏覽網頁的時候,眨了一下眼睛,淘寶、
sql 實現用戶名、郵箱、手機號登錄
append nes class select logs lec email mobile () StringBuilder strSql = new StringBuilder(); strSql.Append("se
純SQL實現小算法(輔助決策)_ 計算商品評分、及時補貨
mysql分別把 計算各自的 1、點擊量/點擊量均值 2、銷售量/銷售量均值 兩者相加,可以得到一個簡單評分 又有問題了,豬肉的評分不應該比五花肉多。 因此我們要加入簡單的權重,譬如點擊量評分占30%。銷售量評分占70%select p_type,p_name, (p_view/view_avg)
Oracle使用SQL實現矩陣轉置
row 多人 遇到 數據 number decode 分享 展示 mode 在使用數據庫使用報表時,往往會遇到矩陣轉置。這個需求在Excel是很容易實現的,但很多人都不知道怎麽用Oracle數據庫實現,下面給大家展示幾種使用SQL實現的方法。 需求:表1轉置成表2 測試數
mybatis復雜sql實現
format 參數 sep date類型 base batis pos eat ring 1、like語句 示例 @Select("select * from t_xx where Fxx =‘A‘ and Fdd like concat(‘%‘,#{dd},‘%‘)"
Flink+kafka實現Wordcount實時計算
lis AS -c 安裝包 pos localhost 行動 private 配置信息 1. Flink Flink介紹: Flink 是一個針對流數據和批數據的分布式處理引擎。它主要是由 Java 代碼實現。目前主要還是依靠開源社區的貢獻而發展。對 Flink 而言,其所
動態sql實現分頁查詢
類對象 span ecp IT clas listitem 結果 創建 sku 1.創建實體類對象需要查詢的條件com.rl.ecps.model.QueryCondition private Long brandId; private Short auditSta
oracle 用sql實現密碼的加密,解密
bsp rom oracl div ora 解密 varchar2 base decode select utl_raw.cast_to_varchar2(utl_encode.base64_encode(utl_raw.cast_to_raw(‘123456‘))) f
Mybatis之攔截器--獲取執行SQL實現多客戶端數據同步
gin sign factor 方便 完成後 動態代理 exc batis 包安裝 最近的一個項目是將J2EE環境打包安裝在客戶端(使用 nwjs + NSIS 制作安裝包)運行, 所有的業務操作在客戶端完成, 數據存儲在客戶端數據庫中. 服務器端數據庫匯總各客戶端的數據進
SQL實現沒有這條資料就新增,有這條資料就修改
方法一:insert into on duplicate key update 舉個例子,欄位a被定義為UNIQUE,並且原資料庫表table中已存在記錄(2,2,9)和(3,2,1),如果插入記錄的a值與原有記錄重複,則更新原有記錄,否則插入新行: INSERT INTO
織夢用dede:sql實現列表頁分頁教程方法
將dede:list標籤進行改造,使用SQL標籤實現靜態分頁,在自定義表單呼叫的分頁用他就很方便 例如會員列表的模板標籤寫法 {dede:listsql sql="select * from myblog_member" pagesize="10"} <li><a href="https
用JAVA連線SQL實現查詢資料
顯示所有學生程式碼 <%@ page language="java" contentType="text/html; charset=UTF-8" import="java.sql.*" pageEncoding="UTF-8"%> <!DOCTYPE html>
用JAVA連線SQL實現更新資料
在程式碼中更新資料 <%@ page language="java" contentType="text/html; charset=UTF-8" import="java.sql.*" pageEncoding="UTF-8"%> <!DOCTYPE html>
用JAVA連線SQL實現插入資料
直接由程式碼來決定插入的資料。 <%@ page language="java" contentType="text/html; charset=UTF-8" import="java.sql
用JAVA連線SQL實現刪除資料
刪除一條資料 <%@ page language="java" contentType="text/html; charset=UTF-8" import="java.sql.*" pageEncoding="UTF-8"%> <!DOCTYPE html> &l
sql實現分組查詢
DROP TABLE IF EXISTS testor;CREATE TABLE testor (id int(11) NOT NULL,name varchar(255) DEFAULT NULL,crdate datetime DEF
踩坑經歷(九)一條雙層迴圈的SQL實現業務需求
業務場景 類目 背景 資料特點 表沒有唯一主鍵,相同id可能有很多條 需求 取每條資料記錄的最新記錄 SQL實現 (1
年薪50萬前阿里工程師分享如何構建flink sql平臺
我們都知道,離線計算有Hive,使用過的知道,需要先定義一個schema,比如針對HDFS這種儲存對標mysql定義一個schema,schema的本質是什麼?主要描述下面這些資訊 1)當前儲存的物理位置的描述 2)資料格式的組成形式 然後Hive可以讓使用者定義一段sql,針對上面定義
如何構建一個flink sql平臺
在本系列前面的文章中,簡單介紹了一下Ignite的機器學習網格,下面會趁熱打鐵,結合一些示例,深入介紹Ignite支援的一些機器學習演算法。 如果要找合適的資料集,會發現可用的有很多,但是對於線性迴歸來說,一個非常好的備選資料集就是房價,可以非常方便地從UCI網站獲取合適的資料。 在本文中會訓練一個線性迴