1. 程式人生 > 其它 >Flink基礎(67):FLINK SQL(44) 自定義函式(三)自定義聚合函式(UDAF)

Flink基礎(67):FLINK SQL(44) 自定義函式(三)自定義聚合函式(UDAF)

本文為您介紹如何為實時計算Flink版自定義聚合函式(UDAF)搭建開發環境、編寫業務程式碼及上線。

注意阿里雲實時計算Flink版共享模式暫不支援自定義函式,僅獨享模式支援自定義函式。

定義

自定義聚合函式(UDAF)可以將多條記錄聚合成1條記錄。

UDAF抽象類內部方法

說明雖然UDAF可以使用Java或Scala實現,但是建議您使用Java,因為Scala的資料型別有時會造成不必要的效能損失。 AggregateFunction的核心介面方法,如下所示:
  • createAccumulator和getValue方法
/*
* @param <T> UDAF的輸出結果的型別。
* @param <ACC> UDAF的accumulator的型別。accumulator是UDAF計算中用來存放計算中間結果的資料型別。您可以需要根據需要自行設計每個UDAF的accumulator。
*/ public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction { /* * 初始化AggregateFunction的accumulator。 * 系統在進行第一個aggregate計算之前,呼叫一次此方法。 */ public ACC createAccumulator(); /* * 系統在每次aggregate計算完成後,呼叫此方法。 */ public T getValue(ACC accumulator); }
  • 說明
    • createAccumulator和getValue可以定義在AggregateFunction抽象類內。
    • UDAF必須包含1個accumulate方法。
  • accumulate方法
    public void accumulate(ACC accumulator, ...[使用者指定的輸入引數]...);
    說明
    • 您需要實現一個accumulate方法,來描述如何計算輸入的資料,並更新資料到accumulator中。
    • accumulate方法的第一個引數必須是使用AggregateFunction的ACC型別的accumulator。在系統執行過程中,runtime程式碼會把accumulator的歷史狀態和您指定的上游資料(支援任意數量,任意型別的資料)作為引數,一起傳遞給accumulate方法。
  • retract和merge方法

    createAccumulator、getValue和accumulate 3個方法一起使用,可以設計出一個最基本的UDAF。但是實時計算Flink版一些特殊的場景需要您提供retract和merge兩個方法才能完成。

    通常,計算都是對無限流的一個提前的觀測值(early firing)。既然有early firing,就會有對發出的結果的修改,這個操作叫作撤回(retract)。SQL翻譯優化器會幫助您自動判斷哪些情況下會產生撤回的資料,哪些操作需要處理帶有撤回標記的資料。但是您需要實現一個retract方法來處理撤回的資料。
    public void retract(ACC accumulator, ...[您指定的輸入引數]...);
    說明
    • retract方法是accumulate方法的逆操作。例如,實現Count功能的UDAF,在使用accumulate方法時,每來一條資料要加1;在使用retract方法時,就要減1。
    • 類似於accumulate方法,retract方法的第1個引數必須使用AggregateFunction的ACC型別的accumulator。在系統執行過程中,runtime程式碼會把accumulator的歷史狀態,和您指定的上游資料(任意數量,任意型別的資料)一起傳送給retract計算。
    在實時計算Flink版中一些場景需要使用merge方法,例如session window。由於實時計算Flink版具有out of order的特性,後輸入的資料有可能位於2個原本分開的session中間,這樣就把2個session合為1個session。此時,需要使用merge方法把多個accumulator合為1個accumulator。
    public void merge(ACC accumulator, Iterable<ACC> its);
    說明
    • merge方法的第1個引數,必須是使用AggregateFunction的ACC型別的accumulator,而且第1個accumulator是merge方法完成之後,狀態所存放的地方。
    • merge方法的第2個引數是1個ACC型別的accumulator遍歷迭代器,裡面有可能存在1個或多個accumulator。

搭建開發環境

搭建開發環境請參見環境搭建

編寫業務邏輯程式碼

以Java為例,舉例程式碼如下。
import org.apache.flink.table.functions.AggregateFunction;

public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
    //定義存放count UDAF狀態的accumulator的資料的結構。
    public static class CountAccum {
        public long total;
    }

    //初始化count UDAF的accumulator。
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    //getValue提供瞭如何通過存放狀態的accumulator計算count UDAF的結果的方法。
    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    //accumulate提供瞭如何根據輸入的資料更新count UDAF存放狀態的accumulator。
    public void accumulate(CountAccum accumulator, Object iValue) {
        accumulator.total++;
    }

    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
         for (CountAccum other : its) {
            accumulator.total += other.total;
         }
    }
}
說明AggregateFunction的子類支援open和close方法作為可選方法,請參見自定義標量函式(UDF)自定義表值函式(UDTF)的寫法。

註冊使用

  1. 登入實時計算控制檯
  2. 在頂部選單中,單擊開發。
  3. 在左側的導航欄中,單擊資源引用。
  4. 資源引用頁籤的左上角,單擊新建資源。
  5. 上傳資源頁面,輸入資源配置資訊。
    引數名稱說明
    上傳方式 實時計算控制檯上僅支援本地上傳。 說明本地上傳JAR包的大小上限為300 MB。如果JAR包大小超過300 MB,請在叢集繫結的OSS控制檯上,或通過OpenAPI的方式上傳JAR包。
    資源選擇 單擊選擇資源,選擇需要引用的資源。
    資源名稱 輸入資源名稱。
    資源備註 輸入資源備註資訊。
    資源型別 選擇引用資源型別,JAR、DICTIONARY或PYTHON。
  6. 資源列表中,將滑鼠懸停在目標資源右側的更多上。
  7. 在下拉列表中,單擊引用。
  8. 在作業編輯視窗,輸入自定義函式宣告,示例如下。
    CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';

上線和啟動

自定義聚合函式(UDAF)的上線和啟動步驟,請參見上線啟動

示例

-- UDAF計算count
CREATE FUNCTION countUdaf AS 'com.hjc.test.blink.sql.udx.CountUdaf';

create table sls_stream(
   a int,
   b bigint,
   c varchar
) with (
   type='sls',
   endPoint='yourEndpoint',
   accessKeyId='yourAccessId',
   accessKeySecret='yourAccessSecret',
   startTime='2017-07-04 00:00:00',
   project='<yourPorjectName>',
   logStore='stream-test2',
   consumerGroup='consumerGroupTest3'
);

create table rds_output(
   len1 bigint,
   len2 bigint
) with (
   type='rds',
   url='yourDatabaseURL',
   tableName='<yourDatabaseTableName>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);

insert into rds_output
select
   count(a),
   countUdaf(a)
from sls_stream;