flink實戰開發----flinkSQL入門大全
flinkSQL概念介紹
Table API & SQL
Apache Flink具有兩個關係API - 表API和SQL - 用於統一流和批處理。Table API是Scala和Java的語言整合查詢API,允許以非常直觀的方式組合來自關係運算符的查詢,Table API和SQL介面彼此緊密整合,以及Flink的DataStream和DataSet API。您可以輕鬆地在基於API構建的所有API和庫之間切換。例如,您可以使用CEP庫從DataStream中提取模式,然後使用Table API分析模式,或者可以在預處理上執行Gelly圖演算法之前使用SQL查詢掃描,過濾和聚合批處理表資料。
TableEnvironment
職責
這TableEnvironment是Table API和SQL整合的核心概念。它負責:
- 在內部目錄中註冊一個表
- 註冊外部目錄
- 執行SQL查詢
- 註冊使用者定義的(標量,表或聚合)函式
- 把一個DataStream或DataSet轉換為一個表Table
- 持有對ExecutionEnvironment或StreamExecutionEnvironment的引用
建立一個TableEnvironment
一個TableEnvironment是通過呼叫靜態建立TableEnvironment.getTableEnvironment()
程式碼:
//獲取table
val tableEnv = TableEnvironment.getTableEnvironment(env)
在TableEnvironment中登錄檔
TableEnvironment維護一個表的目錄,這些表是按名稱註冊的。有兩種型別的表、輸入表和輸出表。輸入表可以在表API和SQL查詢中引用,並提供輸入資料。輸出表可以用來將表API或SQL查詢的結果傳送到外部系統。可以從各種來源註冊輸入表:
- 現有Table物件,通常是Table API或SQL查詢的結果。
- TableSource,訪問外部資料,例如檔案,資料庫或訊息傳遞系統。
- DataStream或DataSet來自DataStream或DataSet程式。註冊一個DataStream或DataSet
一個輸出表可以被註冊使用TableSink
程式碼:
val tableEnv = TableEnvironment.getTableEnvironment(env)
val projTable: Table = tableEnv.scan("X").select(...)
//登錄檔
tableEnv.registerTable("projectedTable", projTable)
註冊一個TableSink
一個已註冊的表可以用來將表API或SQL查詢的結果傳送到外部儲存系統,比如資料庫、鍵值儲存、訊息佇列或檔案系統(在不同的編碼中,例如CSV、Apache Parquet、Avro……)。
說白了就是:table sink的作用就是如何將flink sql查詢的資料儲存到外部系統,如hdfs或者本地檔案,資料庫,hbase等。
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
SQL 語句
FlinkSQL,它實現了SQL標準。SQL查詢被指定為常規字串。
SQL文件描述了Flink對流式和批處理表的SQL支援。
主要包括:sqlQuery和sqlUpdate
sqlQuery:主要用於sql查詢
sqlUpdate:用於刪除,更新等操作
案例一:如何指定一個查詢並將結果作為一張表返回
val tableEnv = TableEnvironment.getTableEnvironment(env)
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
案例二:指定一個更新查詢,該查詢將其結果插入到已登錄檔中
val tableEnv = TableEnvironment.getTableEnvironment(env)
tableEnv.sqlUpdate("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
flinkSQL執行計劃
表API和SQL查詢將轉換為DataStream或DataSet程式,具體取決於它們的輸入是流式還是批量輸入。查詢在內部表示為邏輯查詢計劃,並分為兩個階段:
- 優化邏輯計劃,
- 轉換為DataStream或DataSet程式。
table與DataStream和DataSet API整合
表API和SQL查詢可以輕鬆整合並嵌入到DataStream和DataSet程式中。例如,可以查詢外部表(例如來自RDBMS),進行一些預處理,例如過濾,預測,聚合或加入元資料,然後使用DataStream或進一步處理資料。相反,Table API或SQL查詢也可以應用於DataStream或DataSet程式的結果。這種相互作用可以通過將一個DataStream或DataSet轉換為一個Table來實現,反之亦然。
Scala的隱式轉換
scala表API功能的隱式轉換DataSet,DataStream以及Table類。org.apache.flink.table.api.scala._除了org.apache.flink.api.scala._ Scala DataStream API 之外,還可以通過匯入包來啟用這些轉換
注:flink程式設計必須匯入import org.apache.flink.api.scala._,flinkSQL程式設計必須匯入import org.apache.flink.table.api._
將DataStream或DataSet轉換為表
我們可以通過TableEnvironment將獲得資料來源的DataStream或DataSet轉化成Table,在使用flinkSQL的時候這樣將會十分便捷。
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
Table table1 = tableEnv.fromDataStream(stream);
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
將Table轉換為DataStream或DataSet
一個Table可以轉換為DataStream或DataSet。通過這種方式,可以在Table API或SQL查詢的結果上執行自定義DataStream或DataSet程式。
當轉換一個Table成DataStream或DataSet,需要指定將所得的資料型別DataStream或DataSet,即,資料型別到其中的行Table是要被轉換。通常最方便的轉換型別是Row。以下列表概述了不同選項的功能:
- 行:欄位按位置,任意數量的欄位對映,支援null值,無型別安全訪問。
- POJO:欄位按名稱對映(POJO欄位必須命名為Table欄位),任意數量的欄位,支援null值,型別安全訪問。
- 樣例類Case Class:欄位按位置對映,不支援null值,型別安全訪問。
- 元組:欄位按位置對映,限制為22(Scala)或25(Java)欄位,不支援null值,型別安全訪問。
- 原子型別:Table必須具有單個欄位,不支援null值,型別安全訪問。
將錶轉換為DataStream
一個Table是流媒體查詢的結果將動態更新,即它正在改變,因為新記錄的查詢的輸入流到達。因此,DataStream轉換這種動態查詢需要對錶的更新進行編碼。
將一個Table轉換為一個DataStream有兩種模式:
- 追加模式:只有在動態Table僅通過INSERT更改修改時才能使用此模式,即它僅附加並且以前發出的結果永遠不會更新。
- 縮排模式:始終可以使用此模式。它用標誌編碼INSERT和DELETE改變boolean。
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
將錶轉換為DataSet
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
flinkSQL語法
SQL查詢是使用sqlQuery()方法指定的TableEnvironment。該方法返回SQL查詢的結果為Table。A Table可以在後續的SQL和Table API查詢中使用,可以轉換為DataSet或DataStream,也可以寫入TableSink)。要訪問SQL查詢中的表,必須在TableEnvironment中註冊它。可以從TableSource,Table,DataStream或DataSet 登錄檔。或者,使用者還可以在TableEnvironment中註冊外部目錄以指定資料來源的位置。
注意: Flink的SQL支援尚未完成。包含不受支援的SQL功能的查詢會導致a TableException。以下部分列出了批處理和流表上SQL的受支援功能。
支援的語法
支援標準的ANSI SQL。Flink不支援DDL語句。
入門案例
需求:
使用flinkSQL,獲取文字中的使用者的姓名
資料準備
建立一個person.txt,內容如下:
kebe men
wede men
baby wemen
james men
程式碼
package flinkSQL
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import scala.language.postfixOps
/**
* Created by ${WangZhiHua} on 2018/11/12
*/
object sql_test {
def main(args: Array[String]): Unit = {
//獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//獲取table
val tableEnv = TableEnvironment.getTableEnvironment(env)
//讀取資料來源
val source1 = env.readTextFile("C:/flink_data/person.txt")
val source2: DataStream[Person1] = source1.map(x=>{
val split = x.split(" ")
( Person1(split(0),split(1)))
})
//將DataStream轉化成Table
val table1 = tableEnv.fromDataStream(source2)
//登錄檔,表名為:person
tableEnv.registerTable("person",table1)
//獲取表中所有資訊
val rs: Table = tableEnv.sqlQuery("select * from person ")
val stream: DataStream[String] = rs
//過濾獲取name這一列的資料
.select("name")
//將錶轉化成DataStream
.toAppendStream[String]
stream.print()
env.execute("flinkSQL")
}
}
/**
* 定義樣例類封裝資料
*/
case class Person1(name:String ,score:String)