1. 程式人生 > 實用技巧 >Table API 和 Flink SQL—第五章 函式(Functions)

Table API 和 Flink SQL—第五章 函式(Functions)

Flink Table 和 SQL 內建了很多 SQL 中支援的函式;如果有無法滿足的需要,則可以實現使用者自定義的函式(UDF)來解決。

5.1 系統內建函式

Flink Table API 和 SQL 為使用者提供了一組用於資料轉換的內建函式。SQL 中支援的很多函式,Table API 和 SQL 都已經做了實現,其它還在快速開發擴充套件中。

以下是一些典型函式的舉例,全部的內建函式,可以參考官網介紹。

⚫	比較函式

SQL:

value1 = value2

value1 > value2

Table API:

ANY1 === ANY2

ANY1 > ANY2

⚫	邏輯函式

SQL:

boolean1 OR boolean2

boolean IS FALSE

NOT boolean

Table API:

BOOLEAN1 || BOOLEAN2

BOOLEAN.isFalse

!BOOLEAN

⚫	算術函式

SQL:

numeric1 + numeric2
 
POWER(numeric1, numeric2)

Table API:

NUMERIC1 + NUMERIC2

NUMERIC1.power(NUMERIC2)

⚫	字串函式

SQL:

string1 || string2

UPPER(string)

CHAR_LENGTH(string)

Table API:

STRING1 + STRING2

STRING.upperCase()

STRING.charLength()

⚫	時間函式

SQL:

DATE string

TIMESTAMP string

CURRENT_TIME

INTERVAL string range

Table API:

STRING.toDate

STRING.toTimestamp

currentTime()

NUMERIC.days

NUMERIC.minutes

⚫	聚合函式

SQL:

COUNT(*)

SUM([ ALL | DISTINCT ] expression)

RANK()
 
ROW_NUMBER()

Table API:

FIELD.count

FIELD.sum0	

5.1 UDF

使用者定義函式(User-defined Functions,UDF)是一個重要的特性,因為它們顯著地擴充套件了查詢(Query)的表達能力。一些系統內建函式無法解決的需求,我們可以用 UDF 來自定義實現。

5.1.1 註冊使用者自定義函式 UDF

在大多數情況下,使用者定義的函式必須先註冊,然後才能在查詢中使用。不需要專門為

Scala 的 Table API 註冊函式。

函式通過呼叫registerFunction()方法在 TableEnvironment 中註冊。當用戶定義的函式被註冊時,它被插入到 TableEnvironment 的函式目錄中,這樣 Table API 或 SQL 解析器就可以識別並正確地解釋它。

5.1.2 標量函式(Scalar Functions)

使用者定義的標量函式,可以將 0、1 或多個標量值,對映到新的標量值。

為了定義標量函式,必須在 org.apache.flink.table.functions 中擴充套件基類 Scalar Function, 並實現(一個或多個)求值(evaluation,eval)方法。標量函式的行為由求值方法決定, 求值方法必須公開宣告並命名為 eval(直接 def 宣告,沒有 override)。求值方法的引數型別和返回型別,確定了標量函式的引數和返回型別。

在下面的程式碼中,我們定義自己的 HashCode 函式,在 TableEnvironment 中註冊它,並在查詢中呼叫它。

// 自定義一個標量函式 

class HashCode( factor: Int ) extends ScalarFunction {
  def eval( s: String ): Int = {

s.hashCode * factor

}

  

主函式中呼叫,計算 sensor id 的雜湊值(前面部分照抄,流環境、表環境、讀取 source、建表):

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val settings = EnvironmentSettings

.newInstance()

.useOldPlanner()

.inStreamingMode()

.build()

val tableEnv = StreamTableEnvironment.create( env, settings )
// 定義好 DataStream

val inputStream: DataStream[String] = env.readTextFile("..\\sensor.txt")

val dataStream: DataStream[SensorReading] = inputStream

.map(data => {

val dataArray = data.split(",")

SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)

})

.assign
  val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'timestamp.rowtime, 'temperature)
 

  // Table API 中使用 
val resultTable = sensorTable

.select( 'id, hashCode('id) )
// SQL 中使用

tableEnv.createTemporaryView("sensor", sensorTable) tableEnv.registerFunction("hashCode", hashCode)
val resultSqlTable = tableEnv.sqlQuery("select id, hashCode(id) from sensor")
// 轉換成流,列印輸出 
  resultTable.toAppendStream[Row].print("table") resultSqlTable.toAppendStream[Row].print("sql")
env.execute()

  

5.2.3 表函式(Table Functions)

與使用者定義的標量函式類似,使用者定義的表函式,可以將 0、1 或多個標量值作為輸入引數;與標量函式不同的是,它可以返回任意數量的行作為輸出,而不是單個值。

為了定義一個表函式,必須擴充套件 org.apache.flink.table.functions 中的基類 TableFunction 並實現(一個或多個)求值方法。表函式的行為由其求值方法決定,求值方法必須是 public 的,並命名為 eval。求值方法的引數型別,決定表函式的所有有效引數。

返回表的型別由 TableFunction 的泛型型別確定。求值方法使用 protected collect(T)方法發出輸出行。

在 Table API 中,Table 函式需要與.joinLateral 或.leftOuterJoinLateral 一起使用。

joinLateral 運算元,會將外部表中的每一行,與表函式(TableFunction,運算元的引數是它的表示式)計算得到的所有行連線起來。

而 leftOuterJoinLateral 運算元,則是左外連線,它同樣會將外部表中的每一行與表函式計算生成的所有行連線起來;並且,對於表函式返回的是空表的外部行,也要保留下來。

在 SQL 中,則需要使用 Lateral Table(<TableFunction>),或者帶有 ON TRUE 條件的左連線。

下面的程式碼中,我們將定義一個表函式,在表環境中註冊它,並在查詢中呼叫它。自定義 TableFunction:

 

// 自定義 TableFunction

class Split(separator: String) extends TableFunction[(String, Int)]{

def eval(str: String): Unit = { str.split(separator).foreach( 
      word => collect((word, word.length))

)

}

} 

 

接下來,就是在程式碼中呼叫。首先是 Table API 的方式:

// Table API 中呼叫,需要用 joinLateral
 val resultTable = sensorTable
 .joinLateral(split('id) as ('word, 'length)) // as 對輸出行的欄位重新命名
 .select('id, 'word, 'length)
 
// 或者用 leftOuterJoinLateral
 val resultTable2 = sensorTable
 .leftOuterJoinLateral(split('id) as ('word, 'length))
 .select('id, 'word, 'length)
 
 // 轉換成流列印輸出
 resultTable.toAppendStream[Row].print("1")
 resultTable2.toAppendStream[Row].print("2")
然後是 SQL 的方式:
tableEnv.createTemporaryView("sensor", sensorTable)
 tableEnv.registerFunction("split", split)
 
 val resultSqlTable = tableEnv.sqlQuery(
 """
 |select id, word, length
 |from
 |sensor, LATERAL TABLE(split(id)) AS newsensor(word, length)
 """.stripMargin)
 // 或者用左連線的方式
 val resultSqlTable2 = tableEnv.sqlQuery(
 """
 |SELECT id, word, length
 |FROM
 |sensor
| LEFT JOIN
| LATERAL TABLE(split(id)) AS newsensor(word, length) 
| ON TRUE
 """.stripMargin
 )
 // 轉換成流列印輸出
 resultSqlTable.toAppendStream[Row].print("1")
 resultSqlTable2.toAppendStream[Row].print("2")

  

5.2.4 聚合函式(Aggregate Functions)

使用者自定義聚合函式(User-Defined Aggregate Functions,UDAGGs)可以把一個表中的資料,聚合成一個標量值。使用者定義的聚合函式,是通過繼承 AggregateFunction 抽象類實現的。

 

上圖中顯示了一個聚合的例子。 假設現在有一張表,包含了各種飲料的資料。該表由三列(id、name 和 price)、五行組成資料。現在我們需要找到表中所有飲料的最高價格,即執行 max()聚合,結果將是一個數值。 AggregateFunction 的工作原理如下。 ⚫ 首先,它需要一個累加器,用來儲存聚合中間結果的資料結構(狀態)。可以通過 呼叫 AggregateFunction 的 createAccumulator()方法建立空累加器。 ⚫ 隨後,對每個輸入行呼叫函式的 accumulate()方法來更新累加器。 ⚫ 處理完所有行後,將呼叫函式的 getValue()方法來計算並返回最終結果。 AggregationFunction 要求必須實現的方法: ⚫ createAccumulator()⚫ accumulate() ⚫ getValue() 除了上述方法之外,還有一些可選擇實現的方法。其中一些方法,可以讓系統執行查詢更有效率,而另一些方法,對於某些場景是必需的。例如,如果聚合函式應用在會話視窗(session group window)的上下文中,則 merge()方法是必需的。 ⚫ retract() ⚫ merge() ⚫ resetAccumulator() 接下來我們寫一個自定義 AggregateFunction,計算一下每個 sensor 的平均溫度值。
// 定義 AggregateFunction 的 Accumulator
class AvgTempAcc {
 var sum: Double = 0.0
 var count: Int = 0 }
class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {
 override def getValue(accumulator: AvgTempAcc): Double =
 accumulator.sum / accumulator.count
 override def createAccumulator(): AvgTempAcc = new AvgTempAcc
 def accumulate(accumulator: AvgTempAcc, temp: Double): Unit ={
 accumulator.sum += temp
 accumulator.count += 1
 } }
接下來就可以在程式碼中呼叫了。
// 建立一個聚合函式例項
val avgTemp = new AvgTemp()
// Table API 的呼叫
val resultTable = sensorTable.groupBy('id)
 .aggregate(avgTemp('temperature) as 'avgTemp)
 .select('id, 'avgTemp)
// SQL 的實現
tableEnv.createTemporaryView("sensor", sensorTable)
tableEnv.registerFunction("avgTemp", avgTemp)
val resultSqlTable = tableEnv.sqlQuery(
 """
 |SELECT
 |id, avgTemp(temperature)
 |FROM
 |sensor
 |GROUP BY id
 """.stripMargin)
// 轉換成流列印輸出
resultTable.toRetractStream[(String, Double)].print("agg temp")
resultSqlTable.toRetractStream[Row].print("agg temp sql")

5.2.5 表聚合函式(Table Aggregate Functions)

使用者定義的表聚合函式(User-Defined Table Aggregate Functions,UDTAGGs),可以把一 個表中資料,聚合為具有多行和多列的結果表。這跟 AggregateFunction 非常類似,只是之 前聚合結果是一個標量值,現在變成了一張表。

比如現在我們需要找到表中所有飲料的前 2 個最高價格,即執行 top2()表聚合。我 們需要檢查 5 行中的每一行,得到的結果將是一個具有排序後前 2 個值的表。 使用者定義的表聚合函式,是通過繼承 TableAggregateFunction 抽象類來實現的。 TableAggregateFunction 的工作原理如下。 首先,它同樣需要一個累加器(Accumulator),它是儲存聚合中間結果的資料結構。 通過呼叫 TableAggregateFunction 的 createAccumulator()方法可以建立空累加器。 隨後,對每個輸入行呼叫函式的 accumulate()方法來更新累加器。 處理完所有行後,將呼叫函式的 emitValue()方法來計算並返回最終結果。 AggregationFunction 要求必須實現的方法: createAccumulator() accumulate() 除了上述方法之外,還有一些可選擇實現的方法。 retract() merge() resetAccumulator() emitValue() emitUpdateWithRetract() 接下來我們寫一個自定義 TableAggregateFunction,用來提取每個 sensor 最高的兩個溫 度值。

// 先定義一個 Accumulator
class Top2TempAcc{
 var highestTemp: Double = Int.MinValue
 var secondHighestTemp: Double = Int.MinValue
}
// 自定義 TableAggregateFunction
class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc]{
 
 override def createAccumulator(): Top2TempAcc = new Top2TempAcc
 def accumulate(acc: Top2TempAcc, temp: Double): Unit ={
 if( temp > acc.highestTemp ){
 acc.secondHighestTemp = acc.highestTemp
 acc.highestTemp = temp
 } else if( temp > acc.secondHighestTemp ){
 acc.secondHighestTemp = temp
 }
 }
 def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit ={
 out.collect(acc.highestTemp, 1)
 out.collect(acc.secondHighestTemp, 2)
 } }
接下來就可以在程式碼中呼叫了。
// 建立一個表聚合函式例項
val top2Temp = new Top2Temp()
// Table API 的呼叫
val resultTable = sensorTable.groupBy('id)
 .flatAggregate( top2Temp('temperature) as ('temp, 'rank) )
 .select('id, 'temp, 'rank)
// 轉換成流列印輸出
resultTable.toRetractStream[(String, Double, Int)].print("agg temp")
resultSqlTable.toRetractStream[Row].print("agg temp sql")