1. 程式人生 > 其它 >Flink基礎(66):FLINK SQL(43) 自定義函式(二)自定義標量函式(UDF)

Flink基礎(66):FLINK SQL(43) 自定義函式(二)自定義標量函式(UDF)

本文為您介紹如何為實時計算Flink版自定義標量函式(UDF)搭建開發環境、編寫業務程式碼及上線。

注意阿里雲實時計算Flink版共享模式暫不支援自定義函式,僅獨享模式支援自定義函式。

定義

自定義標量函式(UDF)將0個、1個或多個標量值對映到一個新的標量值。

搭建環境

搭建開發環境,請參見環境搭建

業務程式碼

UDF需要在ScalarFunction類中實現eval方法。open方法和close方法可選。 注意UDF預設對於相同的輸入會有相同的輸出。如果UDF不能保證相同的輸出,例如,在UDF中呼叫外部服務,相同的輸入值可能返回不同的結果,建議您使用override isDeterministic()
方法,返回False。否則在某些條件下,輸出結果不符合預期。例如,UDF運算元前移。 以Java為例,示例程式碼如下。
package com.hjc.test.blink.sql.udx;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class StringLengthUdf extends ScalarFunction {
    // 可選,open方法可以不寫。
    // 如果編寫open方法需要宣告'import org.apache.flink.table.functions.FunctionContext;'。
@Override public void open(FunctionContext context) { } public long eval(String a) { return a == null ? 0 : a.length(); } public long eval(String b, String c) { return eval(b) + eval(c); } //可選,close方法可以不寫。 @Override public void close() { } }

編寫SQL語句

在指定的Class中編寫SQL語句,自定義函式中SQL語句示例如下。
-- udf str.length()
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';

create table sls_stream(
    a int,
    b int,
    c varchar
) with (
    type='sls',
    endPoint='<yourEndpoint>',
    accessKeyId='<yourAccessId>',
    accessKeySecret='<yourAccessSecret>',
    startTime = '2017-07-04 00:00:00',
    project='<yourProjectName>',
    logStore='<yourLogStoreName>',
    consumerGroup='consumerGroupTest1'
);

create table rds_output(
    id int,
    len bigint,
    content VARCHAR
) with (
    type='rds',
    url='yourDatabaseURL',
    tableName='<yourDatabaseTableName>',
    userName='<yourDatabaseUserName>',
    password='<yourDatabasePassword>'
);

insert into rds_output
select
    a,
    stringLengthUdf(c),
    c as content
from sls_stream;

註冊使用

  1. 登入實時計算控制檯
  2. 在頂部選單中,單擊開發。
  3. 在左側的導航欄中,單擊資源引用。
  4. 資源引用頁籤的右上角,單擊新建資源。
  5. 上傳資源頁面,輸入資源配置資訊。
    引數名稱說明
    上傳方式 實時計算控制檯上僅支援本地上傳。 說明本地上傳JAR包的大小上限為300 MB。如果JAR包大小超過300 MB,請在叢集繫結的OSS控制檯上,或通過OpenAPI的方式上傳JAR包。
    資源選擇 單擊選擇資源,選擇需要引用的資源。
    資源名稱 輸入資源名稱。
    資源備註 輸入資源備註資訊。
    資源型別 選擇引用資源型別,JAR、DICTIONARY或PYTHON。
  6. 資源引用頁籤中,將滑鼠懸停在對應作業的右側的更多上。
  7. 在下拉列表中,選擇引用。
  8. 在作業的編輯視窗的頂部,輸入自定義函式宣告,示例如下。
    CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';

上線和啟動

在作業開發頁面單擊上線後,在運維頁面單擊啟動,即可完成自定義函式的上線。

常見問題

Q:為什麼實現了一個隨機數生成函式,在執行時產生的值卻一樣?

A:如果自定義函式是無參的,並且沒有宣告是非確定性函式,編譯期間可能會被優化成一個常量值。如果需要讓函式變成非確定性函式,不被優化為常量,建議您使用override isDeterministic()方法,返回False