1. 程式人生 > 資料庫 >Flink開發_Flink的SQL和TableAPI的UDF

Flink開發_Flink的SQL和TableAPI的UDF

 關係型資料庫中:database.schema.table
               其他
 分散式資料庫中:catalog.database.table 
   這裡的自定義函式,主要指在 Flink Table API & SQL 這個層級的自定義函式,注意和Datastream有所區別

1.函式區分

1.從兩個角度來區分函式
    從函式的擁有來講: 系統內建函式 和自定義函式 system (or built-in) functions v.s. catalog functions
	從生命週期來講:   分為臨時函式和永久函式    temporary functions           v.s.  persistent functions
2.因此組合的方式:
    Temporary system functions
    System functions
    Temporary catalog functions
    Catalog functions

2.如何使用函式:

精確引用   1.10版本及以上使用
模糊引用: 在SQL語句中使用,針對同名的情況,引用的順序是:
   Temporary system function
   System function
   Temporary catalog function, in the current catalog and current database of the session
   Catalog function, in the current catalog and current database of the session

3.系統內建函式

Scalar Functions 標量型函式
    算數函式  代數函式(= != >  isnull in exists between)  邏輯函式(and or )
	字元函式  時間函式
	條件函式 case when   COALESCE  IS_DIGIT
	函式型別轉換  CAST
	Collection函式: ARRAY  Map 
	分組函式 Hash值函式
Aggregate Functions 聚合型函式
   max min sum count 
   COLLECT
   ROW_NUMBER   DENSE_RANK RANK()
      ROW_NUMBER 1 2 3 4
      dense_rank 函式在生成序號時是連續的,1 2  2 3    dense稠密
      rank      函式生成的序號有可能不連續。 1 2 2 4
Column Functions 列函式 -Column functions are only used in Table API.
   withColumns
   withoutColumns

Flink的UDF

 擴充套件了查詢的表達能力,同時可以把這種表達能力開放出去
  基於JVM語言的UDF: Java Scala

自定義函式型別

        Scalar functions      map scalar values                  to a new scalar value.
         Table functions      map scalar values                  to new rows.
     Aggregate functions      map scalar values of multiple rows to a new scalar value.
    Table aggregate functions map scalar values of multiple rows to new rows.
     Async table functions are special functions for table sources that perform a lookup.
	 從與Hive比較角度: UDF UDTF UDAF
	 Flink自身又有說細分和增加
 版本的不同:
      UDF UDTF: org.apache.flink.table.types.DataType        
	  UDAF    : org.apache.flink.api.common.typeinfo.TypeInformation 
	            aggregate 這部分正在重構,目前是使用TypeInformation,重構後使用DataType
(注意: Flink設計型別資訊的有
  TypeInformation  org.apache.flink.api.common.typeinfo.Types
                   org.apache.flink.api.common.typeinfo.TypeInformation
  Type             org.apache.flink.table.api.Types
  DataType         org.apache.flink.table.types.DataType  1.9版本以後移除了對 TypeInformation 的依賴
  )
    * @see ScalarFunction      org.apache.flink.table.functions.
    * @see TableFunction       org.apache.flink.table.functions.
    * @see AggregateFunction
    * @see TableAggregateFunction
	* @see AsyncTableFunction

 如何編寫
 如何呼叫: both Table API and SQL.
   For SQL queries , a function must always be registered under a name. 
   For Table API   , a function can be registered or directly used inline
示例:
0.編寫UDF
   // define function logic
   public static class SubstringFunction extends ScalarFunction {
     public String eval(String s, Integer begin, Integer end) {
       return s.substring(begin, end);
     }
   }
###使用UDF
1.對於SQL來講,需要註冊,然後在SQL中使用
  // register function
  env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
  // call registered function in SQL
  env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");

2.對於TableAPI來說,可以直接用,或者註冊後再在Table API中使用
  // call function "inline" without registration in Table API
  env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
  
  // register function
  env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
  // call registered function in Table API
  env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));

具體說明

   Udf提供了open()和close()方法,可以被複寫,功能類似Dataset和DataStream API的RichFunction方法
 1.UDF繼承 ScalarFunction 抽象類,主要實現 eval 方法。
   輸出一行
   org.apache.flink.table.functions
     public abstract class ScalarFunction extends UserDefinedFunction {}
	 注意:返回值型別: 
	     基本的返回值型別 和自定義複雜的返回值型別
		  複雜的可能要實現方法: getResultType()
 2.UDF繼承 TableFunction 抽象類,主要實現 eval 方法。
    輸出任意數目的行數。返回的行也可以包含一個或者多個列,通過提供 provide a collect(T) method
    org.apache.flink.table.functions  
	 public abstract class TableFunction<T> extends UserDefinedFunction {}
3.Aggregation Functions
   The following methods are mandatory for each AggregateFunction:
     createAccumulator()
     accumulate()
     getValue()
    Spark SQL的UDAF  UserDefinedAggregateFunction
 Flink: org.apache.flink.table.functions.AggregateFunction
	public abstract class AggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {
	    <IN, ACC, OUT>
		必不可少的: createAccumulator() accumulate() getValue()
        The following methods of AggregateFunction are required depending on the use case
	       merge()方法在會話組視窗(session group window)上下文中是必須的
		   retract() 
		   resetAccumulator()
 Spark中 org.apache.spark.sql.expressions
   public abstract class UserDefinedAggregateFunction extends Serializable{}
    inputSchema  bufferSchema  dataType
    initialize  update  merge  evaluate
 Hive: org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
   1)需繼承AbstractGenericUDAFResolver抽象類,重寫方法getEvaluator(TypeInfo[] parameters);
   2)內部靜態類需繼承GenericUDAFEvaluator抽象類,重寫方法 init() 
      實現方法 getNewAggregationBuffer() reset() iterate() terminatePartial() merge() terminate() 

4.Table Aggregation Functions
  TableAggregateFunction
    createAccumulator()
    accumulate()
 The following methods of TableAggregateFunction are required depending on the use case:
     retract() is required for aggregations on bounded OVER windows.
     merge() is required for many batch aggregations and session window aggregations.
     resetAccumulator() is required for many batch aggregations.
     emitValue() is required for batch and window aggregations.
  emitUpdateWithRetract

參考:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html