1. 程式人生 > 實用技巧 >flink1.11.1,全新的table api

flink1.11.1,全新的table api

最新在自學flink,直接上的最新版,學到了table api,發現flink1.11/.1版本和flink1.10.1版本有很大差別。因為是新版本目前網上資料也不多,我通過查閱官網和自己編碼執行,簡單寫了個demo分享和講解一下。

  • 新api提供的TableEnvironment介面,直接提供了接受原始資料的方法

flink1.10.1的TableEnvironment沒有僅有fromTableSource,和from兩個方法返回Table

fromTableSource(TableSource<?> source)
Creates a table from a table source.

from(String path)
Reads a registered table and returns the resulting Table.

針對流資料或者批資料必須使用TableEnvironment介面下的BatchTableEnvironment介面和StreamTableEnvironment介面提供的方法,針對流以及批資料來源呼叫不同方法接受資料

flink1.11.1中TableEnvironment提供了fromValues方法以及其過載方法,用於接受原生資料,並且fromTableSource已經是過時方法。

fromValues(AbstractDataType<?> rowType, Expression... values)
Creates a Table from given collection of objects with a given row type.

  • 新表示式

fromValues很多過載方法必須接受DataTypes.ROW抽象資料模型,對此flink1.11.1提供了新的表示式api方便開發人員進行編寫程式碼

    /**
     * Creates a row of expressions.
     */
    public static ApiExpression row(Object head, Object... tail) {
        return apiCallAtLeastOneArgument(BuiltInFunctionDefinitions.ROW, head, tail);
    }

以下是一個簡單demo

 1         String words = "hello flink hello blink hello muscle hello power";
 2         List<ApiExpression> wordList = Arrays.stream(words.split("\\W+"))
 3                 .map(word -> row(word, 1))
 4                 .collect(Collectors.toList());
 5         //註冊成表,指定欄位
 6         Table table = tblEnv.fromValues(
 7                 DataTypes.ROW(
 8                         DataTypes.FIELD("word", DataTypes.STRING().notNull()),
 9                         DataTypes.FIELD("frequency", DataTypes.INT().notNull())
10                 ), wordList);
11         table.printSchema();
12         tblEnv.createTemporaryView("word_count", table);
13 
14         //執行查詢
15         Table table2 = tblEnv.sqlQuery("select word, sum(frequency) from word_count group by word");
16         table2.execute().print();