Flink基礎(十六):Table API 和 Flink SQL(五)函式(Functions)
阿新 • • 發佈:2020-08-04
1 系統內建函式
Flink Table API 和 SQL 為使用者提供了一組用於資料轉換的內建函式。SQL 中支援的很多函式,Table API 和 SQL 都已經做了實現,其它還在快速開發擴充套件中。 以下是一些典型函式的舉例,全部的內建函式,可以參考官網介紹。 ⚫ 比較函式 SQL: value1 = value2 value1 > value2 Table API: ANY1 === ANY2 ANY1 > ANY2 ⚫ 邏輯函式 SQL: boolean1 OR boolean2 boolean IS FALSE NOT boolean Table API: BOOLEAN1 || BOOLEAN2 BOOLEAN.isFalse !BOOLEAN ⚫ 算術函式 SQL: numeric1 + numeric2POWER(numeric1, numeric2) Table API: NUMERIC1 + NUMERIC2 NUMERIC1.power(NUMERIC2) ⚫ 字串函式 SQL: string1 || string2 UPPER(string) CHAR_LENGTH(string) Table API: STRING1 + STRING2 STRING.upperCase() STRING.charLength() ⚫ 時間函式 SQL: DATE string TIMESTAMP string CURRENT_TIME INTERVAL string range Table API: STRING.toDate STRING.toTimestamp currentTime() NUMERIC.days NUMERIC.minutes ⚫ 聚合函式 SQL: COUNT(*) SUM([ ALL | DISTINCT ] expression) RANK()ROW_NUMBER() Table API: FIELD.count FIELD.sum02 UDF
//主函式中呼叫,計算 sensor id 的雜湊值(前面部分照抄,流環境、表環境、讀取 source、建表):自定義一個標量函式 class HashCode( factor: Int ) extends ScalarFunction { def eval( s: String ): Int = { s.hashCode * factor } }
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(2.3 表函式(Table Functions) 與使用者定義的標量函式類似,使用者定義的表函式,可以將 0、1 或多個標量值作為輸入引數;與標量函式不同的是,它可以返回任意數量的行作為輸出,而不是單個值。 為了定義一個表函式,必須擴充套件 org.apache.flink.table.functions 中的基類 TableFunction並實現(一個或多個)求值方法。表函式的行為由其求值方法決定,求值方法必須是 public 的,並命名為 eval。求值方法的引數型別,決定表函式的所有有效引數。 返回表的型別由 TableFunction 的泛型型別確定。求值方法使用 protected collect(T)方法發出輸出行。在 Table API 中,Table 函式需要與.joinLateral 或.leftOuterJoinLateral 一起使用。 joinLateral 運算元,會將外部表中的每一行,與表函式(TableFunction,運算元的引數是它的表示式)計算得到的所有行連線起來。 而 leftOuterJoinLateral 運算元,則是左外連線,它同樣會將外部表中的每一行與表函式計算生成的所有行連線起來;並且,對於表函式返回的是空表的外部行,也要保留下來。 在 SQL 中,則需要使用 Lateral Table(<TableFunction>),或者帶有 ON TRUE 條件的左連線。 下面的程式碼中,我們將定義一個表函式,在表環境中註冊它,並在查詢中呼叫它。 自定義 TableFunction:1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings .newInstance() .useOldPlanner() .inStreamingMode() .build() val tableEnv = StreamTableEnvironment.create( env, settings ) // 定義好 DataStream val inputStream: DataStream[String] = env.readTextFile("..\\sensor.txt") val dataStream: DataStream[SensorReading] = inputStream .map(data => { val dataArray = data.split(",") SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble) }) .assignAscendingTimestamps(_.timestamp * 1000L) // 將 DataStream 轉換為 Table,並指定時間欄位 val sensorTable = tableEnv.fromDataStream(dataStream, 'id, 'timestamp.rowtime, 'temperature) // Table API 中使用 val hashCode = new HashCode(10) val resultTable = sensorTable .select( 'id, hashCode('id) ) // SQL 中使用 tableEnv.createTemporaryView("sensor", sensorTable) tableEnv.registerFunction("hashCode", hashCode) val resultSqlTable = tableEnv.sqlQuery("select id, hashCode(id) from sensor") // 轉換成流,列印輸出 resultTable.toAppendStream[Row].print("table") resultSqlTable.toAppendStream[Row].print("sql") env.execute() }
// 自定義 TableFunction class Split(separator: String) extends TableFunction[(String, Int)]{ def eval(str: String): Unit = { str.split(separator).foreach( word => collect((word, word.length)) ) } }接下來,就是在程式碼中呼叫。首先是 Table API 的方式:
// Table API 中呼叫,需要用 joinLateral val resultTable = sensorTable .joinLateral(split('id) as ('word, 'length)) // as 對輸出行的欄位重新命名 .select('id, 'word, 'length) // 或者用 leftOuterJoinLateral val resultTable2 = sensorTable .leftOuterJoinLateral(split('id) as ('word, 'length)) .select('id, 'word, 'length) // 轉換成流列印輸出 resultTable.toAppendStream[Row].print("1") resultTable2.toAppendStream[Row].print("2")然後是 SQL 的方式:
tableEnv.createTemporaryView("sensor", sensorTable) tableEnv.registerFunction("split", split) val resultSqlTable = tableEnv.sqlQuery( """ |select id, word, length |from |sensor, LATERAL TABLE(split(id)) AS newsensor(word, length) """.stripMargin) // 或者用左連線的方式 val resultSqlTable2 = tableEnv.sqlQuery( """ |SELECT id, word, length |FROM |sensor | LEFT JOIN | LATERAL TABLE(split(id)) AS newsensor(word, length) | ON TRUE """.stripMargin ) // 轉換成流列印輸出 resultSqlTable.toAppendStream[Row].print("1") resultSqlTable2.toAppendStream[Row].print("2")2.4 聚合函式(Aggregate Functions) 使用者自定義聚合函式(User-Defined Aggregate Functions,UDAGGs)可以把一個表中的資料,聚合成一個標量值。使用者定義的聚合函式,是通過繼承 AggregateFunction 抽象類實 現的。 上圖中顯示了一個聚合的例子。 假設現在有一張表,包含了各種飲料的資料。該表由三列(id、name 和 price)、五行組成資料。現在我們需要找到表中所有飲料的最高價格,即執行 max()聚合,結果將是一 個數值。 AggregateFunction 的工作原理如下。 ⚫ 首先,它需要一個累加器,用來儲存聚合中間結果的資料結構(狀態)。可以通過 呼叫 AggregateFunction 的 createAccumulator()方法建立空累加器。 ⚫ 隨後,對每個輸入行呼叫函式的 accumulate()方法來更新累加器。 ⚫ 處理完所有行後,將呼叫函式的 getValue()方法來計算並返回最終結果。 AggregationFunction 要求必須實現的方法: ⚫ createAccumulator() ⚫ accumulate() ⚫ getValue() 除了上述方法之外,還有一些可選擇實現的方法。其中一些方法,可以讓系統執行查詢更有效率,而另一些方法,對於某些場景是必需的。例如,如果聚合函式應用在會話視窗(session group window)的上下文中,則 merge()方法是必需的。 ⚫ retract() ⚫ merge() ⚫ resetAccumulator() 接下來我們寫一個自定義 AggregateFunction,計算一下每個 sensor 的平均溫度值。
// 定義 AggregateFunction 的 Accumulator class AvgTempAcc { var sum: Double = 0.0 var count: Int = 0 } class AvgTemp extends AggregateFunction[Double, AvgTempAcc] { override def getValue(accumulator: AvgTempAcc): Double = accumulator.sum / accumulator.count override def createAccumulator(): AvgTempAcc = new AvgTempAcc def accumulate(accumulator: AvgTempAcc, temp: Double): Unit ={ accumulator.sum += temp accumulator.count += 1 } }接下來就可以在程式碼中呼叫了。
// 建立一個聚合函式例項 val avgTemp = new AvgTemp() // Table API 的呼叫 val resultTable = sensorTable.groupBy('id) .aggregate(avgTemp('temperature) as 'avgTemp) .select('id, 'avgTemp) // SQL 的實現 tableEnv.createTemporaryView("sensor", sensorTable) tableEnv.registerFunction("avgTemp", avgTemp) val resultSqlTable = tableEnv.sqlQuery( """ |SELECT |id, avgTemp(temperature) |FROM |sensor |GROUP BY id """.stripMargin) // 轉換成流列印輸出 resultTable.toRetractStream[(String, Double)].print("agg temp") resultSqlTable.toRetractStream[Row].print("agg temp sql")2.5 表聚合函式(Table Aggregate Functions) 使用者定義的表聚合函式(User-Defined Table Aggregate Functions,UDTAGGs),可以把一個表中資料,聚合為具有多行和多列的結果表。這跟 AggregateFunction 非常類似,只是之 前聚合結果是一個標量值,現在變成了一張表。 比如現在我們需要找到表中所有飲料的前 2 個最高價格,即執行 top2()表聚合。我們需要檢查 5 行中的每一行,得到的結果將是一個具有排序後前 2 個值的表。 使用者定義的表聚合函式,是通過繼承 TableAggregateFunction 抽象類來實現的。 TableAggregateFunction 的工作原理如下。 ⚫ 首先,它同樣需要一個累加器(Accumulator),它是儲存聚合中間結果的資料結構。 通過呼叫 TableAggregateFunction 的 createAccumulator()方法可以建立空累加器。 ⚫ 隨後,對每個輸入行呼叫函式的 accumulate()方法來更新累加器。 ⚫ 處理完所有行後,將呼叫函式的 emitValue()方法來計算並返回最終結果。 AggregationFunction 要求必須實現的方法: ⚫ createAccumulator() ⚫ accumulate() 除了上述方法之外,還有一些可選擇實現的方法。 ⚫retract() ⚫merge() ⚫resetAccumulator() ⚫emitValue() ⚫emitUpdateWithRetract() 接下來我們寫一個自定義 TableAggregateFunction,用來提取每個 sensor 最高的兩個溫度值。
// 先定義一個 Accumulator class Top2TempAcc{ var highestTemp: Double = Int.MinValue var secondHighestTemp: Double = Int.MinValue } // 自定義 TableAggregateFunction class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc]{ override def createAccumulator(): Top2TempAcc = new Top2TempAcc def accumulate(acc: Top2TempAcc, temp: Double): Unit ={ if( temp > acc.highestTemp ){ acc.secondHighestTemp = acc.highestTemp acc.highestTemp = temp } else if( temp > acc.secondHighestTemp ){ acc.secondHighestTemp = temp } } def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit ={ out.collect(acc.highestTemp, 1) out.collect(acc.secondHighestTemp, 2) } }接下來就可以在程式碼中呼叫了。
// 建立一個表聚合函式例項 val top2Temp = new Top2Temp() // Table API 的呼叫 val resultTable = sensorTable.groupBy('id) .flatAggregate( top2Temp('temperature) as ('temp, 'rank) ) .select('id, 'temp, 'rank) // 轉換成流列印輸出 resultTable.toRetractStream[(String, Double, Int)].print("agg temp") resultSqlTable.toRetractStream[Row].print("agg temp sql")