Flink從入門到真香(17、Flink 重磅功能 Table API(Flink SQL))
阿新 • • 發佈:2020-11-26
Flink對批處理和流處理,提供了統一的上層API
Table API是一套內嵌在java和scala語言中的查詢api,它允許以非常直觀的方式組合來自一些關係運算符的查詢
Flink的sql支援基於實現了sql標準的Apache calcite
先來個栗子感受下:
demo效果: 在資料來源txt中讀取,輸出id和temperature 這2個欄位,按照id做篩選,輸出,分別用table api和sql來實現
在pom.xml中加入依賴
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.10.1</version> </dependency> <!-- 也可以不用引入下面的包,因為上面已經包含了--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>1.10.1</version> </dependency>
在tabletest包下建一個Example object:
package com.mafei.apitest.tabletest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.Table import org.apache.flink.table.api.scala._ object Example { def main(args: Array[String]): Unit = { //建立執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.setAutoWatermarkInterval(200) //直接全域性設定watermark的時間為200毫秒 val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") env.setParallelism(1) //先轉換成樣例類型別 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割資料,獲取結果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個感測器類的資料,引數中傳toLong和toDouble是因為預設分割後是字串類別 }) //首先建立表執行環境 val tableEnv = StreamTableEnvironment.create(env) //基於流建立一張表 val dataTable: Table = tableEnv.fromDataStream(dataStream) //呼叫table api進行轉換 val resultTable = dataTable .select("id, temperature") .filter("id == 'sensor3'") resultTable.toAppendStream[(String,Double)].print("result") //第二種,直接寫sql來實現 tableEnv.createTemporaryView("table1", dataTable) val sql: String = "select id, temperature from table1 where id='sensor1'" val resultSqlTable = tableEnv.sqlQuery(sql) resultSqlTable.toAppendStream[(String, Double)].print("result sql") env.execute("table api example") } }
程式碼結構及執行效果:
看到效果之後再來分析結構:
Table API和SQL的程式結構,與流式處理的程式結構十分類似
//建立表執行環境 val tableEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment) //建立一張表,用於讀取資料 tableEnv.connect(....).createTemporayTable("inputTable") //註冊一張表,用於把計算結果輸出 tableEnv.connect(....).createTemporaryTable("outputTable") //通過Table API查詢運算元,得到一張結果表 val result = tableEnv.from("inputTable").select() //通過sql查詢語句,得到一張表 val sqlResult = tableEnv.sqlQuery("select id, temperature from table1 where id='sensor1'") //將結果表寫入到輸出表中 result.insertInto("outputTable")
幾種引擎實現方式
Flink SQL有好幾種實現方式,其中blink 是阿里內部使用後來開源合併到flink的引擎,來看看幾種使用方式
/**
*
* @author mafei
* @date 2020/11/22
*/
package com.mafei.apitest.tabletest
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.scala._
object TableApi1 {
def main(args: Array[String]): Unit = {
//1 、建立環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tableEnv = StreamTableEnvironment.create(env)
//1,1 基於老版本的planner的流處理
val settings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
val oldStreamTableEnv = StreamTableEnvironment.create(env, settings)
//1.2 基於老版本的批處理環境
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val oldBatchTableEnv = BatchTableEnvironment.create(batchEnv)
//1.3基於blink planner的流處理
val blinkStreamSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings)
//基於blink planner的批處理
val blinkBatchSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build()
val blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings)
}
}