
Flink RetractStream Example and UDF Implementation

Introduction

Running the Flink SQL example RetractPvUvSQL on Flink 1.7.2 fails with:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 19 to line 1, column 51: Cannot apply 'DATE_FORMAT' to arguments of type 'DATE_FORMAT(<VARCHAR(65536)>, <CHAR(2)>)'. Supported form(s): '(TIMESTAMP, FORMAT)'

The message indicates that DATE_FORMAT does not accept a string (VARCHAR) argument, only a TIMESTAMP. Below we define a UDF that supports the string case.

Note that the official documentation also discourages the DATE_FORMAT(timestamp, string) form.
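Before wrapping it in a UDF, the core conversion we need can be sketched with plain JDK classes: parse the string timestamp in the format our source data uses, then re-format it to extract the hour. The class name here is just for illustration.

```java
import java.text.SimpleDateFormat;
import java.util.Date;

public class DateFormatSketch {
    public static void main(String[] args) throws Exception {
        // Parse the string timestamp with the source data's format,
        // then re-format it to just the hour, as the query intends.
        String visitTime = "2017-09-16 09:00:00";
        Date parsed = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(visitTime);
        String hour = new SimpleDateFormat("HH").format(parsed);
        System.out.println(hour); // prints 09
    }
}
```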

RetractPvUvSQL Code

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractPvUvSQL {

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataStreamSource<PageVisit> input = env.fromElements(
                new PageVisit("2017-09-16 09:00:00", 1001, "/page1"),
                new PageVisit("2017-09-16 09:00:00", 1001, "/page2"),

                new PageVisit("2017-09-16 10:30:00", 1005, "/page1"),
                new PageVisit("2017-09-16 10:30:00", 1005, "/page1"),
                new PageVisit("2017-09-16 10:30:00", 1005, "/page2"));

        // register the DataStream as table "visit_table"
        tEnv.registerDataStream("visit_table", input, "visitTime, userId, visitPage");
        
        Table table = tEnv.sqlQuery(
                "SELECT " +
                        "visitTime, " +
                        "DATE_FORMAT(max(visitTime), 'HH') as ts, " +
                        "count(userId) as pv, " +
                        "count(distinct userId) as uv " +
                        "FROM visit_table " +
                        "GROUP BY visitTime");
        DataStream<Tuple2<Boolean, Row>> dataStream = tEnv.toRetractStream(table, Row.class);

        if (params.has("output")) {
            String outPath = params.get("output");
            System.out.println("Output path: " + outPath);
            dataStream.writeAsCsv(outPath);
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            dataStream.print();
        }
        env.execute();
    }

    /**
     * Simple POJO representing a page visit.
     */
    public static class PageVisit {
        public String visitTime;
        public long userId;
        public String visitPage;

        // public constructor to make it a Flink POJO
        public PageVisit() {
        }

        public PageVisit(String visitTime, long userId, String visitPage) {
            this.visitTime = visitTime;
            this.userId = userId;
            this.visitPage = visitPage;
        }

        @Override
        public String toString() {
            return "PageVisit " + visitTime + " " + userId + " " + visitPage;
        }
    }
}

UDF Implementation

Implement date formatting that accepts a string parameter:

import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DateFormat extends ScalarFunction {

    public String eval(Timestamp t, String format) {
        return new SimpleDateFormat(format).format(t);
    }

    /**
     * Parses the input using the default date format yyyy-MM-dd HH:mm:ss,
     * then re-formats it with the given pattern.
     *
     * @param t      date string in the default format
     * @param format target format pattern
     * @return the re-formatted date string
     */
    public String eval(String t, String format) {
        try {
            Date originDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(t);
            return new SimpleDateFormat(format).format(originDate);
        } catch (ParseException e) {
            throw new RuntimeException("Failed to parse date " + t + " for format " + format);
        }
    }
}

Because Flink already ships a built-in DATE_FORMAT function, we register ours under a different name: DATEFORMAT.

// register the function
tEnv.registerFunction("DATEFORMAT", new DateFormat());

Table table = tEnv.sqlQuery(
        "SELECT " +
                "visitTime, " +
                "DATEFORMAT(max(visitTime), 'HH') as ts, " +
                "count(userId) as pv, " +
                "count(distinct userId) as uv " +
                "FROM visit_table " +
                "GROUP BY visitTime");

The registration source code shows that a registered UDF is available from both the Table API and the SQL API:

/**
  * Registers a [[ScalarFunction]] under a unique name. Replaces already existing
  * user-defined functions under this name.
  */
def registerFunction(name: String, function: ScalarFunction): Unit = {
  // check if class could be instantiated
  checkForInstantiation(function.getClass)

  // register in Table API
  functionCatalog.registerFunction(name, function.getClass)

  // register in SQL API
  functionCatalog.registerSqlFunction(
    createScalarSqlFunction(name, name, function, typeFactory)
  )
}

Execution result:

Printing result to stdout. Use --output to specify output path.
6> (true,2017-09-16 10:30:00,10,1,1)
4> (true,2017-09-16 09:00:00,09,1,1)
4> (false,2017-09-16 09:00:00,09,1,1)
6> (false,2017-09-16 10:30:00,10,1,1)
4> (true,2017-09-16 09:00:00,09,2,1)
6> (true,2017-09-16 10:30:00,10,2,1)
6> (false,2017-09-16 10:30:00,10,2,1)
6> (true,2017-09-16 10:30:00,10,3,1)

Process finished with exit code 0

Let's interpret this result.

A Flink retract stream marks each row with true or false: true means the row is an insert, false means it retracts a previously emitted row. A diagram circulating online illustrates quite intuitively why RetractStream needs to exist.

Looking at our source data: when the first record for 09:00 and for 10:30 arrives, its pv and uv are fresh inserts. When the second record for the same key arrives, pv changes, so the earlier result must be retracted and replaced with the new one, much like updating a row by deleting it and re-inserting it. Later records for the same key follow the same pattern.
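The retract pattern in the output above can be simulated with plain Java, no Flink required. This is only a sketch of the semantics: a running pv count per key, where each update first retracts the old result (false) and then inserts the new one (true).

```java
import java.util.HashMap;
import java.util.Map;

public class RetractSim {
    public static void main(String[] args) {
        // Running pv count per key; each update retracts the old row (false)
        // and inserts the new row (true), mirroring toRetractStream's output.
        Map<String, Long> pv = new HashMap<>();
        String[] events = {"09:00", "09:00", "10:30", "10:30", "10:30"};
        for (String key : events) {
            Long old = pv.get(key);
            if (old != null) {
                System.out.println("(false," + key + "," + old + ")"); // retract old result
            }
            long now = (old == null ? 0 : old) + 1;
            pv.put(key, now);
            System.out.println("(true," + key + "," + now + ")");      // insert new result
        }
    }
}
```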

Summary

1. When converting a table back to a stream, Flink offers toAppendStream and toRetractStream: the former suits append-only scenarios, the latter suits scenarios with updates and deletes.

2. Flink SQL can call our user-defined functions. To implement a Flink UDF, the evaluation method must be declared public and named eval. The parameter types and return type of eval determine the function's input and output types. eval can be overloaded to provide multiple signatures, and it also supports varargs, e.g. eval(String... strs).
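The overloading and varargs rules can be sketched in plain Java. This toy class omits the ScalarFunction base class (so it runs without Flink on the classpath), and the name ConcatFunction is purely illustrative; Java's overload resolution prefers the fixed-arity method when it applies, falling back to varargs otherwise.

```java
public class ConcatFunction {
    // Fixed-arity overload: chosen when exactly two arguments are passed.
    public String eval(String a, String b) {
        return a + b;
    }

    // Varargs overload, as in eval(String... strs): chosen otherwise.
    public String eval(String... strs) {
        StringBuilder sb = new StringBuilder();
        for (String s : strs) {
            sb.append(s);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ConcatFunction f = new ConcatFunction();
        System.out.println(f.eval("a", "b"));      // fixed-arity overload: ab
        System.out.println(f.eval("a", "b", "c")); // varargs overload: abc
    }
}
```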