1. 程式人生 > >Flink SQL Table 我們一起去看2018中超聯賽-Flink牛刀小試

Flink SQL Table 我們一起去看2018中超聯賽-Flink牛刀小試

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。

寫在前面的話

Flink是一個新型的流式處理引擎,作者自身只是對Spark底層較為熟悉,有興趣可以查閱我的Spark core ,Spark String 以及 Spark SQL 原始碼解讀系列。在這裡我們只是品味一下號稱第四代大資料處理引擎的Flink,作者也並沒有深入到Flink底層原始碼級別。請見諒如果您已經是FLink大牛了!看一下2018中超聯賽積分榜:

1 SQL Table(牛刀小試)

  • Table API 是以 表 為中心的宣告式DSL,其中表可能會動態變化(在表達流資料時)。Table API遵循(擴充套件的)關係模型:表有二維資料結構(schema)(類似於關係資料庫中的表),同時API提供可比較的操作,例如select、project、join、group-by、aggregate等。Table API程式宣告式地定義了 什麼邏輯操作應該執行 而不是準確地確定 這些操作程式碼的看上去如何 。 儘管Table API可以通過多種型別的使用者自定義函式(UDF)進行擴充套件,其仍不如 核心API 更具表達能力,但是使用起來卻更加簡潔(程式碼量更少)。除此之外,Table API程式在執行之前會經過內建優化器進行優化。

  • 你可以在表與 DataStream/DataSet 之間無縫切換,以允許程式將 Table API 與 DataStream 以及 DataSet 混合使用。

  • Flink提供的最高層級的抽象是 SQL 。這一層抽象在語法與表達能力上與 Table API 類似,但是是以SQL查詢表示式的形式表現程式。SQL抽象與Table API互動密切,同時SQL查詢可以直接在Table API定義的表上執行。

  • Apache Flink對SQL的支援可以追溯到一年前釋出的0.9.0-milestone1版本。此版本通過引入Table API來提供類似於SQL查詢的功能,此功能可以操作分散式的資料集,並且可以自由地和Flink其他API進行組合。Tables在釋出之初就支援靜態的以及流式資料(也就是提供了DataSet和DataStream相關APIs)。我們可以將DataSet或DataStream轉成Table;同時也可以將Table轉換成DataSet或DataStream。

  • The highest level abstraction offered by Flink is SQL. This abstraction is similar to the Table API both in semantics and expressiveness, but represents programs as SQL query expressions. The SQL abstraction closely interacts with the Table API, and SQL queries can be executed over tables defined in the Table API.

  • 使用者可以通過 TableEnvironment 類中的 sqlQuery() 方法執行SQL查詢,查詢結果會以 Table 形式返回。使用者可將 Table 用於後續的 SQL 及 Table 查詢,或將其轉換成 DataSet 或 DataStream,亦可將它寫入到某個 TableSink 中。無論是通過 SQL 還是 Table API 提交的查詢都可以進行無縫銜接,系統內部會對它們進行整體優化,並最終轉換成一個 Flink 程式執行。

  • 為了在 SQL 查詢中使用某個 Table,使用者必須先在 TableEnvironment 中對其進行註冊。Table 的註冊來源可以是某個 TableSource,某個現有的 Table,或某個DataStream 或 DataSet。此外,使用者還可以通過在 TableEnvironment 中註冊外部 Catalog 的方式來指定資料來源位置。

  • 為方便使用,在執行 Table.toString() 方法時,系統會自動以一個唯一名稱在當前 TableEnvironment 中註冊該 Table 並返回該唯一名稱。因此,在以下示例中,Table 物件都可以直接以內聯(字串拼接)方式出現在 SQL 語句中。

  • 注意: 現階段Flink對於SQL的支援還並不完善。如果在查詢中使用了系統尚不支援的功能,會引發 TableException 。以下章節將對批環境和流環境下 SQL 功能支援情況做出相應說明。

2 上程式碼分析(球隊粒度進行進球聚合排序)

  • 1 進行pojo物件的資料封裝。
  • 2 BatchTableEnvironment tableEnv環境生成: BatchTableEnvironment.getTableEnvironment(env);
  • 3 Table表生成:Table topScore = tableEnv.fromDataSet(topInput)
  • 4 Table表註冊:tableEnv.registerTable(“topScore”,topScore);
  • 5 Table表查詢:tableEnv.sqlQuery
  • 6 Table錶轉換回DataSet: tableEnv.toDataSet

2.1 詳情請參考程式碼

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    
    public class TableSQL {
    
        public static void main(String[] args) throws Exception{
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
            
            DataSet<String> input = env.readTextFile("C:\\CoreForBigData\\FLINK\\TopCore.csv");
            input.print();
            DataSet<TopScorers> topInput = input.map(new MapFunction<String, TopScorers>() {
                @Override
                public TopScorers map(String s) throws Exception {
                    String[] splits = s.split("\t");
                    return new TopScorers(Integer.valueOf(splits[0]),splits[1],splits[2],Integer.valueOf(splits[3]),Integer.valueOf(splits[4]),Integer.valueOf(splits[5]),Integer.valueOf(splits[6]),Integer.valueOf(splits[7]),Integer.valueOf(splits[8]),Integer.valueOf(splits[9]),Integer.valueOf(splits[10]));
                }
            });

            //將DataSet轉換為Table
            Table topScore = tableEnv.fromDataSet(topInput);
            //將topScore註冊為一個表
            tableEnv.registerTable("topScore",topScore);
    
            Table tapiResult = tableEnv.scan("topScore").select("club");
            tapiResult.printSchema();
    
            Table groupedByCountry = tableEnv.sqlQuery("select club, sum(jinqiu) as sum_score from topScore group by club order by sum_score desc");
            
            //轉換回dataset
            DataSet<Result> result = tableEnv.toDataSet(groupedByCountry, Result.class);
    
            //將dataset map成tuple輸出
            result.map(new MapFunction<Result, Tuple2<String,Integer>>() {
                @Override
                public Tuple2<String, Integer> map(Result result) throws Exception {
                    String country = result.club;
                    int sum_total_score = result.sum_score;
                    return Tuple2.of(country,sum_total_score);
                }
            }).print();
    
        }
    
        /**
         * 源資料的對映類
         */
        public static class TopScorers {
            /**
             * 排名,球員,球隊,出場,進球,射正,任意球,犯規,黃牌,紅牌
             */
            public Integer rank;
            public String player;
            public String club;
            public Integer chuchang;
            public Integer jinqiu;
            public Integer zhugong;
            public Integer shezheng;
            public Integer renyiqiu;
            public Integer fangui;
            public Integer huangpai;
            public Integer hongpai;
    
            public TopScorers() {
                super();
            }
    
            public TopScorers(Integer rank, String player, String club, Integer chuchang, Integer jinqiu, Integer zhugong, Integer shezheng, Integer renyiqiu, Integer fangui, Integer huangpai, Integer hongpai) {
                this.rank = rank;
                this.player = player;
                this.club = club;
                this.chuchang = chuchang;
                this.jinqiu = jinqiu;
                this.zhugong = zhugong;
                this.shezheng = shezheng;
                this.renyiqiu = renyiqiu;
                this.fangui = fangui;
                this.huangpai = huangpai;
                this.hongpai = hongpai;
            }
        }
    
        /**
         * 統計結果對應的類
         */
        public static class Result {
            public String club;
            public Integer sum_score;
    
            public Result() {}
        }
    }

2.2 結果展示(2018恆大隊很厲害,進球55個)

3 理論昇華一下

3.1 Create a TableEnvironment

// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for streaming queries
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);

// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for batch queries
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);

3.2 DSL風格用法

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query

3.3 Register a DataStream or DataSet as Table

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");

3.4 Convert a DataStream or DataSet into a Table

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

4 收工

通過2018中超聯賽,我們管中窺豹,學會了Flink SQL Table 的核心思想,當然本文並不完善,希望本文能夠給大家帶來一些收穫。辛苦成文,彼此珍惜,謝謝!

版權宣告:本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。版權宣告:禁止轉載,歡迎學習。QQ郵箱地址:[email protected],如有任何問題,可隨時聯絡。

秦凱新 於深圳 201811262252