1. 程式人生 > >flink實戰開發----flinkSQL入門大全

flink實戰開發----flinkSQL入門大全

flinkSQL概念介紹

Table API & SQL

Apache Flink具有兩個關係API - 表API和SQL - 用於統一流和批處理。Table API是Scala和Java的語言整合查詢API,允許以非常直觀的方式組合來自關係運算符的查詢,Table API和SQL介面彼此緊密整合,以及Flink的DataStream和DataSet API。您可以輕鬆地在基於API構建的所有API和庫之間切換。例如,您可以使用CEP庫從DataStream中提取模式,然後使用Table API分析模式,或者可以在預處理上執行Gelly圖演算法之前使用SQL查詢掃描,過濾和聚合批處理表資料。

TableEnvironment

職責

TableEnvironment是Table API和SQL整合的核心概念。它負責:

  • 在內部目錄中註冊一個表
  • 註冊外部目錄
  • 執行SQL查詢
  • 註冊使用者定義的(標量,表或聚合)函式
  • 把一個DataStreamDataSet轉換為一個表Table
  • 持有對ExecutionEnvironmentStreamExecutionEnvironment的引用

建立一個TableEnvironment

一個TableEnvironment是通過呼叫靜態建立TableEnvironment.getTableEnvironment()

用的方法StreamExecutionEnvironmentExecutionEnvironment與可選的TableConfig。該TableConfig可用於配置TableEnvironment或定製查詢優化和翻譯過程

程式碼:

//獲取table
val tableEnv = TableEnvironment.getTableEnvironment(env)

TableEnvironment中登錄檔

TableEnvironment維護一個表的目錄,這些表是按名稱註冊的。有兩種型別的表、輸入表和輸出表。輸入表可以在表API和SQL查詢中引用,並提供輸入資料。輸出表可以用來將表API或SQL查詢的結果傳送到外部系統。可以從各種來源註冊輸入表:

  • 現有Table物件,通常是Table API或SQL查詢的結果。
  • TableSource,訪問外部資料,例如檔案,資料庫或訊息傳遞系統
  •  DataStreamDataSet來自DataStream或DataSet程式。註冊一個DataStreamDataSet

一個輸出表可以被註冊使用TableSink

程式碼:

val tableEnv = TableEnvironment.getTableEnvironment(env)

val projTable: Table = tableEnv.scan("X").select(...)
//登錄檔
tableEnv.registerTable("projectedTable", projTable)

註冊一個TableSink

一個已註冊的表可以用來將表API或SQL查詢的結果傳送到外部儲存系統,比如資料庫、鍵值儲存、訊息佇列或檔案系統(在不同的編碼中,例如CSV、Apache Parquet、Avro……)。

說白了就是:table sink的作用就是如何將flink sql查詢的資料儲存到外部系統,如hdfs或者本地檔案,資料庫,hbase等。

val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

SQL 語句

FlinkSQL,它實現了SQL標準。SQL查詢被指定為常規字串。

SQL文件描述了Flink對流式和批處理表的SQL支援。

主要包括:sqlQuery和sqlUpdate

sqlQuery:主要用於sql查詢

sqlUpdate:用於刪除,更新等操作

案例一:如何指定一個查詢並將結果作為一張表返回

val tableEnv = TableEnvironment.getTableEnvironment(env)

val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

案例二:指定一個更新查詢,該查詢將其結果插入到已登錄檔中

val tableEnv = TableEnvironment.getTableEnvironment(env)

tableEnv.sqlUpdate("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

flinkSQL執行計劃

表API和SQL查詢將轉換為DataStreamDataSet程式,具體取決於它們的輸入是流式還是批量輸入。查詢在內部表示為邏輯查詢計劃,並分為兩個階段:

  1. 優化邏輯計劃,
  2. 轉換為DataStream或DataSet程式。

table與DataStream和DataSet API整合

表API和SQL查詢可以輕鬆整合並嵌入到DataStreamDataSet程式中。例如,可以查詢外部表(例如來自RDBMS),進行一些預處理,例如過濾,預測,聚合或加入元資料,然後使用DataStream或進一步處理資料。相反,Table API或SQL查詢也可以應用於DataStream或DataSet程式的結果。這種相互作用可以通過將一個DataStream或DataSet轉換為一個Table來實現,反之亦然。

Scala的隱式轉換

scala表API功能的隱式轉換DataSetDataStream以及Table類。org.apache.flink.table.api.scala._除了org.apache.flink.api.scala._   Scala DataStream API 之外,還可以通過匯入包來啟用這些轉換

注:flink程式設計必須匯入import org.apache.flink.api.scala._,flinkSQL程式設計必須匯入import org.apache.flink.table.api._

將DataStream或DataSet轉換為表

我們可以通過TableEnvironment將獲得資料來源的DataStream或DataSet轉化成Table,在使用flinkSQL的時候這樣將會十分便捷。

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
Table table1 = tableEnv.fromDataStream(stream);
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

將Table轉換為DataStream或DataSet

一個Table可以轉換為DataStreamDataSet。通過這種方式,可以在Table API或SQL查詢的結果上執行自定義DataStream或DataSet程式。

當轉換一個TableDataStreamDataSet,需要指定將所得的資料型別DataStreamDataSet,即,資料型別到其中的行Table是要被轉換。通常最方便的轉換型別是Row。以下列表概述了不同選項的功能:

  • :欄位按位置,任意數量的欄位對映,支援null值,無型別安全訪問。
  • POJO:欄位按名稱對映(POJO欄位必須命名為Table欄位),任意數量的欄位,支援null值,型別安全訪問。
  • 樣例Case Class:欄位按位置對映,不支援null值,型別安全訪問。
  • 元組:欄位按位置對映,限制為22(Scala)或25(Java)欄位,不支援null值,型別安全訪問。
  • 原子型別Table必須具有單個欄位,不支援null值,型別安全訪問。

將錶轉換為DataStream

一個Table是流媒體查詢的結果將動態更新,即它正在改變,因為新記錄的查詢的輸入流到達。因此,DataStream轉換這種動態查詢需要對錶的更新進行編碼。

一個Table轉換為一個DataStream有兩種模式:

  1. 追加模式:只有在動態Table僅通過INSERT更改修改時才能使用此模式,即它僅附加並且以前發出的結果永遠不會更新。
  2. 縮排模式:始終可以使用此模式。它用標誌編碼INSERTDELETE改變boolean
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple = 
  tableEnv.toAppendStream[(String, Int)](table)

val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

將錶轉換為DataSet

val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

flinkSQL語法

SQL查詢是使用sqlQuery()方法指定的TableEnvironment。該方法返回SQL查詢的結果為Table。A Table可以在後續的SQL和Table API查詢中使用,可以轉換為DataSet或DataStream,也可以寫入TableSink)。要訪問SQL查詢中的表,必須在TableEnvironment中註冊它。可以從TableSourceTableDataStream或DataSet 登錄檔。或者,使用者還可以在TableEnvironment中註冊外部目錄以指定資料來源的位置。

注意: Flink的SQL支援尚未完成。包含不受支援的SQL功能的查詢會導致a TableException。以下部分列出了批處理和流表上SQL的受支援功能。

支援的語法

支援標準的ANSI SQL。Flink不支援DDL語句。

入門案例

需求:

使用flinkSQL,獲取文字中的使用者的姓名

資料準備

建立一個person.txt,內容如下:

kebe men
wede men
baby wemen
james men

程式碼

package flinkSQL
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import scala.language.postfixOps
/**
  * Created by  ${WangZhiHua} on 2018/11/12
  */
object sql_test {
  def main(args: Array[String]): Unit = {
    //獲取執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //獲取table
      val tableEnv = TableEnvironment.getTableEnvironment(env)
    //讀取資料來源
    val source1 = env.readTextFile("C:/flink_data/person.txt")
    val source2: DataStream[Person1] = source1.map(x=>{
      val split = x.split(" ")
      ( Person1(split(0),split(1)))
    })
    //將DataStream轉化成Table
    val table1 = tableEnv.fromDataStream(source2)
    //登錄檔,表名為:person
    tableEnv.registerTable("person",table1)
    //獲取表中所有資訊
    val rs: Table = tableEnv.sqlQuery("select *  from person ")
    val stream: DataStream[String] = rs
    //過濾獲取name這一列的資料
      .select("name")
      //將錶轉化成DataStream
      .toAppendStream[String]
     stream.print()
    env.execute("flinkSQL")
  }
}

/**
  * 定義樣例類封裝資料
  */
case class  Person1(name:String ,score:String)