Flink基礎(67):FLINK SQL(44) 自定義函式(三)自定義聚合函式(UDAF)
阿新 • • 發佈:2021-08-07
本文為您介紹如何為實時計算Flink版自定義聚合函式(UDAF)搭建開發環境、編寫業務程式碼及上線。
注意阿里雲實時計算Flink版共享模式暫不支援自定義函式,僅獨享模式支援自定義函式。定義
自定義聚合函式(UDAF)可以將多條記錄聚合成1條記錄。
UDAF抽象類內部方法
說明雖然UDAF可以使用Java或Scala實現,但是建議您使用Java,因為Scala的資料型別有時會造成不必要的效能損失。 AggregateFunction的核心介面方法,如下所示:- createAccumulator和getValue方法
/* * @param <T> UDAF的輸出結果的型別。 * @param <ACC> UDAF的accumulator的型別。accumulator是UDAF計算中用來存放計算中間結果的資料型別。您可以需要根據需要自行設計每個UDAF的accumulator。*/ public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction { /* * 初始化AggregateFunction的accumulator。 * 系統在進行第一個aggregate計算之前,呼叫一次此方法。 */ public ACC createAccumulator(); /* * 系統在每次aggregate計算完成後,呼叫此方法。 */ public T getValue(ACC accumulator); }
-
說明
- createAccumulator和getValue可以定義在AggregateFunction抽象類內。
- UDAF必須包含1個accumulate方法。
- createAccumulator和getValue可以定義在AggregateFunction抽象類內。
- accumulate方法
說明public void accumulate(ACC accumulator, ...[使用者指定的輸入引數]...);
- 您需要實現一個accumulate方法,來描述如何計算輸入的資料,並更新資料到accumulator中。
- accumulate方法的第一個引數必須是使用AggregateFunction的ACC型別的accumulator。在系統執行過程中,runtime程式碼會把accumulator的歷史狀態和您指定的上游資料(支援任意數量,任意型別的資料)作為引數,一起傳遞給accumulate方法。
- retract和merge方法
createAccumulator、getValue和accumulate 3個方法一起使用,可以設計出一個最基本的UDAF。但是實時計算Flink版一些特殊的場景需要您提供retract和merge兩個方法才能完成。
通常,計算都是對無限流的一個提前的觀測值(early firing)。既然有early firing,就會有對發出的結果的修改,這個操作叫作撤回(retract)。SQL翻譯優化器會幫助您自動判斷哪些情況下會產生撤回的資料,哪些操作需要處理帶有撤回標記的資料。但是您需要實現一個retract方法來處理撤回的資料。
說明public void retract(ACC accumulator, ...[您指定的輸入引數]...);
- retract方法是accumulate方法的逆操作。例如,實現Count功能的UDAF,在使用accumulate方法時,每來一條資料要加1;在使用retract方法時,就要減1。
- 類似於accumulate方法,retract方法的第1個引數必須使用AggregateFunction的ACC型別的accumulator。在系統執行過程中,runtime程式碼會把accumulator的歷史狀態,和您指定的上游資料(任意數量,任意型別的資料)一起傳送給retract計算。
說明public void merge(ACC accumulator, Iterable<ACC> its);
- merge方法的第1個引數,必須是使用AggregateFunction的ACC型別的accumulator,而且第1個accumulator是merge方法完成之後,狀態所存放的地方。
- merge方法的第2個引數是1個ACC型別的accumulator遍歷迭代器,裡面有可能存在1個或多個accumulator。
搭建開發環境
搭建開發環境請參見環境搭建。
編寫業務邏輯程式碼
以Java為例,舉例程式碼如下。import org.apache.flink.table.functions.AggregateFunction; public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> { //定義存放count UDAF狀態的accumulator的資料的結構。 public static class CountAccum { public long total; } //初始化count UDAF的accumulator。 public CountAccum createAccumulator() { CountAccum acc = new CountAccum(); acc.total = 0; return acc; } //getValue提供瞭如何通過存放狀態的accumulator計算count UDAF的結果的方法。 public Long getValue(CountAccum accumulator) { return accumulator.total; } //accumulate提供瞭如何根據輸入的資料更新count UDAF存放狀態的accumulator。 public void accumulate(CountAccum accumulator, Object iValue) { accumulator.total++; } public void merge(CountAccum accumulator, Iterable<CountAccum> its) { for (CountAccum other : its) { accumulator.total += other.total; } } }說明AggregateFunction的子類支援open和close方法作為可選方法,請參見自定義標量函式(UDF)或自定義表值函式(UDTF)的寫法。
註冊使用
- 登入實時計算控制檯。
- 在頂部選單中,單擊開發。
- 在左側的導航欄中,單擊資源引用。
- 在資源引用頁籤的左上角,單擊新建資源。
- 在上傳資源頁面,輸入資源配置資訊。
引數名稱 說明 上傳方式 實時計算控制檯上僅支援本地上傳。 說明本地上傳JAR包的大小上限為300 MB。如果JAR包大小超過300 MB,請在叢集繫結的OSS控制檯上,或通過OpenAPI的方式上傳JAR包。 資源選擇 單擊選擇資源,選擇需要引用的資源。 資源名稱 輸入資源名稱。 資源備註 輸入資源備註資訊。 資源型別 選擇引用資源型別,JAR、DICTIONARY或PYTHON。 - 在資源列表中,將滑鼠懸停在目標資源右側的更多上。
- 在下拉列表中,單擊引用。
- 在作業編輯視窗,輸入自定義函式宣告,示例如下。
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
上線和啟動
自定義聚合函式(UDAF)的上線和啟動步驟,請參見上線和啟動。
示例
-- UDAF計算count CREATE FUNCTION countUdaf AS 'com.hjc.test.blink.sql.udx.CountUdaf'; create table sls_stream( a int, b bigint, c varchar ) with ( type='sls', endPoint='yourEndpoint', accessKeyId='yourAccessId', accessKeySecret='yourAccessSecret', startTime='2017-07-04 00:00:00', project='<yourPorjectName>', logStore='stream-test2', consumerGroup='consumerGroupTest3' ); create table rds_output( len1 bigint, len2 bigint ) with ( type='rds', url='yourDatabaseURL', tableName='<yourDatabaseTableName>', userName='<yourDatabaseUserName>', password='<yourDatabasePassword>' ); insert into rds_output select count(a), countUdaf(a) from sls_stream;