Flink開發_Flink的SQL和TableAPI的UDF
阿新 • • 發佈:2020-12-05
Flink Table API & SQL
關係型資料庫中:database.schema.table
其他
分散式資料庫中:catalog.database.table
這裡的自定義函式,主要指在 Flink Table API & SQL 這個層級的自定義函式,注意和Datastream有所區別
1.函式區分
1.從兩個角度來區分函式 從函式的擁有來講: 系統內建函式 和自定義函式 system (or built-in) functions v.s. catalog functions 從生命週期來講: 分為臨時函式和永久函式 temporary functions v.s. persistent functions 2.因此組合的方式: Temporary system functions System functions Temporary catalog functions Catalog functions
2.如何使用函式:
精確引用 1.10版本及以上使用
模糊引用: 在SQL語句中使用,針對同名的情況,引用的順序是:
Temporary system function
System function
Temporary catalog function, in the current catalog and current database of the session
Catalog function, in the current catalog and current database of the session
3.系統內建函式
Scalar Functions 標量型函式 算數函式 代數函式(= != > isnull in exists between) 邏輯函式(and or ) 字元函式 時間函式 條件函式 case when COALESCE IS_DIGIT 函式型別轉換 CAST Collection函式: ARRAY Map 分組函式 Hash值函式 Aggregate Functions 聚合型函式 max min sum count COLLECT ROW_NUMBER DENSE_RANK RANK() ROW_NUMBER 1 2 3 4 dense_rank 函式在生成序號時是連續的,1 2 2 3 dense稠密 rank 函式生成的序號有可能不連續。 1 2 2 4 Column Functions 列函式 -Column functions are only used in Table API. withColumns withoutColumns
Flink的UDF
擴充套件了查詢的表達能力,同時可以把這種表達能力開放出去
基於JVM語言的UDF: Java Scala
自定義函式型別
Scalar functions map scalar values to a new scalar value. Table functions map scalar values to new rows. Aggregate functions map scalar values of multiple rows to a new scalar value. Table aggregate functions map scalar values of multiple rows to new rows. Async table functions are special functions for table sources that perform a lookup. 從與Hive比較角度: UDF UDTF UDAF Flink自身又有說細分和增加 版本的不同: UDF UDTF: org.apache.flink.table.types.DataType UDAF : org.apache.flink.api.common.typeinfo.TypeInformation aggregate 這部分正在重構,目前是使用TypeInformation,重構後使用DataType (注意: Flink設計型別資訊的有 TypeInformation org.apache.flink.api.common.typeinfo.Types org.apache.flink.api.common.typeinfo.TypeInformation Type org.apache.flink.table.api.Types DataType org.apache.flink.table.types.DataType 1.9版本以後移除了對 TypeInformation 的依賴 ) * @see ScalarFunction org.apache.flink.table.functions. * @see TableFunction org.apache.flink.table.functions. * @see AggregateFunction * @see TableAggregateFunction * @see AsyncTableFunction
如何編寫
如何呼叫: both Table API and SQL.
For SQL queries , a function must always be registered under a name.
For Table API , a function can be registered or directly used inline
示例:
0.編寫UDF
// define function logic
public static class SubstringFunction extends ScalarFunction {
public String eval(String s, Integer begin, Integer end) {
return s.substring(begin, end);
}
}
###使用UDF
1.對於SQL來講,需要註冊,然後在SQL中使用
// register function
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
// call registered function in SQL
env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");
2.對於TableAPI來說,可以直接用,或者註冊後再在Table API中使用
// call function "inline" without registration in Table API
env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
// register function
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
// call registered function in Table API
env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));
具體說明
Udf提供了open()和close()方法,可以被複寫,功能類似Dataset和DataStream API的RichFunction方法
1.UDF繼承 ScalarFunction 抽象類,主要實現 eval 方法。
輸出一行
org.apache.flink.table.functions
public abstract class ScalarFunction extends UserDefinedFunction {}
注意:返回值型別:
基本的返回值型別 和自定義複雜的返回值型別
複雜的可能要實現方法: getResultType()
2.UDF繼承 TableFunction 抽象類,主要實現 eval 方法。
輸出任意數目的行數。返回的行也可以包含一個或者多個列,通過提供 provide a collect(T) method
org.apache.flink.table.functions
public abstract class TableFunction<T> extends UserDefinedFunction {}
3.Aggregation Functions
The following methods are mandatory for each AggregateFunction:
createAccumulator()
accumulate()
getValue()
Spark SQL的UDAF UserDefinedAggregateFunction
Flink: org.apache.flink.table.functions.AggregateFunction
public abstract class AggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {
<IN, ACC, OUT>
必不可少的: createAccumulator() accumulate() getValue()
The following methods of AggregateFunction are required depending on the use case
merge()方法在會話組視窗(session group window)上下文中是必須的
retract()
resetAccumulator()
Spark中 org.apache.spark.sql.expressions
public abstract class UserDefinedAggregateFunction extends Serializable{}
inputSchema bufferSchema dataType
initialize update merge evaluate
Hive: org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
1)需繼承AbstractGenericUDAFResolver抽象類,重寫方法getEvaluator(TypeInfo[] parameters);
2)內部靜態類需繼承GenericUDAFEvaluator抽象類,重寫方法 init()
實現方法 getNewAggregationBuffer() reset() iterate() terminatePartial() merge() terminate()
4.Table Aggregation Functions
TableAggregateFunction
createAccumulator()
accumulate()
The following methods of TableAggregateFunction are required depending on the use case:
retract() is required for aggregations on bounded OVER windows.
merge() is required for many batch aggregations and session window aggregations.
resetAccumulator() is required for many batch aggregations.
emitValue() is required for batch and window aggregations.
emitUpdateWithRetract
參考:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html