1. 程式人生 > >Flink系統之Table API 和 SQL

Flink系統之Table API 和 SQL

  Flink提供了像表一樣處理的API和像執行SQL語句一樣把結果集進行執行。這樣很方便的讓大家進行資料處理了。比如執行一些查詢,在無界資料和批處理的任務上,然後將這些按一定的格式進行輸出,很方便的讓大家像執行SQL一樣簡單。

  今天主要寫的東西分為如下幾個方面,然後遵循著下邊幾個方面進行展開:

  1. Flink的不同API的層級梗概。

  2. FlinkSQL的程式設計的步驟。

  3. Flink程式設計的例子。

  

一、  Flink有著不同級別的API,不同級別的API方便不同使用者進行處理。普通使用者使用Datastream以及Dataset進行程式編寫,我們可以在其更高的基礎上使用Table API以及SQL,這也是Flink的強大之處,可以像使用處理表一樣處理資料。如果想研究的更高可以看更底層的東西。

SQL  High-level Language
Table API Declarative  DSL
Datastream / Dataset API Core API
Stateful Stream Processing

Low-level building block

(streams, state, [event] time)

二、 Flink的Table API 和 SQL程式設計步驟如下:

  1) 建立一個TableEnvironment表環境用於後續使用。TableEnvironment是 SQL 和 Table API的核心概念,它用於設定執行所需要的資料屬性,和ExecutionEnvironment類似,它主要負責:

    a) 登錄檔資料來源,從內部或者外部來源。

    b) 執行相應的SQL語句。

    c) 註冊自定義集數。

    d 將結果集進行掃描和寫入到目標資料來源。

    e) 相同的environment可以執行相應的join unin操作。

  2)接下來,咱們看一下如何註冊資料來源,注意不同的Flink版本有不同的實現,但是核心的內容是不變的:

    a) 可以直接從資料集裡進行註冊。比如 tableEnvironment.registerDataSet()。

    b) 在一個已經存在的Table中直接執行scan或者select,那麼會生成一個新的Table,也就是資料可以從已有的Table中再次獲取,Table t = tableEnv.scan("x").select("a, b,c")。

    c) 可以是TableSource, 也就是從不同的檔案、資料庫、訊息系統進行讀取。 比如csv檔案,TableSource csvSource = new CsvTableSource("path/to/file")。

  3)讀取完資料後進行處理,處理完之後要儲存起來,那麼需要Sink(儲存)到檔案或者資料庫、訊息系統等。

    a) 比如Sink到CSV檔案。 TableSink csvSink = new TableCSVSink("path/to/sink", ..)。

    b) Sink為指定欄位句和型別到CSV檔案中。

      指定表字段: String[] fieldNames = {"fild1", "filed2", "field3"}; 

      指定欄位型別: TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; 

      指定表名和csv檔案:tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);

三、接下來,看一下真實的例子。

    1)從給定的單詞和單詞的個數中統計一下,每個單詞出現的資料,使用SQL語句進行實現查詢統計。完整的樣例如下(注意,不同的FLink版本實現上有稍微的差異):

package myflink.sql;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;


public class WordCountSQL {

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.getTableEnvironment(env);

        DataSet<WC> input = env.fromElements(
                WC.of("hello", 1),
                WC.of("hqs", 1),
                WC.of("world", 1),
                WC.of("hello", 1)
        );
        //註冊資料集
        tEnv.registerDataSet("WordCount", input, "word, frequency");

        //執行SQL,並結果集做為一個新表
        Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");

        DataSet<WC> result = tEnv.toDataSet(table, WC.class);

        result.print();

    }

    public static class WC {
        public String word; //hello
        public long frequency;

        //建立構造方法,讓flink進行例項化
        public WC() {}

        public static WC of(String word, long frequency) {
            WC wc = new WC();
            wc.word = word;
            wc.frequency = frequency;
            return wc;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }

}

 

  輸出的結果為,和我們想的結果是一樣的。

WC world 1
WC hello 2
WC hqs 1

  2)接下來的例子會複雜一些,從一個txt檔案中讀取資料,txt檔案中包含id, 人字, 書名,價格資訊。然後將資料註冊成一個表,然後將這個表的結果進行統計,按人名統計出來這個人買書所花費的錢,將結果sink到一個檔案中。上程式碼。

  

package myflink.sql;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;

public class SQLFromFile {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

        env.setParallelism(1);
        //讀取檔案
        DataSource<String> input = env.readTextFile("test.txt");
        //將讀取到的檔案進行輸出
        input.print();
        //轉換為DataSet
        DataSet<Orders> inputDataSet = input.map(new MapFunction<String, Orders>() {
            @Override
            public Orders map(String s) throws Exception {
                String[] splits = s.split(" ");
                return Orders.of(Integer.valueOf(splits[0]), String.valueOf(splits[1]), String.valueOf(splits[2]), Double.valueOf(splits[3]));
            }
        });
        //轉換為table
        Table order = tableEnv.fromDataSet(inputDataSet);
        //註冊Orders表名
        tableEnv.registerTable("Orders", order);
        Table nameResult = tableEnv.scan("Orders").select("name");
        //輸出一下表
        nameResult.printSchema();

        //執行一下查詢
        Table sqlQueryResult = tableEnv.sqlQuery("select name, sum(price) as total from Orders group by name order by total desc");
        //查詢結果轉換為DataSet
        DataSet<Result> result = tableEnv.toDataSet(sqlQueryResult, Result.class);
        result.print();

        //以tuple的方式進行輸出
        result.map(new MapFunction<Result, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(Result result) throws Exception {
                String name = result.name;
                Double total = result.total;
                return Tuple2.of(name, total);
            }
        }).print();

        TableSink sink  = new CsvTableSink("SQLText.txt", " | ");

        //設定欄位名
        String[] filedNames = {"name", "total"};
        //設定欄位型別
        TypeInformation[] filedTypes = {Types.STRING(), Types.DOUBLE()};

        tableEnv.registerTableSink("SQLTEXT", filedNames, filedTypes, sink);

        sqlQueryResult.insertInto("SQLTEXT");

        env.execute();

    }

    public static class Orders {
        public Integer id;
        public String name;
        public String book;
        public Double price;

        public Orders() {
            super();
        }

        public static Orders of(Integer id, String name, String book, Double price) {
            Orders orders = new Orders();
            orders.id = id;
            orders.name = name;
            orders.book = book;
            orders.price = price;
            return orders;
        }
    }

    public static class Result {
        public String name;
        public Double total;

        public Result() {
            super();
        }

        public static Result of(String name, Double total) {
            Result result = new Result();
            result.name = name;
            result.total = total;
            return result;
        }
    }

}

  

  想參考完整的程式碼,可以訪問 https://github.com/stonehqs/flink-demo 。

 

  有問題,歡迎拍磚。