
Flink Examples (58): Dimension Table Joins (2) - Flink Dimension Table Joins in Practice

https://blog.csdn.net/chybin500/article/details/106482620/

This article is based on the session 基於 Flink 的典型 ETL 場景實現 (implementing typical ETL scenarios with Flink) from the real-time data warehouse series, and implements a demo for each of the four dimension-table join approaches covered in that talk.

There are four common approaches to dimension-table joins:

  1. Preloading the dimension table
  2. Hot-storage dimension table
  3. Broadcasting the dimension table
  4. Temporal table function join

The four approaches are demonstrated below on the same join requirement: the main stream carries user records with the fields user name and city id; the dimension table holds city data with the fields city id and city name. The user stream is joined with the city table, and the output is (user name, city id, city name).

The user table schema is as follows:

Field name | Data type | Sample
User name  | String    | User1
City ID    | Int       | 1001
Timestamp  | Long      | 1000

The city dimension table schema is as follows:

Field name | Data type | Sample
City ID    | Int       | 1001
City name  | String    | beijing
Timestamp  | Long      | 1000

1. Preloading the dimension table

Define a class extending RichMapFunction, load the dimension data into memory in open(), and join against it in the probe stream's map() method.
Loading the dimension data into memory in RichMapFunction.open() has the following characteristics:
Pros: simple to implement.
Cons: since the data sits in memory, it only suits small dimension tables that are updated infrequently. A timer can be set up in open() to refresh the table periodically (see the sketch after the example below), but updates may still lag.
Here is an example:

package join;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;

/**
 * Create By 鳴宇淳 on 2020/6/1
 * This example reads the main stream from a socket: each record is a user name and a city id.
 * The dimension table maps city id to city name.
 * Joining the two yields (user name, city id, city name).
 * Here the dimension data is loaded into memory in RichMapFunction.open().
 **/
public class JoinDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    //input format: user,1000 (user name, city id)
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });

        DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo1());
        result.print();
        env.execute("joinDemo1");
    }

    static class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
        //holds the dimension data in memory
        Map<Integer, String> dim;

        @Override
        public void open(Configuration parameters) throws Exception {
            //load the dimension data in open(); it could come from a database, a file, an API, etc.
            dim = new HashMap<>();
            dim.put(1001, "beijing");
            dim.put(1002, "shanghai");
            dim.put(1003, "wuhan");
            dim.put(1004, "changsha");
        }

        @Override
        public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
            //join the main stream against the dimension table in map()
            String cityName = "";
            if (dim.containsKey(value.f1)) {
                cityName = dim.get(value.f1);
            }
            return new Tuple3<>(value.f0, value.f1, cityName);
        }
    }
}
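
The periodic refresh mentioned above can be added with a timer. Below is a minimal sketch (not from the original article) that swaps in a freshly loaded map on a schedule; loadDim() is a hypothetical stand-in for whatever source the dimension data actually comes from, and the 10-minute interval is an assumption:

package join;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

//Sketch only: periodically reloads the in-memory dimension table.
public class RefreshingMapJoin extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
    private transient volatile Map<Integer, String> dim;
    private transient ScheduledExecutorService refresher;

    @Override
    public void open(Configuration parameters) throws Exception {
        dim = loadDim();
        refresher = Executors.newSingleThreadScheduledExecutor();
        //swap in a freshly loaded map every 10 minutes (interval is an assumption)
        refresher.scheduleAtFixedRate(() -> dim = loadDim(), 10, 10, TimeUnit.MINUTES);
    }

    @Override
    public void close() throws Exception {
        if (refresher != null) {
            refresher.shutdown();
        }
    }

    private Map<Integer, String> loadDim() {
        //hypothetical: re-read the dimension data from a database, file, API, etc.
        Map<Integer, String> m = new HashMap<>();
        m.put(1001, "beijing");
        return m;
    }

    @Override
    public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
        Map<Integer, String> snapshot = dim; //read one consistent reference per record
        return new Tuple3<>(value.f0, value.f1, snapshot.getOrDefault(value.f1, ""));
    }
}
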
2. Hot-storage dimension table

In this approach the dimension data lives in external storage such as Redis, HBase, or MySQL, and the real-time stream queries that storage on the fly during the join. Its characteristics:
Pros: the dimension data is not limited by memory, so very large tables are possible.
Cons: since the dimension data sits in external storage, lookup speed is bounded by that store's read speed; syncing the dimension table also introduces some delay.

(1) Using a cache to reduce lookup pressure

A cache such as Guava Cache can hold the most frequently accessed dimension records, cutting down the number of calls to the external system.
Here is an example:

package join;

import com.google.common.cache.*;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Create By 鳴宇淳 on 2020/6/1
 **/
public class JoinDemo2 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    //input format: user,1000 (user name, city id)
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });

        DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo1());
        result.print();
        env.execute("joinDemo1");
    }

    static class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
        LoadingCache<Integer, String> dim;

        @Override
        public void open(Configuration parameters) throws Exception {
            //use Guava's LoadingCache
            dim = CacheBuilder.newBuilder()
                    //cache at most this many entries; evict least-recently-used ones beyond that
                    .maximumSize(1000)
                    //expire an entry a fixed time after it was written
                    .expireAfterWrite(10, TimeUnit.MINUTES)
                    //register a removal listener
                    .removalListener(new RemovalListener<Integer, String>() {
                        @Override
                        public void onRemoval(RemovalNotification<Integer, String> removalNotification) {
                            System.out.println(removalNotification.getKey() + " was evicted, value: " + removalNotification.getValue());
                        }
                    })
                    .build(
                            //how to load an entry on a cache miss
                            new CacheLoader<Integer, String>() {
                                @Override
                                public String load(Integer cityId) throws Exception {
                                    String cityName = readFromHbase(cityId);
                                    return cityName;
                                }
                            }
                    );

        }

        private String readFromHbase(Integer cityId) {
            //simulate reading from HBase
            //hard-coded here instead of a real HBase lookup
            Map<Integer, String> temp = new HashMap<>();
            temp.put(1001, "beijing");
            temp.put(1002, "shanghai");
            temp.put(1003, "wuhan");
            temp.put(1004, "changsha");
            String cityName = "";
            if (temp.containsKey(cityId)) {
                cityName = temp.get(cityId);
            }

            return cityName;
        }

        @Override
        public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
            //join the main stream against the dimension table in map();
            //LoadingCache.get() loads the value on a miss and never returns null
            String cityName = dim.get(value.f1);
            return new Tuple3<>(value.f0, value.f1, cityName);
        }
    }
}
(2) Using async I/O to improve lookup throughput

Flink can talk to external storage synchronously: send one request, wait for the response, then send the next. Throughput is low this way; it can be raised by increasing the parallelism, but more parallel instances mean more threads and a lot of wasted resources.
Flink also supports asynchronous I/O against external systems, provided the client library offers an async API (most modern ones do). Async I/O brings three issues to handle:
Timeouts: a lookup that times out should be treated as a failure and handled accordingly.
In-flight request count: if too many requests are pending, Flink's backpressure mechanism throttles the upstream.
Out-of-order responses: results may come back out of order; Flink supports both modes, allowing unordered results or restoring the input order.

Here is an example that uses async I/O to access the dimension table:

package join;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Create By 鳴宇淳 on 2020/6/1
 **/
public class JoinDemo3 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    //input format: user,1000 (user name, city id)
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });


        DataStream<Tuple3<String,Integer, String>> orderedResult = AsyncDataStream
                //ordered: results are emitted in input order; timeout 1 s, capacity 2, backpressure once capacity is exceeded
                .orderedWait(textStream, new JoinDemo3AsyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);

        DataStream<Tuple3<String,Integer, String>> unorderedResult = AsyncDataStream
                //unordered: results may be emitted out of order; timeout 1 s, capacity 2, backpressure once capacity is exceeded
                .unorderedWait(textStream, new JoinDemo3AsyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);

        orderedResult.print();
        unorderedResult.print();
        env.execute("joinDemo");
    }

    //extends RichAsyncFunction to asynchronously query the dimension table stored in MySQL
    //input: (user name, city id); output: Tuple3<user name, city id, city name>
    static class JoinDemo3AsyncFunction extends RichAsyncFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
        // connection settings
        private static String jdbcUrl = "jdbc:mysql://192.168.145.1:3306?useSSL=false";
        private static String username = "root";
        private static String password = "123";
        private static String driverName = "com.mysql.jdbc.Driver";
        java.sql.Connection conn;
        PreparedStatement ps;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            Class.forName(driverName);
            conn = DriverManager.getConnection(jdbcUrl, username, password);
            ps = conn.prepareStatement("select city_name from tmp.city_info where id = ?");
        }

        @Override
        public void close() throws Exception {
            super.close();
            conn.close();
        }

        //the async lookup method
        @Override
        public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<Tuple3<String, Integer, String>> resultFuture) throws Exception {
            // look up by city id
            ps.setInt(1, input.f1);
            ResultSet rs = ps.executeQuery();
            String cityName = null;
            if (rs.next()) {
                cityName = rs.getString(1);
            }
            List<Tuple3<String, Integer, String>> list = new ArrayList<>();
            list.add(new Tuple3<>(input.f0, input.f1, cityName));
            resultFuture.complete(list);
        }
        }

        //timeout handling: emit an empty city name
        @Override
        public void timeout(Tuple2<String, Integer> input, ResultFuture<Tuple3<String, Integer, String>> resultFuture) throws Exception {
            List<Tuple3<String, Integer, String>> list = new ArrayList<>();
            list.add(new Tuple3<>(input.f0, input.f1, ""));
            resultFuture.complete(list);
        }
    }
}
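
One caveat: the JDBC query inside asyncInvoke above is itself a blocking call, so lookups do not actually overlap. A common fix is to hand the blocking lookup to a dedicated thread pool and complete the future from there. The following is a minimal sketch under that assumption; the pool size and the queryCityName() stub are illustrative, not from the original article:

package join;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

//Sketch only: run the blocking lookup on a separate executor so asyncInvoke returns immediately.
public class NonBlockingJoinFunction extends RichAsyncFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executor = Executors.newFixedThreadPool(10); //pool size is an assumption
    }

    @Override
    public void close() throws Exception {
        super.close();
        executor.shutdown();
    }

    @Override
    public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<Tuple3<String, Integer, String>> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> queryCityName(input.f1), executor)
                .thenAccept(cityName -> resultFuture.complete(
                        Collections.singletonList(new Tuple3<>(input.f0, input.f1, cityName))));
    }

    private String queryCityName(Integer cityId) {
        //hypothetical blocking lookup (JDBC, HBase client, HTTP, ...);
        //each pooled thread should use its own connection
        return "";
    }
}
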
3. Broadcasting the dimension table

Use Flink's broadcast state to broadcast the dimension data stream to the downstream tasks that perform the join. Characteristics:
Pros: changes to the dimension data show up in the result immediately.
Cons: the data is held in memory, so only fairly small dimension tables are practical.
Here is an example:

package join;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Create By 鳴宇淳 on 2020/6/1
 * This example reads the main stream from a socket: each record is a user name and a city id.
 * The dimension stream carries city id and city name.
 * Joining them yields (user name, city id, city name); here the dimension data arrives as a Flink broadcast stream.
 **/
public class JoinDemo4 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //the main stream
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    //input format: user,1000 (user name, city id)
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });
        
        //the city dimension stream
        DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
                .map(p -> {
                    //input format: city id,city name
                    String[] list = p.split(",");
                    return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
                })
                .returns(new TypeHint<Tuple2<Integer, String>>() {
                });

        //turn the city stream into a broadcast stream
        final MapStateDescriptor<Integer, String> broadcastDesc = new MapStateDescriptor<>("broad1", Integer.class, String.class);
        BroadcastStream<Tuple2<Integer, String>> broadcastStream = cityStream.broadcast(broadcastDesc);

        DataStream result = textStream.connect(broadcastStream)
                .process(new BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<Integer, String>, Tuple3<String, Integer, String>>() {
                    //process the main (non-broadcast) stream and join against the broadcast state
                    @Override
                    public void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                        ReadOnlyBroadcastState<Integer, String> state = ctx.getBroadcastState(broadcastDesc);
                        String cityName = "";
                        if (state.contains(value.f1)) {
                            cityName = state.get(value.f1);
                        }
                        out.collect(new Tuple3<>(value.f0, value.f1, cityName));
                    }

                    @Override
                    public void processBroadcastElement(Tuple2<Integer, String> value, Context ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                        System.out.println("收到廣播資料:" + value);
                        ctx.getBroadcastState(broadcastDesc).put(value.f0, value.f1);
                    }
                });


        result.print();
        env.execute("joinDemo");
    }
}
4. Temporal table function join

A temporal table is the view of a continuously changing table as of a particular point in time. A temporal table function is a table function that takes a time attribute as its argument and returns the temporal table's view as of that time.
By mapping the dimension data stream to a temporal table and joining the main stream against it, the join can hit a specific version of the dimension data (its state at some moment in the past).
Characteristics of the temporal table function join:
Pros: the dimension data can be large, updates take effect promptly, no external storage is needed, and different versions of the dimension data can be joined.
Cons: only available through the Flink SQL API.

(1) A ProcessingTime example

package join;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;


/**
 * Create By 鳴宇淳 on 2020/6/1
 **/
public class JoinDemo5 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        //the main stream
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    //input format: user,1000 (user name, city id)
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });

        //the city dimension stream
        DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
                .map(p -> {
                    //input format: city id,city name
                    String[] list = p.split(",");
                    return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
                })
                .returns(new TypeHint<Tuple2<Integer, String>>() {
                });

        //convert to Tables
        Table userTable = tableEnv.fromDataStream(textStream, "user_name,city_id,ps.proctime");
        Table cityTable = tableEnv.fromDataStream(cityStream, "city_id,city_name,ps.proctime");

        //define a TemporalTableFunction
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ps", "city_id");
        //register the table function
        tableEnv.registerFunction("dimCity", dimCity);

        //the join query
        Table result = tableEnv
                .sqlQuery("select u.user_name,u.city_id,d.city_name from " + userTable + " as u " +
                        ", Lateral table (dimCity(u.ps)) d " +
                        "where u.city_id=d.city_id");
        
        //print the result
        DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print();
        env.execute("joinDemo");
    }
}

(2) An EventTime example

package join;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

/**
 * Create By 鳴宇淳 on 2020/6/1
 **/
public class JoinDemo9 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
        env.setParallelism(1);

        //main stream (users): user_name, city_id, ts
        List<Tuple3<String, Integer, Long>> list1 = new ArrayList<>();
        list1.add(new Tuple3<>("user1", 1001, 1L));
        list1.add(new Tuple3<>("user1", 1001, 10L));
        list1.add(new Tuple3<>("user2", 1002, 2L));
        list1.add(new Tuple3<>("user2", 1002, 15L));
        DataStream<Tuple3<String, Integer, Long>> textStream = env.fromCollection(list1)
                .assignTimestampsAndWatermarks(
                        //assign timestamps and watermarks
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Long>>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(Tuple3<String, Integer, Long> element) {
                                return element.f2;
                            }
                        }
                );

        //city dimension stream: city_id, city_name, ts
        List<Tuple3<Integer, String, Long>> list2 = new ArrayList<>();
        list2.add(new Tuple3<>(1001, "beijing", 1L));
        list2.add(new Tuple3<>(1001, "beijing2", 10L));
        list2.add(new Tuple3<>(1002, "shanghai", 1L));
        list2.add(new Tuple3<>(1002, "shanghai2", 5L));

        DataStream<Tuple3<Integer, String, Long>> cityStream = env.fromCollection(list2)
                .assignTimestampsAndWatermarks(
                        //assign timestamps and watermarks
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, String, Long>>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(Tuple3<Integer, String, Long> element) {
                                return element.f2;
                            }
                        });

        //convert to Tables
        Table userTable = tableEnv.fromDataStream(textStream, "user_name,city_id,ts.rowtime");
        Table cityTable = tableEnv.fromDataStream(cityStream, "city_id,city_name,ts.rowtime");

        tableEnv.createTemporaryView("userTable", userTable);
        tableEnv.createTemporaryView("cityTable", cityTable);

        //define a TemporalTableFunction
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ts", "city_id");
        //register the table function
        tableEnv.registerFunction("dimCity", dimCity);

        //the join query
        Table result = tableEnv
                .sqlQuery("select u.user_name,u.city_id,d.city_name,u.ts from userTable as u " +
                        ", Lateral table (dimCity(u.ts)) d " +
                        "where u.city_id=d.city_id");

        //print the result
        DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print();
        env.execute("joinDemo");
    }
}

The output is:

user1,1001,beijing,1970-01-01T00:00:00.001
user1,1001,beijing2,1970-01-01T00:00:00.010
user2,1002,shanghai,1970-01-01T00:00:00.002
user2,1002,shanghai2,1970-01-01T00:00:00.015

The result shows that each main-stream record is joined, based on its event time, against the version of the dimension data that was valid at that time.

(3) An EventTime example with a Kafka source
package join.temporaltablefunctionjoin;

import lombok.Data;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import java.io.Serializable;
import java.util.Properties;

/**
 * Create By 鳴宇淳 on 2020/6/1
 **/
public class JoinDemo10 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
        env.setParallelism(1);

        //Kafka brokers and consumer settings
        String kafkaIPs = "192.168.***.**1:9092,192.168.***.**2:9092,192.168.***.**3:9092";
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaIPs);
        props.setProperty("group.id", "group.cyb.2");

        //consume the user info topic from Kafka
        FlinkKafkaConsumer<UserInfo> userConsumer = new FlinkKafkaConsumer<UserInfo>("user", new UserInfoSchema(), props);
        userConsumer.setStartFromEarliest();
        userConsumer.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserInfo>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(UserInfo userInfo) {
                return userInfo.getTs();
            }
        });

        //consume the city dimension topic from Kafka
        FlinkKafkaConsumer<CityInfo> cityConsumer = new FlinkKafkaConsumer<CityInfo>("city", new CityInfoSchema(), props);
        cityConsumer.setStartFromEarliest();
        cityConsumer.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CityInfo>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(CityInfo cityInfo) {
                return cityInfo.getTs();
            }
        });

        //main stream (users): userName, cityId, ts
        Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer),"userName,cityId,ts.rowtime" );
        //city dimension stream: cityId, cityName, ts
        Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer),"cityId,cityName,ts.rowtime");
        tableEnv.createTemporaryView("userTable", userTable);
        tableEnv.createTemporaryView("cityTable", cityTable);

        //define a TemporalTableFunction
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ts", "cityId");
        //register the table function
        tableEnv.registerFunction("dimCity", dimCity);

        Table u = tableEnv.sqlQuery("select * from userTable");
        u.printSchema();
        tableEnv.toAppendStream(u, Row.class).print("user stream received:");

        Table c = tableEnv.sqlQuery("select * from cityTable");
        c.printSchema();
        tableEnv.toAppendStream(c, Row.class).print("city stream received:");

        //the join query
        Table result = tableEnv
                .sqlQuery("select u.userName,u.cityId,d.cityName,u.ts " +
                        "from userTable as u " +
                        ", Lateral table  (dimCity(u.ts)) d " +
                        "where u.cityId=d.cityId");

        //print the result
        DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print("\t\tjoin output:");
        env.execute("joinDemo");
    }
}
package join.temporaltablefunctionjoin;

import lombok.Data;

import java.io.Serializable;

/**
 * Create By 鳴宇淳 on 2020/6/4
 **/
@Data
public class UserInfo implements Serializable {
    private String userName;
    private Integer cityId;
    private Long ts;
}
package join.temporaltablefunctionjoin;

import lombok.Data;

import java.io.Serializable;

/**
 * Create By 鳴宇淳 on 2020/6/4
 **/
@Data
public class CityInfo implements Serializable {
    private Integer cityId;
    private String cityName;
    private Long ts;
}
package join.temporaltablefunctionjoin;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.serialization.DeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Create By 鳴宇淳 on 2020/6/4
 **/
public class UserInfoSchema implements DeserializationSchema<UserInfo> {

    @Override
    public UserInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        UserInfo data = JSON.parseObject(jsonStr, new TypeReference<UserInfo>() {});
        return data;
    }

    @Override
    public boolean isEndOfStream(UserInfo nextElement) {
        return false;
    }

    @Override
    public TypeInformation<UserInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<UserInfo>() {
        });
    }
}
package join.temporaltablefunctionjoin;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Create By 鳴宇淳 on 2020/6/4
 **/
public class CityInfoSchema implements DeserializationSchema<CityInfo> {


    @Override
    public CityInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        CityInfo data = JSON.parseObject(jsonStr, new TypeReference<CityInfo>() {});
        return data;
    }

    @Override
    public boolean isEndOfStream(CityInfo nextElement) {
        return false;
    }

    @Override
    public TypeInformation<CityInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<CityInfo>() {
        });
    }
}

Write records into the user and city topics in turn.
User record format: {"userName":"user1","cityId":1,"ts":11}
City dimension format: {"cityId":1,"cityName":"nanjing","ts":15}
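
For reference, here is a minimal sketch of producing those records with the standard kafka-clients API; the broker address mirrors the masked value in the consumer config above, and producing the records with the console producer works just as well:

package join.temporaltablefunctionjoin;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

//Sketch only: feed one user record and one city record into the two topics.
public class FeedTopics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.***.**1:9092"); //same brokers as the consumer config
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("user", "{\"userName\":\"user1\",\"cityId\":1,\"ts\":11}"));
            producer.send(new ProducerRecord<>("city", "{\"cityId\":1,\"cityName\":\"nanjing\",\"ts\":15}"));
        }
    }
}
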
The test output is as follows:

city stream received:> 1,beijing,1970-01-01T00:00
user stream received:> user1,1,1970-01-01T00:00
        join output:> user1,1,beijing,1970-01-01T00:00
city stream received:> 1,shanghai,1970-01-01T00:00:00.005
user stream received:> user1,1,1970-01-01T00:00:00.001
        join output:> user1,1,beijing,1970-01-01T00:00:00.001
user stream received:> user1,1,1970-01-01T00:00:00.004
        join output:> user1,1,beijing,1970-01-01T00:00:00.004
user stream received:> user1,1,1970-01-01T00:00:00.005
        join output:> user1,1,shanghai,1970-01-01T00:00:00.005
user stream received:> user1,1,1970-01-01T00:00:00.007
user stream received:> user1,1,1970-01-01T00:00:00.009
city stream received:> 1,shanghai,1970-01-01T00:00:00.007
        join output:> user1,1,shanghai,1970-01-01T00:00:00.007
city stream received:> 1,wuhan,1970-01-01T00:00:00.010
        join output:> user1,1,shanghai,1970-01-01T00:00:00.009
user stream received:> user1,1,1970-01-01T00:00:00.011
city stream received:> 1,nanjing,1970-01-01T00:00:00.015
        join output:> user1,1,wuhan,1970-01-01T00:00:00.011
5. Comparing the four approaches

The table below summarizes the trade-offs described in the sections above:

Criterion                   | Preload in memory  | Hot storage        | Broadcast dimension stream | Temporal table function join
Implementation complexity   | low                | medium             | medium                     | high
Dimension data volume       | small              | can be large       | small                      | can be large
Dimension update frequency  | low                | high               | high                       | high
Update timeliness           | low                | delayed by sync    | immediate                  | immediate
Dimension source form       | loaded into memory | hot storage        | real-time stream           | real-time stream
Depends on external storage | no                 | yes                | no                         | no