Flink基礎(68):FLINK SQL(45) 自定義函式(四)自定義表值函式(UDTF)
阿新 • • 發佈:2021-08-07
本文為您介紹如何為實時計算Flink版自定義表值函式(UDTF)搭建開發環境、編寫業務程式碼以及上線。
說明阿里雲實時計算Flink版共享模式暫不支援自定義函式,僅獨享模式支援自定義函式。定義
與自定義的標量函式類似,自定義的表值函式(UDTF)將0個、1個或多個標量值作為輸入引數(可以是變長引數)。與標量函式不同,表值函式可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。
搭建開發環境
參見環境搭建。
編寫業務邏輯程式碼
UDTF需要在TableFunction類中實現eval方法。open方法和close方法可選。以Java為例,示例程式碼如下packagecom.hjc.test.blink.sql.udx; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.TableFunction; public class SplitUdtf extends TableFunction<String> { // 可選,open方法可不編寫。如果編寫,則需要新增宣告'import org.apache.flink.table.functions.FunctionContext;'。 @Overridepublic void open(FunctionContext context) { // ... ... } public void eval(String str) { String[] split = str.split("\\|"); for (String s : split) { collect(s); } } // 可選,close方法可不編寫。 @Override public void close() { // ... ...} }
多行返回
UDTF可以通過多次呼叫collect()
實現將1行的資料轉為多行返回。
多列返回
UDTF不僅可以進行1行轉多行,還可以1列轉多列。如果您需要UDTF返回多列,只需要將返回值宣告成Tuple或Row。Tuple或Row解釋如下:- 返回值為Tuple 實時計算Flink版支援使用Tuple1到Tuple25 ,定義1個欄位到25個欄位。用Tuple3來返回3個欄位的UDTF示例如下。
import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.table.functions.TableFunction; // 使用Tuple作為返回值,一定要顯式宣告Tuple的泛型型別, 例如,String、Long和Integer。 public class ParseUdtf extends TableFunction<Tuple3<String, Long, Integer>> { public void eval(String str) { String[] split = str.split(","); // 以下程式碼僅作示例,實際業務需要新增更多的校驗邏輯。 String first = split[0]; long second = Long.parseLong(split[1]); int third = Integer.parseInt(split[2]); Tuple3<String, Long, Integer> tuple3 = Tuple3.of(first, second, third); collect(tuple3); } }
- 說明使用Tuple時,欄位值不能為null,且最多隻能存在25個欄位。
- 返回值為Row 使用Row來實現返回3個欄位的UDTF示例如下。
import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.DataTypes; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; public class ParseUdtf extends TableFunction<Row> { public void eval(String str) { String[] split = str.split(","); String first = split[0]; long second = Long.parseLong(split[1]); int third = Integer.parseInt(split[2]); Row row = new Row(3); row.setField(0, first); row.setField(1, second); row.setField(2, third); collect(row); } @Override // 如果返回值是Row,則必須過載實現getResultType方法,顯式地宣告返回的欄位型別。 public DataType getResultType(Object[] arguments, Class[] argTypes) { return DataTypes.createRowType(DataTypes.STRING, DataTypes.LONG, DataTypes.INT); } }
-
說明Row的欄位值可以是null,但如果需要使用Row,必須過載實現
getResultType
方法。
SQL語法
UDTF支援cross join和left join,在使用UDTF時需要新增lateral
和table
關鍵字。以ParseUdtf
為例,需要先註冊一個Function名字。
CREATE FUNCTION parseUdtf AS 'com.alibaba.blink.sql.udtf.ParseUdtf';
- cross join
左表的每一行資料都會關聯上UDTF產出的每一行資料,如果UDTF不產出任何資料,則這1行不會輸出。
select S.id, S.content, T.a, T.b, T.c from input_stream as S, lateral table(parseUdtf(content)) as T(a, b, c);
- left join
左表的每一行資料都會關聯上UDTF產出的每一行資料,如果UDTF不產出任何資料,則這1行的UDTF的欄位會用null值填充。
說明left join UDTF語句後面必須新增
on true
引數。select S.id, S.content, T.a, T.b, T.c from input_stream as S left join lateral table(parseUdtf(content)) as T(a, b, c) on true;
註冊使用
- 登入實時計算控制檯。
- 在頂部選單中,單擊開發。
- 在左側的導航欄中,單擊資源引用。
- 在資源引用頁籤的右上角,單擊新建資源。
- 在上傳資源頁面,輸入資源配置資訊。
引數名稱 說明 上傳方式 實時計算控制檯上僅支援本地上傳。 說明本地上傳JAR包的大小上限為300 MB。如果JAR包大小超過300 MB,請在叢集繫結的OSS控制檯上,或通過OpenAPI的方式上傳JAR包。 資源選擇 單擊選擇資源,選擇需要引用的資源。 資源名稱 輸入資源名稱。 資源備註 輸入資源備註資訊。 資源型別 選擇引用資源型別,JAR、DICTIONARY或PYTHON。 - 在資源引用頁籤中,將滑鼠懸停在對應作業的右側的更多上。
- 在下拉列表中,選擇引用。
- 在作業的編輯視窗的頂部,輸入自定義函式宣告,示例如下。
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
上線和啟動
自定義聚合函式(UDTF)的上線和啟動步驟,請參見上線和啟動。
UDTF示例
-- UDTF str.split("\\|"); create function splitUdtf as 'com.hjc.test.blink.sql.udx.SplitUdtf'; create table sls_stream( a INT, b BIGINT, c VARCHAR ) with ( type='sls', endPoint='yourEndpoint', accessKeyId='yourAccessKeyId', accessKeySecret='yourAccessSecret', startTime = '2017-07-04 00:00:00', project='yourProjectName', logStore='yourLogStoreName', consumerGroup='consumerGroupTest2' ); -- 將c欄位傳入splitUdtf,切分後得到多行1列的表T(s)。s表示欄位名字。 create view v1 as select a,b,c,s from sls_stream, lateral table(splitUdtf(c)) as T(s); create table rds_output( id INT, len BIGINT, content VARCHAR ) with ( type='rds', url='yourDatabaseURL', tableName='yourDatabaseTableName', userName='yourDatabaseUserName', password='yourDatabasePassword' ); insert into rds_output select a,b,s from v1;