1. 程式人生 > 實用技巧 >Flink基礎(十六):Table API 和 Flink SQL(五)函式(Functions)

Flink基礎(十六):Table API 和 Flink SQL(五)函式(Functions)

1 系統內建函式

  Flink Table API 和 SQL 為使用者提供了一組用於資料轉換的內建函式。SQL 中支援的很多函式,Table API 和 SQL 都已經做了實現,其它還在快速開發擴充套件中。   以下是一些典型函式的舉例,全部的內建函式,可以參考官網介紹。 ⚫ 比較函式 SQL: value1 = value2 value1 > value2 Table API: ANY1 === ANY2 ANY1 > ANY2 ⚫ 邏輯函式 SQL: boolean1 OR boolean2 boolean IS FALSE NOT boolean Table API: BOOLEAN1 || BOOLEAN2 BOOLEAN.isFalse !BOOLEAN ⚫ 算術函式 SQL: numeric1 + numeric2POWER(numeric1, numeric2) Table API: NUMERIC1 + NUMERIC2 NUMERIC1.power(NUMERIC2) ⚫ 字串函式 SQL: string1 || string2 UPPER(string) CHAR_LENGTH(string) Table API: STRING1 + STRING2 STRING.upperCase() STRING.charLength() ⚫ 時間函式 SQL: DATE string TIMESTAMP string CURRENT_TIME INTERVAL string range Table API: STRING.toDate STRING.toTimestamp currentTime() NUMERIC.days NUMERIC.minutes ⚫ 聚合函式 SQL: COUNT(*) SUM([ ALL | DISTINCT ] expression) RANK()ROW_NUMBER() Table API: FIELD.count FIELD.sum0

2 UDF

  使用者定義函式(User-defined Functions,UDF)是一個重要的特性,因為它們顯著地擴充套件了查詢(Query)的表達能力。一些系統內建函式無法解決的需求,我們可以用 UDF 來自 定義實現。 2.1 註冊使用者自定義函式 UDF   在大多數情況下,使用者定義的函式必須先註冊,然後才能在查詢中使用。不需要專門為Scala 的 Table API 註冊函式。   函式通過呼叫 registerFunction()方法在 TableEnvironment 中註冊。當用戶定義的函式被註冊時,它被插入到 TableEnvironment 的函式目錄中,這樣 Table API 或 SQL 解析器就可 以識別並正確地解釋它。 2.2 標量函式(Scalar Functions)   使用者定義的標量函式,可以將 0、1 或多個標量值,對映到新的標量值。   為了定義標量函式,必須在 org.apache.flink.table.functions 中擴充套件基類 Scalar Function,並實現(一個或多個)求值(evaluation,eval)方法。標量函式的行為由求值方法決定, 求值方法必須公開宣告並命名為 eval(直接 def 宣告,沒有 override)。求值方法的引數型別和返回型別,確定了標量函式的引數和返回型別。   在下面的程式碼中,我們定義自己的 HashCode 函式,在 TableEnvironment 中註冊它,並在查詢中呼叫它。
//
自定義一個標量函式 class HashCode( factor: Int ) extends ScalarFunction { def eval( s: String ): Int = { s.hashCode * factor } }
  主函式中呼叫,計算 sensor id 的雜湊值(前面部分照抄,流環境、表環境、讀取 source、建表):
def main(args: Array[String]): Unit = {
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setParallelism(
1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings .newInstance() .useOldPlanner() .inStreamingMode() .build() val tableEnv = StreamTableEnvironment.create( env, settings ) // 定義好 DataStream val inputStream: DataStream[String] = env.readTextFile("..\\sensor.txt") val dataStream: DataStream[SensorReading] = inputStream .map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) .assignAscendingTimestamps(_.timestamp * 1000L) // 將 DataStream 轉換為 Table,並指定時間欄位 val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'timestamp.rowtime, 'temperature) // Table API 中使用 val hashCode = new HashCode(10) val resultTable = sensorTable .select( 'id, hashCode('id) ) // SQL 中使用 tableEnv.createTemporaryView("sensor", sensorTable) tableEnv.registerFunction("hashCode", hashCode) val resultSqlTable = tableEnv.sqlQuery("select id, hashCode(id) from sensor") // 轉換成流,列印輸出 resultTable.toAppendStream[Row].print("table") resultSqlTable.toAppendStream[Row].print("sql") env.execute() }
2.3 表函式(Table Functions)   與使用者定義的標量函式類似,使用者定義的表函式,可以將 0、1 或多個標量值作為輸入引數;與標量函式不同的是,它可以返回任意數量的行作為輸出,而不是單個值。   為了定義一個表函式,必須擴充套件 org.apache.flink.table.functions 中的基類 TableFunction並實現(一個或多個)求值方法。表函式的行為由其求值方法決定,求值方法必須是 public 的,並命名為 eval。求值方法的引數型別,決定表函式的所有有效引數。   返回表的型別由 TableFunction 的泛型型別確定。求值方法使用 protected collect(T)方法發出輸出行。在 Table API 中,Table 函式需要與.joinLateral 或.leftOuterJoinLateral 一起使用。   joinLateral 運算元,會將外部表中的每一行,與表函式(TableFunction,運算元的引數是它的表示式)計算得到的所有行連線起來。   而 leftOuterJoinLateral 運算元,則是左外連線,它同樣會將外部表中的每一行與表函式計算生成的所有行連線起來;並且,對於表函式返回的是空表的外部行,也要保留下來。 在 SQL 中,則需要使用 Lateral Table(<TableFunction>),或者帶有 ON TRUE 條件的左連線。   下面的程式碼中,我們將定義一個表函式,在表環境中註冊它,並在查詢中呼叫它。   自定義 TableFunction:
// 自定義 TableFunction
class Split(separator: String) extends TableFunction[(String, Int)]{
 def eval(str: String): Unit = {
 str.split(separator).foreach(
 word => collect((word, word.length))
 )
 } }
接下來,就是在程式碼中呼叫。首先是 Table API 的方式:
// Table API 中呼叫,需要用 joinLateral
 val resultTable = sensorTable
 .joinLateral(split('id) as ('word, 'length)) // as 對輸出行的欄位重新命名
 .select('id, 'word, 'length)
 
// 或者用 leftOuterJoinLateral
 val resultTable2 = sensorTable
 .leftOuterJoinLateral(split('id) as ('word, 'length))
 .select('id, 'word, 'length)
 
 // 轉換成流列印輸出
 resultTable.toAppendStream[Row].print("1")
 resultTable2.toAppendStream[Row].print("2")
然後是 SQL 的方式:
tableEnv.createTemporaryView("sensor", sensorTable)
 tableEnv.registerFunction("split", split)
 
 val resultSqlTable = tableEnv.sqlQuery(
 """
 |select id, word, length
 |from
 |sensor, LATERAL TABLE(split(id)) AS newsensor(word, length)
 """.stripMargin)
 // 或者用左連線的方式
 val resultSqlTable2 = tableEnv.sqlQuery(
 """
 |SELECT id, word, length
 |FROM
 |sensor
| LEFT JOIN
| LATERAL TABLE(split(id)) AS newsensor(word, length) 
| ON TRUE
 """.stripMargin
 )
 // 轉換成流列印輸出
 resultSqlTable.toAppendStream[Row].print("1")
 resultSqlTable2.toAppendStream[Row].print("2")
2.4 聚合函式(Aggregate Functions)   使用者自定義聚合函式(User-Defined Aggregate Functions,UDAGGs)可以把一個表中的資料,聚合成一個標量值。使用者定義的聚合函式,是通過繼承 AggregateFunction 抽象類實 現的。

  上圖中顯示了一個聚合的例子。   假設現在有一張表,包含了各種飲料的資料。該表由三列(id、name 和 price)、五行組成資料。現在我們需要找到表中所有飲料的最高價格,即執行 max()聚合,結果將是一 個數值。   AggregateFunction 的工作原理如下。   ⚫ 首先,它需要一個累加器,用來儲存聚合中間結果的資料結構(狀態)。可以通過     呼叫 AggregateFunction 的 createAccumulator()方法建立空累加器。   ⚫ 隨後,對每個輸入行呼叫函式的 accumulate()方法來更新累加器。   ⚫ 處理完所有行後,將呼叫函式的 getValue()方法來計算並返回最終結果。     AggregationFunction 要求必須實現的方法:   ⚫ createAccumulator()   ⚫ accumulate()   ⚫ getValue()   除了上述方法之外,還有一些可選擇實現的方法。其中一些方法,可以讓系統執行查詢更有效率,而另一些方法,對於某些場景是必需的。例如,如果聚合函式應用在會話視窗(session group window)的上下文中,則 merge()方法是必需的。   ⚫ retract()   ⚫ merge()   ⚫ resetAccumulator() 接下來我們寫一個自定義 AggregateFunction,計算一下每個 sensor 的平均溫度值。
// 定義 AggregateFunction 的 Accumulator
class AvgTempAcc {
 var sum: Double = 0.0
 var count: Int = 0 }
class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {
 override def getValue(accumulator: AvgTempAcc): Double =
 accumulator.sum / accumulator.count
 override def createAccumulator(): AvgTempAcc = new AvgTempAcc
 def accumulate(accumulator: AvgTempAcc, temp: Double): Unit ={
 accumulator.sum += temp
 accumulator.count += 1
 } }
接下來就可以在程式碼中呼叫了。
// 建立一個聚合函式例項
val avgTemp = new AvgTemp()
// Table API 的呼叫
val resultTable = sensorTable.groupBy('id)
 .aggregate(avgTemp('temperature) as 'avgTemp)
 .select('id, 'avgTemp)
// SQL 的實現
tableEnv.createTemporaryView("sensor", sensorTable)
tableEnv.registerFunction("avgTemp", avgTemp)
val resultSqlTable = tableEnv.sqlQuery(
 """
 |SELECT
 |id, avgTemp(temperature)
 |FROM
 |sensor
 |GROUP BY id
 """.stripMargin)
// 轉換成流列印輸出
resultTable.toRetractStream[(String, Double)].print("agg temp")
resultSqlTable.toRetractStream[Row].print("agg temp sql")
2.5 表聚合函式(Table Aggregate Functions)   使用者定義的表聚合函式(User-Defined Table Aggregate Functions,UDTAGGs),可以把一個表中資料,聚合為具有多行和多列的結果表。這跟 AggregateFunction 非常類似,只是之 前聚合結果是一個標量值,現在變成了一張表。   比如現在我們需要找到表中所有飲料的前 2 個最高價格,即執行 top2()表聚合。我們需要檢查 5 行中的每一行,得到的結果將是一個具有排序後前 2 個值的表。   使用者定義的表聚合函式,是通過繼承 TableAggregateFunction 抽象類來實現的。   TableAggregateFunction 的工作原理如下。   ⚫ 首先,它同樣需要一個累加器(Accumulator),它是儲存聚合中間結果的資料結構。     通過呼叫 TableAggregateFunction 的 createAccumulator()方法可以建立空累加器。   ⚫ 隨後,對每個輸入行呼叫函式的 accumulate()方法來更新累加器。   ⚫ 處理完所有行後,將呼叫函式的 emitValue()方法來計算並返回最終結果。   AggregationFunction 要求必須實現的方法:   ⚫ createAccumulator()   ⚫ accumulate()   除了上述方法之外,還有一些可選擇實現的方法。   ⚫retract()   ⚫merge()   ⚫resetAccumulator()   ⚫emitValue()   ⚫emitUpdateWithRetract()   接下來我們寫一個自定義 TableAggregateFunction,用來提取每個 sensor 最高的兩個溫度值。
// 先定義一個 Accumulator
class Top2TempAcc{
 var highestTemp: Double = Int.MinValue
 var secondHighestTemp: Double = Int.MinValue
}
// 自定義 TableAggregateFunction
class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc]{
 
 override def createAccumulator(): Top2TempAcc = new Top2TempAcc
 def accumulate(acc: Top2TempAcc, temp: Double): Unit ={
 if( temp > acc.highestTemp ){
 acc.secondHighestTemp = acc.highestTemp
 acc.highestTemp = temp
 } else if( temp > acc.secondHighestTemp ){
 acc.secondHighestTemp = temp
 }
 }
 def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit ={
 out.collect(acc.highestTemp, 1)
 out.collect(acc.secondHighestTemp, 2)
 } }
接下來就可以在程式碼中呼叫了。
// 建立一個表聚合函式例項
val top2Temp = new Top2Temp()
// Table API 的呼叫
val resultTable = sensorTable.groupBy('id)
 .flatAggregate( top2Temp('temperature) as ('temp, 'rank) )
 .select('id, 'temp, 'rank)
// 轉換成流列印輸出
resultTable.toRetractStream[(String, Double, Int)].print("agg temp")
resultSqlTable.toRetractStream[Row].print("agg temp sql")