1. 程式人生 > 其它 >Flink基礎(68):FLINK SQL(45) 自定義函式(四)自定義表值函式(UDTF)

Flink基礎(68):FLINK SQL(45) 自定義函式(四)自定義表值函式(UDTF)

本文為您介紹如何為實時計算Flink版自定義表值函式(UDTF)搭建開發環境、編寫業務程式碼以及上線。

說明阿里雲實時計算Flink版共享模式暫不支援自定義函式,僅獨享模式支援自定義函式。

定義

與自定義的標量函式類似,自定義的表值函式(UDTF)將0個、1個或多個標量值作為輸入引數(可以是變長引數)。與標量函式不同,表值函式可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。

搭建開發環境

參見環境搭建

編寫業務邏輯程式碼

UDTF需要在TableFunction類中實現eval方法。open方法和close方法可選。以Java為例,示例程式碼如下
package
com.hjc.test.blink.sql.udx; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.TableFunction; public class SplitUdtf extends TableFunction<String> { // 可選,open方法可不編寫。如果編寫,則需要新增宣告'import org.apache.flink.table.functions.FunctionContext;'。 @Override
public void open(FunctionContext context) { // ... ... } public void eval(String str) { String[] split = str.split("\\|"); for (String s : split) { collect(s); } } // 可選,close方法可不編寫。 @Override public void close() { // ... ...
} }

多行返回

UDTF可以通過多次呼叫collect()實現將1行的資料轉為多行返回。

多列返回

UDTF不僅可以進行1行轉多行,還可以1列轉多列。如果您需要UDTF返回多列,只需要將返回值宣告成Tuple或Row。Tuple或Row解釋如下:
  • 返回值為Tuple 實時計算Flink版支援使用Tuple1到Tuple25 ,定義1個欄位到25個欄位。用Tuple3來返回3個欄位的UDTF示例如下。
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableFunction;

// 使用Tuple作為返回值,一定要顯式宣告Tuple的泛型型別, 例如,String、Long和Integer。
public class ParseUdtf extends TableFunction<Tuple3<String, Long, Integer>> {

public void eval(String str) {
String[] split = str.split(",");
// 以下程式碼僅作示例,實際業務需要新增更多的校驗邏輯。
String first = split[0];
long second = Long.parseLong(split[1]);
int third = Integer.parseInt(split[2]);
Tuple3<String, Long, Integer> tuple3 = Tuple3.of(first, second, third);
collect(tuple3);
}
}
    • 說明使用Tuple時,欄位值不能為null,且最多隻能存在25個欄位。
    • 返回值為Row 使用Row來實現返回3個欄位的UDTF示例如下。
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class ParseUdtf extends TableFunction<Row> {

public void eval(String str) {
String[] split = str.split(",");
String first = split[0];
long second = Long.parseLong(split[1]);
int third = Integer.parseInt(split[2]);
Row row = new Row(3);
row.setField(0, first);
row.setField(1, second);
row.setField(2, third);
collect(row);
}

@Override
// 如果返回值是Row,則必須過載實現getResultType方法,顯式地宣告返回的欄位型別。
public DataType getResultType(Object[] arguments, Class[] argTypes) {
return DataTypes.createRowType(DataTypes.STRING, DataTypes.LONG, DataTypes.INT);
}
}
  • 說明Row的欄位值可以是null,但如果需要使用Row,必須過載實現getResultType方法。

SQL語法

UDTF支援cross join和left join,在使用UDTF時需要新增lateraltable關鍵字。以ParseUdtf為例,需要先註冊一個Function名字。
CREATE FUNCTION parseUdtf AS 'com.alibaba.blink.sql.udtf.ParseUdtf';
  • cross join 左表的每一行資料都會關聯上UDTF產出的每一行資料,如果UDTF不產出任何資料,則這1行不會輸出。
    select S.id, S.content, T.a, T.b, T.c
    from input_stream as S,
    lateral table(parseUdtf(content)) as T(a, b, c);
  • left join 左表的每一行資料都會關聯上UDTF產出的每一行資料,如果UDTF不產出任何資料,則這1行的UDTF的欄位會用null值填充。 說明left join UDTF語句後面必須新增on true引數。
    select S.id, S.content, T.a, T.b, T.c
    from input_stream as S
    left join lateral table(parseUdtf(content)) as T(a, b, c) on true;

註冊使用

  1. 登入實時計算控制檯
  2. 在頂部選單中,單擊開發。
  3. 在左側的導航欄中,單擊資源引用。
  4. 資源引用頁籤的右上角,單擊新建資源。
  5. 上傳資源頁面,輸入資源配置資訊。
    引數名稱說明
    上傳方式 實時計算控制檯上僅支援本地上傳。 說明本地上傳JAR包的大小上限為300 MB。如果JAR包大小超過300 MB,請在叢集繫結的OSS控制檯上,或通過OpenAPI的方式上傳JAR包。
    資源選擇 單擊選擇資源,選擇需要引用的資源。
    資源名稱 輸入資源名稱。
    資源備註 輸入資源備註資訊。
    資源型別 選擇引用資源型別,JAR、DICTIONARY或PYTHON。
  6. 資源引用頁籤中,將滑鼠懸停在對應作業的右側的更多上。
  7. 在下拉列表中,選擇引用。
  8. 在作業的編輯視窗的頂部,輸入自定義函式宣告,示例如下。
    CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';

上線和啟動

自定義聚合函式(UDTF)的上線和啟動步驟,請參見上線啟動

UDTF示例

-- UDTF str.split("\\|");
create function splitUdtf as 'com.hjc.test.blink.sql.udx.SplitUdtf';

create table sls_stream(
a INT,
b BIGINT,
c VARCHAR
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessKeyId',
accessKeySecret='yourAccessSecret',
startTime = '2017-07-04 00:00:00',
project='yourProjectName',
logStore='yourLogStoreName',
consumerGroup='consumerGroupTest2'
);

-- 將c欄位傳入splitUdtf,切分後得到多行1列的表T(s)。s表示欄位名字。
create view v1 as
select a,b,c,s
from sls_stream,
lateral table(splitUdtf(c)) as T(s);

create table rds_output(
id INT,
len BIGINT,
content VARCHAR
) with (
type='rds',
url='yourDatabaseURL',
tableName='yourDatabaseTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);

insert into rds_output
select
a,b,s
from v1;