Flink Examples (58): Dimension Table Join (2): Flink Dimension Table Joins in Practice
https://blog.csdn.net/chybin500/article/details/106482620/
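This article follows the talk 【實時數倉篇】基於 Flink 的典型 ETL 場景實現 ("Real-Time Data Warehouse: Typical ETL Scenarios Implemented with Flink") and implements a demo for each of the four dimension table joins covered in the video.
There are four common ways to join a dimension table:
- Preloaded dimension table
- Hot-storage dimension table
- Broadcast dimension table
- Temporal table function join
Each of the four approaches is used below to implement the same join requirement. The main stream carries user records with the fields user name and city ID; the dimension table holds city data with the fields city ID and city name. The user stream must be joined with the city table to output: user name, city ID, city name.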
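The user table has the following structure:
Field | Data type | Sample value |
---|---|---|
User name | String | User1 |
City ID | Int | 1001 |
Timestamp | Long | 1000 |
The city dimension table has the following structure:
Field | Data type | Sample value |
---|---|---|
City ID | Int | 1001 |
City name | String | beijing |
Timestamp | Long | 1000 |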
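1. Preloaded dimension table
Define a class that extends RichMapFunction, read the dimension data into memory in open(), and join each record of the probe stream against it in map().
Loading the dimension data into memory in the open() method of a RichMapFunction has the following characteristics:
Advantage: simple to implement.
Disadvantage: because the data lives in memory, this only suits small dimension tables that are updated infrequently. A timer can be set up in open() to reload the dimension table periodically, but updates can still be picked up late.
Here is an example: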
```java
package join;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by 鳴宇淳 on 2020/6/1
 * Reads a stream of (user name, city ID) from a socket; the dimension table holds (city ID, city name).
 * The main stream is joined with the dimension table to produce (user name, city ID, city name).
 * The dimension data is loaded into memory in the open() method of a RichMapFunction.
 **/
public class JoinDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    // Input format: user,1000 (user name, city ID)
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });
        DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo1());
        result.print();
        env.execute("joinDemo1");
    }

    static class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
        // Holds the dimension data in memory
        Map<Integer, String> dim;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Load the dimension data here; it could come from a database, a file, an API, and so on.
            dim = new HashMap<>();
            dim.put(1001, "beijing");
            dim.put(1002, "shanghai");
            dim.put(1003, "wuhan");
            dim.put(1004, "changsha");
        }

        @Override
        public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
            // Join the main stream with the dimension table; unknown city IDs map to ""
            String cityName = dim.getOrDefault(value.f1, "");
            return new Tuple3<>(value.f0, value.f1, cityName);
        }
    }
}
```
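The timer-based reload mentioned among the disadvantages could look roughly like the sketch below. It is plain Java with no Flink dependency, and the class `RefreshingDim`, its `loader` supplier, and the refresh period are all hypothetical names chosen for illustration. The idea is to build a fresh map on every reload and swap a volatile reference, so that lookups never see a half-loaded table. Inside a RichMapFunction, the constructor call would go in open() and close() would stop the timer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: an in-memory dimension table that is reloaded on a timer. */
final class RefreshingDim implements AutoCloseable {
    // volatile so the lookup thread always sees the latest fully built snapshot
    private volatile Map<Integer, String> snapshot;
    private final Supplier<Map<Integer, String>> loader;
    private final ScheduledExecutorService timer;

    RefreshingDim(Supplier<Map<Integer, String>> loader, long period, TimeUnit unit) {
        this.loader = loader;
        this.snapshot = loader.get(); // initial load, as open() would do
        this.timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "dim-refresh");
            t.setDaemon(true);
            return t;
        });
        timer.scheduleAtFixedRate(this::reload, period, period, unit);
    }

    /** Builds a new map and swaps the reference in one step. */
    void reload() {
        snapshot = loader.get();
    }

    /** Lookup used on the main stream; returns "" for unknown keys, like the demo. */
    String lookup(int cityId) {
        return snapshot.getOrDefault(cityId, "");
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }

    public static void main(String[] args) {
        Map<Integer, String> source = new HashMap<>(); // stands in for the external source
        source.put(1001, "beijing");
        try (RefreshingDim dim = new RefreshingDim(() -> new HashMap<>(source), 1, TimeUnit.HOURS)) {
            System.out.println(dim.lookup(1001));           // beijing
            source.put(1002, "shanghai");                   // the source changes
            System.out.println(dim.lookup(1002).isEmpty()); // true: not visible until the next reload
            dim.reload();                                   // what the timer would do
            System.out.println(dim.lookup(1002));           // shanghai
        }
    }
}
```

The second lookup shows the staleness window the article warns about: a change in the source stays invisible until the next reload runs.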
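2. Hot-storage dimension table
In this approach the dimension data is kept in external storage such as Redis, HBase, or MySQL, and the real-time stream queries that storage at join time. Its characteristics are:
Advantage: the dimension data is not limited by memory, so very large tables can be handled.
Disadvantage: because the dimension data sits in external storage, lookup speed is bounded by the read speed of that storage; synchronizing the dimension table into the store also introduces delay.
(1) Using a cache to reduce lookup pressure
A cache can hold the frequently accessed part of the dimension table and cut the number of calls to the external system, for example Guava Cache.
Here is an example: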
```java
package join;

import com.google.common.cache.*;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Created by 鳴宇淳 on 2020/6/1
 **/
public class JoinDemo2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    // Input format: user,1000 (user name, city ID)
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });
        DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo2());
        result.print();
        env.execute("joinDemo2");
    }

    static class MapJoinDemo2 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
        LoadingCache<Integer, String> dim;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Use a Guava LoadingCache as the cache
            dim = CacheBuilder.newBuilder()
                    // Maximum number of entries; beyond that, entries are evicted roughly in LRU order
                    .maximumSize(1000)
                    // Expire an entry a fixed time after it was written
                    .expireAfterWrite(10, TimeUnit.MINUTES)
                    // Get notified when an entry is removed
                    .removalListener(new RemovalListener<Integer, String>() {
                        @Override
                        public void onRemoval(RemovalNotification<Integer, String> removalNotification) {
                            System.out.println(removalNotification.getKey() + " was removed, value: "
                                    + removalNotification.getValue());
                        }
                    })
                    .build(
                            // How to load a missing entry
                            new CacheLoader<Integer, String>() {
                                @Override
                                public String load(Integer cityId) throws Exception {
                                    return readFromHbase(cityId);
                                }
                            }
                    );
        }

        private String readFromHbase(Integer cityId) {
            // Hard-coded here to simulate a read from HBase
            Map<Integer, String> temp = new HashMap<>();
            temp.put(1001, "beijing");
            temp.put(1002, "shanghai");
            temp.put(1003, "wuhan");
            temp.put(1004, "changsha");
            // Never return null: a CacheLoader must not produce null values
            return temp.getOrDefault(cityId, "");
        }

        @Override
        public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
            // Join the main stream with the dimension table through the cache
            String cityName = dim.get(value.f1);
            return new Tuple3<>(value.f0, value.f1, cityName);
        }
    }
}
```
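To make the effect of a bounded cache visible without a Guava dependency, here is a minimal stand-in built on the JDK's `LinkedHashMap` in access order. It is not how Guava implements its cache, and it leaves out time-based expiry; `LruLoadingCache` and its `loadCount()` method are hypothetical names used only for this sketch. It shows the two behaviours that matter for a dimension lookup: a miss goes to the loader (the external store), and the least recently used entry is evicted once the size limit is exceeded.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a loading cache: LRU eviction plus load-on-miss. */
final class LruLoadingCache<K, V> {
    private final Map<K, V> entries;
    private final Function<K, V> loader;
    private int loads = 0;

    LruLoadingCache(int maximumSize, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder=true moves an entry to the tail on every get, so the head is the LRU entry
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maximumSize;
            }
        };
    }

    /** Returns the cached value, calling the loader (the "external store") only on a miss. */
    synchronized V get(K key) {
        V value = entries.get(key);
        if (value == null) {
            value = loader.apply(key);
            loads++;
            entries.put(key, value);
        }
        return value;
    }

    /** Number of times the loader was called, i.e. the number of external lookups. */
    synchronized int loadCount() {
        return loads;
    }

    public static void main(String[] args) {
        LruLoadingCache<Integer, String> cache = new LruLoadingCache<>(2, id -> "city-" + id);
        cache.get(1001); // miss, load 1
        cache.get(1002); // miss, load 2
        cache.get(1001); // hit; 1002 is now the least recently used entry
        cache.get(1003); // miss, load 3; evicts 1002
        cache.get(1001); // hit
        cache.get(1002); // miss again, load 4
        System.out.println(cache.loadCount()); // 4
    }
}
```

Six lookups cost four external reads here. With a cache sized to the hot part of the dimension table, most lookups would be hits.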
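(2) Using asynchronous I/O to raise lookup throughput
Flink can talk to external storage synchronously: send a request, wait for the external system to respond, then send the next request. Throughput is low this way. Raising the parallelism improves throughput, but higher parallelism means more tasks and therefore considerably more resources.
Flink can instead use asynchronous I/O to read and write external systems. This requires a client for the external system that supports asynchronous requests, which many systems now provide. Asynchronous I/O raises three issues:
Timeout: a query that times out is treated as a failed read or write and must be handled as a failure.
Concurrency: when too many requests are in flight, Flink's backpressure mechanism kicks in and throttles the upstream.
Out-of-order results: how to deal with reordered results depends on the use case. Flink supports two modes: unordered and ordered.
Here is an example that uses asynchronous I/O to look up the dimension table: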
```java
package join;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Created by 鳴宇淳 on 2020/6/1
 **/
public class JoinDemo3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    // Input format: user,1000 (user name, city ID)
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });

        DataStream<Tuple3<String, Integer, String>> orderedResult = AsyncDataStream
                // Ordered: results keep the input order; 1 s timeout; capacity 2, beyond which backpressure applies
                .orderedWait(textStream, new JoinDemo3AsyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);

        DataStream<Tuple3<String, Integer, String>> unorderedResult = AsyncDataStream
                // Unordered: results may be reordered; 1 s timeout; capacity 2, beyond which backpressure applies
                .unorderedWait(textStream, new JoinDemo3AsyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);

        orderedResult.print();
        unorderedResult.print();
        env.execute("joinDemo");
    }

    // Extends RichAsyncFunction to query the dimension table stored in MySQL.
    // Input: (user name, city ID); output: Tuple3<user name, city ID, city name>
    static class JoinDemo3AsyncFunction
            extends RichAsyncFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
        // Connection settings
        private static final String JDBC_URL = "jdbc:mysql://192.168.145.1:3306?useSSL=false";
        private static final String USERNAME = "root";
        private static final String PASSWORD = "123";
        private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";

        private transient Connection conn;
        private transient PreparedStatement ps;
        // JDBC is blocking, so queries run on a separate thread instead of inside asyncInvoke.
        // A single thread is used because the PreparedStatement is shared; a connection pool
        // or a truly asynchronous client would be needed for real concurrency.
        private transient ExecutorService executor;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Class.forName(DRIVER_NAME);
            conn = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD);
            ps = conn.prepareStatement("select city_name from tmp.city_info where id = ?");
            executor = Executors.newSingleThreadExecutor();
        }

        @Override
        public void close() throws Exception {
            super.close();
            executor.shutdown();
            ps.close();
            conn.close();
        }

        // Asynchronous lookup: asyncInvoke returns immediately, the result is delivered later
        @Override
        public void asyncInvoke(Tuple2<String, Integer> input,
                                ResultFuture<Tuple3<String, Integer, String>> resultFuture) throws Exception {
            CompletableFuture
                    .supplyAsync(() -> queryCityName(input.f1), executor)
                    .whenComplete((cityName, error) -> {
                        if (error != null) {
                            resultFuture.completeExceptionally(error);
                        } else {
                            resultFuture.complete(
                                    Collections.singleton(new Tuple3<>(input.f0, input.f1, cityName)));
                        }
                    });
        }

        // Look up the city name by city ID; "" when no row matches (Tuple fields must not be null)
        private String queryCityName(int cityId) {
            try {
                ps.setInt(1, cityId);
                try (ResultSet rs = ps.executeQuery()) {
                    return rs.next() ? rs.getString(1) : "";
                }
            } catch (SQLException e) {
                throw new CompletionException(e);
            }
        }

        // Timeout handling: emit the record with an empty city name
        @Override
        public void timeout(Tuple2<String, Integer> input,
                            ResultFuture<Tuple3<String, Integer, String>> resultFuture) throws Exception {
            resultFuture.complete(Collections.singleton(new Tuple3<>(input.f0, input.f1, "")));
        }
    }
}
```
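The difference between the two result modes and the role of the timeout can be seen in a small simulation that needs neither Flink nor a database. Everything in it is an assumption made for illustration: `AsyncLookupSketch`, the fake `lookup` client, and the delays. Capacity and backpressure are not modelled. Three requests are in flight at once: a slow one, a fast one, and one that does not answer before the timeout and therefore falls back to an empty city name, like timeout() in the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical simulation of ordered vs. unordered async lookups with a timeout. */
final class AsyncLookupSketch {
    // Daemon timer threads stand in for the external system's response callbacks.
    private static final ScheduledExecutorService TIMER =
            Executors.newScheduledThreadPool(2, r -> {
                Thread t = new Thread(r, "fake-client");
                t.setDaemon(true);
                return t;
            });

    /** Answers "city-<id>" after delayMs, or "" if timeoutMs elapses first. */
    static CompletableFuture<String> lookup(int cityId, long delayMs, long timeoutMs) {
        CompletableFuture<String> future = new CompletableFuture<>();
        TIMER.schedule(() -> { future.complete("city-" + cityId); }, delayMs, TimeUnit.MILLISECONDS);
        // complete() has no effect once the future is done, so the first of the two wins.
        TIMER.schedule(() -> { future.complete(""); }, timeoutMs, TimeUnit.MILLISECONDS);
        return future;
    }

    /** Three in-flight requests: a slow one, a fast one, and one that misses the timeout. */
    static List<CompletableFuture<String>> requests() {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        futures.add(lookup(1001, 300, 600));
        futures.add(lookup(1002, 50, 600));
        futures.add(lookup(1003, 5_000, 600));
        return futures;
    }

    /** Ordered mode: results are emitted in request order, so a slow request holds back later ones. */
    static List<String> ordered(List<CompletableFuture<String>> futures) {
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            out.add(f.join());
        }
        return out;
    }

    /** Unordered mode: each result is emitted as soon as its request completes. */
    static List<String> unordered(List<CompletableFuture<String>> futures) {
        ConcurrentLinkedQueue<String> out = new ConcurrentLinkedQueue<>();
        List<CompletableFuture<Void>> emitted = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            emitted.add(f.thenAccept(out::add));
        }
        // Wait until every result has actually been added to the queue.
        CompletableFuture.allOf(emitted.toArray(new CompletableFuture[0])).join();
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        System.out.println("ordered:   " + ordered(requests()));
        System.out.println("unordered: " + unordered(requests()));
    }
}
```

With these delays the ordered run should list city-1001 before city-1002 even though 1002 answered first, while the unordered run lists city-1002 first. In both runs the third entry is the empty string produced by the timeout.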
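3. Broadcast dimension table
Use Flink's Broadcast State to broadcast the dimension stream to the downstream operator, which performs the join. Its characteristics are:
Advantage: changes to the dimension data are reflected in the results right away.
Disadvantage: the data is held in memory, so only fairly small dimension tables are supported.
Here is an example: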
```java
package join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Created by 鳴宇淳 on 2020/6/1
 * Reads a stream of (user name, city ID) from a socket; the dimension table holds (city ID, city name).
 * The main stream is joined with the dimension table to produce (user name, city ID, city name).
 * The dimension data is supplied as a Flink broadcast stream.
 **/
public class JoinDemo4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Main stream
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    // Input format: user,1000 (user name, city ID)
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });

        // City stream
        DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
                .map(p -> {
                    // Input format: city ID,city name
                    String[] list = p.split(",");
                    return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
                })
                .returns(new TypeHint<Tuple2<Integer, String>>() {
                });

        // Turn the city stream into a broadcast stream
        final MapStateDescriptor<Integer, String> broadcastDesc =
                new MapStateDescriptor<>("broad1", Integer.class, String.class);
        BroadcastStream<Tuple2<Integer, String>> broadcastStream = cityStream.broadcast(broadcastDesc);

        DataStream<Tuple3<String, Integer, String>> result = textStream.connect(broadcastStream)
                .process(new BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<Integer, String>,
                        Tuple3<String, Integer, String>>() {
                    // Non-broadcast side: look up the dimension value
                    @Override
                    public void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx,
                                               Collector<Tuple3<String, Integer, String>> out) throws Exception {
                        ReadOnlyBroadcastState<Integer, String> state = ctx.getBroadcastState(broadcastDesc);
                        String cityName = "";
                        if (state.contains(value.f1)) {
                            cityName = state.get(value.f1);
                        }
                        out.collect(new Tuple3<>(value.f0, value.f1, cityName));
                    }

                    // Broadcast side: store the dimension record in broadcast state
                    @Override
                    public void processBroadcastElement(Tuple2<Integer, String> value, Context ctx,
                                                        Collector<Tuple3<String, Integer, String>> out) throws Exception {
                        System.out.println("received broadcast data: " + value);
                        ctx.getBroadcastState(broadcastDesc).put(value.f0, value.f1);
                    }
                });

        result.print();
        env.execute("joinDemo");
    }
}
```
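The behaviour of this pattern can be modelled in a few lines of plain Java. The sketch below is a simplification under stated assumptions, not Flink's implementation: `BroadcastSketch` is a hypothetical class, each "subtask" is just a `HashMap`, and main-stream records are routed to a subtask by hash, which is an arbitrary choice for the illustration. It shows why an update is visible to the very next record, why a record that arrives before its dimension row is joined with an empty name, and why memory use grows with the parallelism.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a broadcast join: every parallel subtask keeps its own copy of the dimension data. */
final class BroadcastSketch {
    private final List<Map<Integer, String>> subtaskState = new ArrayList<>();

    BroadcastSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtaskState.add(new HashMap<>());
        }
    }

    /** Dimension side: every subtask receives the record and stores it (cf. processBroadcastElement). */
    void broadcast(int cityId, String cityName) {
        for (Map<Integer, String> state : subtaskState) {
            state.put(cityId, cityName);
        }
    }

    /** Main side: one subtask receives the record and reads its local copy (cf. processElement). */
    String process(String userName, int cityId) {
        // Routing by hash is an arbitrary choice for this sketch.
        int subtask = Math.floorMod(userName.hashCode(), subtaskState.size());
        String cityName = subtaskState.get(subtask).getOrDefault(cityId, "");
        return userName + "," + cityId + "," + cityName;
    }

    /** Total number of stored entries across all subtasks. */
    int totalEntries() {
        int total = 0;
        for (Map<Integer, String> state : subtaskState) {
            total += state.size();
        }
        return total;
    }

    public static void main(String[] args) {
        BroadcastSketch job = new BroadcastSketch(4);
        System.out.println(job.process("user1", 1001)); // user1,1001,   (dimension row not received yet)
        job.broadcast(1001, "beijing");
        System.out.println(job.process("user1", 1001)); // user1,1001,beijing
        job.broadcast(1001, "beijing2");
        System.out.println(job.process("user1", 1001)); // user1,1001,beijing2
        System.out.println(job.totalEntries());         // 4: one copy per subtask
    }
}
```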
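4. Temporal table function join
A temporal table is a view of a continuously changing table at a given point in time. A temporal table function is a table function that takes a time argument and returns the view of the temporal table at that time.
The dimension stream can be mapped to a temporal table and the main stream joined against it, so that each record is joined with a specific version of the dimension data (its state at some moment in the past).
A temporal table function join has the following characteristics:
Advantages: the dimension table can be large, updates are picked up promptly, no external storage is needed, and different versions of the dimension data can be joined.
Disadvantage: it is only available through the Flink SQL API.
(1) A processing-time example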
```java
package join;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

/**
 * Created by 鳴宇淳 on 2020/6/1
 **/
public class JoinDemo5 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        // Main stream
        DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                .map(p -> {
                    // Input format: user,1000 (user name, city ID)
                    String[] list = p.split(",");
                    return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                })
                .returns(new TypeHint<Tuple2<String, Integer>>() {
                });

        // City stream
        DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
                .map(p -> {
                    // Input format: city ID,city name
                    String[] list = p.split(",");
                    return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
                })
                .returns(new TypeHint<Tuple2<Integer, String>>() {
                });

        // Convert to tables; "ps" is the processing-time attribute
        Table userTable = tableEnv.fromDataStream(textStream, "user_name,city_id,ps.proctime");
        Table cityTable = tableEnv.fromDataStream(cityStream, "city_id,city_name,ps.proctime");

        // Define a TemporalTableFunction with time attribute "ps" and key "city_id"
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ps", "city_id");
        // Register the table function
        tableEnv.registerFunction("dimCity", dimCity);

        // Join query
        Table result = tableEnv
                .sqlQuery("select u.user_name,u.city_id,d.city_name from " + userTable + " as u "
                        + ", Lateral table (dimCity(u.ps)) d "
                        + "where u.city_id=d.city_id");

        // Print the result
        DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print();
        env.execute("joinDemo");
    }
}
```
(2) An event-time example
```java
package join;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by 鳴宇淳 on 2020/6/1
 **/
public class JoinDemo9 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
        env.setParallelism(1);

        // Main (user) stream; format: user_name, city_id, ts
        List<Tuple3<String, Integer, Long>> list1 = new ArrayList<>();
        list1.add(new Tuple3<>("user1", 1001, 1L));
        list1.add(new Tuple3<>("user1", 1001, 10L));
        list1.add(new Tuple3<>("user2", 1002, 2L));
        list1.add(new Tuple3<>("user2", 1002, 15L));
        DataStream<Tuple3<String, Integer, Long>> textStream = env.fromCollection(list1)
                .assignTimestampsAndWatermarks(
                        // Assign timestamps and watermarks
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Long>>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(Tuple3<String, Integer, Long> element) {
                                return element.f2;
                            }
                        }
                );

        // City stream; format: city_id, city_name, ts
        List<Tuple3<Integer, String, Long>> list2 = new ArrayList<>();
        list2.add(new Tuple3<>(1001, "beijing", 1L));
        list2.add(new Tuple3<>(1001, "beijing2", 10L));
        list2.add(new Tuple3<>(1002, "shanghai", 1L));
        list2.add(new Tuple3<>(1002, "shanghai2", 5L));
        DataStream<Tuple3<Integer, String, Long>> cityStream = env.fromCollection(list2)
                .assignTimestampsAndWatermarks(
                        // Assign timestamps and watermarks
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, String, Long>>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(Tuple3<Integer, String, Long> element) {
                                return element.f2;
                            }
                        });

        // Convert to tables; "ts" is the event-time attribute
        Table userTable = tableEnv.fromDataStream(textStream, "user_name,city_id,ts.rowtime");
        Table cityTable = tableEnv.fromDataStream(cityStream, "city_id,city_name,ts.rowtime");
        tableEnv.createTemporaryView("userTable", userTable);
        tableEnv.createTemporaryView("cityTable", cityTable);

        // Define a TemporalTableFunction with time attribute "ts" and key "city_id"
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ts", "city_id");
        // Register the table function
        tableEnv.registerFunction("dimCity", dimCity);

        // Join query
        Table result = tableEnv
                .sqlQuery("select u.user_name,u.city_id,d.city_name,u.ts from userTable as u "
                        + ", Lateral table (dimCity(u.ts)) d "
                        + "where u.city_id=d.city_id");

        // Print the result
        DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print();
        env.execute("joinDemo");
    }
}
```
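The output is:
```
user1,1001,beijing,1970-01-01T00:00:00.001
user1,1001,beijing2,1970-01-01T00:00:00.010
user2,1002,shanghai,1970-01-01T00:00:00.002
user2,1002,shanghai2,1970-01-01T00:00:00.015
```
The result shows that each record of the main stream is joined with the version of the dimension stream that was valid at the record's event time.
(3) An event-time example with a Kafka source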
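The version selection visible in this output can be reproduced with a sorted map per key. The sketch below is only a model of the lookup rule, not Flink's implementation, and `VersionedDim`, `upsert`, and `asOf` are hypothetical names. For each city ID it keeps the versions sorted by timestamp and returns the latest version whose timestamp is less than or equal to the probe timestamp. The data is the same as in the event-time example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of a versioned dimension table keyed by city ID. */
final class VersionedDim {
    // city_id -> (version timestamp -> city_name), versions sorted by time
    private final Map<Integer, TreeMap<Long, String>> versions = new HashMap<>();

    /** Records that cityId has the name cityName from timestamp ts onwards. */
    void upsert(int cityId, String cityName, long ts) {
        versions.computeIfAbsent(cityId, k -> new TreeMap<>()).put(ts, cityName);
    }

    /** Returns the name from the latest version with timestamp <= ts, or null if there is none. */
    String asOf(int cityId, long ts) {
        TreeMap<Long, String> history = versions.get(cityId);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> version = history.floorEntry(ts);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        VersionedDim city = new VersionedDim();
        city.upsert(1001, "beijing", 1L);
        city.upsert(1001, "beijing2", 10L);
        city.upsert(1002, "shanghai", 1L);
        city.upsert(1002, "shanghai2", 5L);

        System.out.println("user1,1001," + city.asOf(1001, 1L));  // user1,1001,beijing
        System.out.println("user1,1001," + city.asOf(1001, 10L)); // user1,1001,beijing2
        System.out.println("user2,1002," + city.asOf(1002, 2L));  // user2,1002,shanghai
        System.out.println("user2,1002," + city.asOf(1002, 15L)); // user2,1002,shanghai2
    }
}
```

The four lookups return the same city names as the job output above, including the boundary case where the user record and the new city version carry the same timestamp (10).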
```java
package join.temporaltablefunctionjoin;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;

import java.util.Properties;

/**
 * Created by 鳴宇淳 on 2020/6/1
 **/
public class JoinDemo10 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
        env.setParallelism(1);

        // Kafka settings: broker addresses and consumer group
        String kafkaIPs = "192.168.***.**1:9092,192.168.***.**2:9092,192.168.***.**3:9092";
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaIPs);
        props.setProperty("group.id", "group.cyb.2");

        // Read user records from Kafka
        FlinkKafkaConsumer<UserInfo> userConsumer =
                new FlinkKafkaConsumer<UserInfo>("user", new UserInfoSchema(), props);
        userConsumer.setStartFromEarliest();
        userConsumer.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<UserInfo>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(UserInfo userInfo) {
                        return userInfo.getTs();
                    }
                });

        // Read city dimension records from Kafka
        FlinkKafkaConsumer<CityInfo> cityConsumer =
                new FlinkKafkaConsumer<CityInfo>("city", new CityInfoSchema(), props);
        cityConsumer.setStartFromEarliest();
        cityConsumer.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<CityInfo>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(CityInfo cityInfo) {
                        return cityInfo.getTs();
                    }
                });

        // Main (user) stream; format: user_name, city_id, ts
        Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer), "userName,cityId,ts.rowtime");
        // City dimension stream; format: city_id, city_name, ts
        Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer), "cityId,cityName,ts.rowtime");
        tableEnv.createTemporaryView("userTable", userTable);
        tableEnv.createTemporaryView("cityTable", cityTable);

        // Define a TemporalTableFunction with time attribute "ts" and key "cityId"
        TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ts", "cityId");
        // Register the table function
        tableEnv.registerFunction("dimCity", dimCity);

        // Print both input streams for inspection
        Table u = tableEnv.sqlQuery("select * from userTable");
        u.printSchema();
        tableEnv.toAppendStream(u, Row.class).print("user stream received:");
        Table c = tableEnv.sqlQuery("select * from cityTable");
        c.printSchema();
        tableEnv.toAppendStream(c, Row.class).print("city stream received:");

        // Join query
        Table result = tableEnv
                .sqlQuery("select u.userName,u.cityId,d.cityName,u.ts "
                        + "from userTable as u "
                        + ", Lateral table (dimCity(u.ts)) d "
                        + "where u.cityId=d.cityId");

        // Print the result
        DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
        resultDs.print("\t\tjoin output:");
        env.execute("joinDemo");
    }
}
```
```java
package join.temporaltablefunctionjoin;

import lombok.Data;

import java.io.Serializable;

/**
 * Created by 鳴宇淳 on 2020/6/4
 **/
@Data
public class UserInfo implements Serializable {
    private String userName;
    private Integer cityId;
    private Long ts;
}
```
```java
package join.temporaltablefunctionjoin;

import lombok.Data;

import java.io.Serializable;

/**
 * Created by 鳴宇淳 on 2020/6/4
 **/
@Data
public class CityInfo implements Serializable {
    private Integer cityId;
    private String cityName;
    private Long ts;
}
```
```java
package join.temporaltablefunctionjoin;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Created by 鳴宇淳 on 2020/6/4
 **/
public class UserInfoSchema implements DeserializationSchema<UserInfo> {
    // Parse one Kafka message (UTF-8 JSON) into a UserInfo
    @Override
    public UserInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        UserInfo data = JSON.parseObject(jsonStr, new TypeReference<UserInfo>() {});
        return data;
    }

    @Override
    public boolean isEndOfStream(UserInfo nextElement) {
        return false;
    }

    @Override
    public TypeInformation<UserInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<UserInfo>() {
        });
    }
}
```
```java
package join.temporaltablefunctionjoin;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Created by 鳴宇淳 on 2020/6/4
 **/
public class CityInfoSchema implements DeserializationSchema<CityInfo> {
    // Parse one Kafka message (UTF-8 JSON) into a CityInfo
    @Override
    public CityInfo deserialize(byte[] message) throws IOException {
        String jsonStr = new String(message, StandardCharsets.UTF_8);
        CityInfo data = JSON.parseObject(jsonStr, new TypeReference<CityInfo>() {});
        return data;
    }

    @Override
    public boolean isEndOfStream(CityInfo nextElement) {
        return false;
    }

    @Override
    public TypeInformation<CityInfo> getProducedType() {
        return TypeInformation.of(new TypeHint<CityInfo>() {
        });
    }
}
```
Write records to the user and city topics in turn.
User record format: {"userName":"user1","cityId":1,"ts":11}
City dimension record format: {"cityId":1,"cityName":"nanjing","ts":15}
The test produced the following output:
```
city stream received:> 1,beijing,1970-01-01T00:00
user stream received:> user1,1,1970-01-01T00:00
join output:> user1,1,beijing,1970-01-01T00:00
city stream received:> 1,shanghai,1970-01-01T00:00:00.005
user stream received:> user1,1,1970-01-01T00:00:00.001
join output:> user1,1,beijing,1970-01-01T00:00:00.001
user stream received:> user1,1,1970-01-01T00:00:00.004
join output:> user1,1,beijing,1970-01-01T00:00:00.004
user stream received:> user1,1,1970-01-01T00:00:00.005
join output:> user1,1,shanghai,1970-01-01T00:00:00.005
user stream received:> user1,1,1970-01-01T00:00:00.007
user stream received:> user1,1,1970-01-01T00:00:00.009
city stream received:> 1,shanghai,1970-01-01T00:00:00.007
join output:> user1,1,shanghai,1970-01-01T00:00:00.007
city stream received:> 1,wuhan,1970-01-01T00:00:00.010
join output:> user1,1,shanghai,1970-01-01T00:00:00.009
user stream received:> user1,1,1970-01-01T00:00:00.011
city stream received:> 1,nanjing,1970-01-01T00:00:00.015
join output:> user1,1,wuhan,1970-01-01T00:00:00.011
```
5. Comparison of the four dimension table join approaches
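Criterion | Preloaded into memory | Hot-storage join | Broadcast dimension table | Temporal table function join |
---|---|---|---|---|
Implementation complexity | Low | Medium | Low | Low |
Dimension data volume | Low | High | Low | High |
Dimension update frequency | Low | Medium | High | High |
Dimension update timeliness | Low | Medium | High | High |
Dimension table form | | Hot storage | Real-time stream | Real-time stream |
Depends on external storage | Low | Yes | No | No |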