Flink基礎(66):FLINK SQL(43) 自定義函式(二)自定義標量函式(UDF)
阿新 • • 發佈:2021-08-07
本文為您介紹如何為實時計算Flink版自定義標量函式(UDF)搭建開發環境、編寫業務程式碼及上線。
注意阿里雲實時計算Flink版共享模式暫不支援自定義函式,僅獨享模式支援自定義函式。定義
自定義標量函式(UDF)將0個、1個或多個標量值對映到一個新的標量值。
搭建環境
搭建開發環境,請參見環境搭建。
業務程式碼
UDF需要在ScalarFunction類中實現eval
方法。open
方法和close
方法可選。
注意UDF預設對於相同的輸入會有相同的輸出。如果UDF不能保證相同的輸出,例如,在UDF中呼叫外部服務,相同的輸入值可能返回不同的結果,建議您使用override isDeterministic()
False
。否則在某些條件下,輸出結果不符合預期。例如,UDF運算元前移。
以Java為例,示例程式碼如下。
package com.hjc.test.blink.sql.udx; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.ScalarFunction; public class StringLengthUdf extends ScalarFunction { // 可選,open方法可以不寫。 // 如果編寫open方法需要宣告'import org.apache.flink.table.functions.FunctionContext;'。@Override public void open(FunctionContext context) { } public long eval(String a) { return a == null ? 0 : a.length(); } public long eval(String b, String c) { return eval(b) + eval(c); } //可選,close方法可以不寫。 @Override public void close() { } }
編寫SQL語句
在指定的Class中編寫SQL語句,自定義函式中SQL語句示例如下。-- udf str.length() CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf'; create table sls_stream( a int, b int, c varchar ) with ( type='sls', endPoint='<yourEndpoint>', accessKeyId='<yourAccessId>', accessKeySecret='<yourAccessSecret>', startTime = '2017-07-04 00:00:00', project='<yourProjectName>', logStore='<yourLogStoreName>', consumerGroup='consumerGroupTest1' ); create table rds_output( id int, len bigint, content VARCHAR ) with ( type='rds', url='yourDatabaseURL', tableName='<yourDatabaseTableName>', userName='<yourDatabaseUserName>', password='<yourDatabasePassword>' ); insert into rds_output select a, stringLengthUdf(c), c as content from sls_stream;
註冊使用
- 登入實時計算控制檯。
- 在頂部選單中,單擊開發。
- 在左側的導航欄中,單擊資源引用。
- 在資源引用頁籤的右上角,單擊新建資源。
- 在上傳資源頁面,輸入資源配置資訊。
引數名稱 說明 上傳方式 實時計算控制檯上僅支援本地上傳。 說明本地上傳JAR包的大小上限為300 MB。如果JAR包大小超過300 MB,請在叢集繫結的OSS控制檯上,或通過OpenAPI的方式上傳JAR包。 資源選擇 單擊選擇資源,選擇需要引用的資源。 資源名稱 輸入資源名稱。 資源備註 輸入資源備註資訊。 資源型別 選擇引用資源型別,JAR、DICTIONARY或PYTHON。 - 在資源引用頁籤中,將滑鼠懸停在對應作業的右側的更多上。
- 在下拉列表中,選擇引用。
- 在作業的編輯視窗的頂部,輸入自定義函式宣告,示例如下。
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
上線和啟動
在作業開發頁面單擊上線後,在運維頁面單擊啟動,即可完成自定義函式的上線。
常見問題
Q:為什麼實現了一個隨機數生成函式,在執行時產生的值卻一樣?
A:如果自定義函式是無參的,並且沒有宣告是非確定性函式,編譯期間可能會被優化成一個常量值。如果需要讓函式變成非確定性函式,不被優化為常量,建議您使用override isDeterministic()
方法,返回False
。