FlinkSQL Practice Notes 4 -- How to Join a Dimension Table That Updates in Real Time
By 阿新 · Published 2022-01-25
1. Background
For a dimension table that is updated at irregular intervals, which component should back the FlinkSQL source table: HBase, Kafka, or MySQL? Which of these yields correct join results?
There is also a semantic choice to make when joining facts against the dimension table: should fact rows match historical versions of a dimension row, or only its latest version?
The tests below aim at joining only the latest version of the dimension table.
2. Implementation
2.1 Using a compacted Kafka topic as the dimension table
(1) Convert an ordinary Kafka topic into a compacted topic
bin/kafka-topics.sh --alter --topic my_topic_name --zookeeper my_zookeeper:2181 --config cleanup.policy=compact
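The --zookeeper flag only exists on older Kafka distributions; on recent brokers the same setting can be changed through the AdminClient API instead. A minimal Java sketch, assuming a broker at ip:port (the same placeholder used in the DDL below) and the dimension topic flinksqldim:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class MakeTopicCompacted {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:port"); // placeholder broker address
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "flinksqldim");
            AlterConfigOp setCompact = new AlterConfigOp(
                    new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET);
            // incrementalAlterConfigs is available from Kafka 2.3 onwards
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Collections.singletonList(setCompact))
            ).all().get();
        }
    }
}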
(2) Kafka producer code
// Build the messages; the record key (json1) has to line up with the
// PRIMARY KEY of the upsert-kafka table, so that compaction keeps only
// the latest version of each key.
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.nnnnnnnnn");
for (int i = 2; i < 8; i++) {
    JSONObject json1 = new JSONObject();
    json1.put("key", i + "");
    //json.put("update_time", dtf.format(LocalDateTime.now()));
    JSONObject json = new JSONObject();
    json.put("id", i + "");
    json.put("name", "name444" + i);
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(
            "flinksqldim",
            json1.toJSONString(),
            json.toJSONString()
    );
    // The original snippet built the record but never sent it
    producer.send(record);
}
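A side note on compacted topics that matters for the dimension table: a record with a non-null key and a null value is a tombstone, which compaction eventually removes and which the upsert-kafka connector reads as a DELETE of that key. A sketch, assuming the same KafkaProducer<String, String> as above:

// Tombstone: the null value deletes the dimension row whose key is 5
producer.send(new ProducerRecord<String, String>("flinksqldim", "{\"key\":\"5\"}", null));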
(3) Main FlinkSQL code
// Create the execution environment
//EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

// Map the Kafka topic to an input temporary table (the fact stream)
tableEnv.executeSql(
        "CREATE TABLE sensor_source(" +
        "  id STRING, " +
        "  name STRING, " +
        "  o_time TIMESTAMP(3), " +
        "  WATERMARK FOR o_time AS o_time " +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'flinksqldemo'," +
        "  'properties.bootstrap.servers' = 'ip:port'," +
        "  'properties.group.id' = 'flinksqlCount'," +
        "  'scan.startup.mode' = 'earliest-offset'," +
        "  'format' = 'json')"
);

// Map the Kafka data to an input dimension table -- a dimension table that changes in real time
tableEnv.executeSql(
        "CREATE TABLE dim_source (" +
        "  id STRING," +
        "  name STRING," +
        "  update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " +
        "  WATERMARK FOR update_time AS update_time, " +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'upsert-kafka'," +
        "  'topic' = 'flinksqldim'," +
        "  'properties.bootstrap.servers' = 'ip:port'," +
        "  'properties.group.id' = 'flinksqlDim'," +
        "  'key.format' = 'json'," +
        "  'value.format' = 'json')"
);

// Map the MySQL table to an output temporary table
String mysql_sql = "CREATE TABLE mysql_sink (" +
        "  name STRING," +
        "  cnt BIGINT," +
        "  PRIMARY KEY (name) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
        "  'table-name' = 'count_info'," +
        "  'username' = 'xxx'," +
        "  'password' = 'xxx'" +
        ")";
tableEnv.executeSql(mysql_sql);

// Insert the joined, aggregated result
TableResult tableResult = tableEnv.executeSql(
        "INSERT INTO mysql_sink " +
        "SELECT b.name, count(*) as cnt " +
        "FROM sensor_source as a " +
        "INNER JOIN dim_source as b " +
        "on a.id = b.id " +
        "where a.id > 3 " +
        "group by b.name " // "order by name "
);
System.out.println(tableResult.getJobClient().get().getJobStatus());
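Note that the INNER JOIN above is still a regular join over the upsert-kafka changelog, so earlier results are retracted and updated whenever a dimension row changes. Because dim_source declares both a primary key and a watermark, it also qualifies as a versioned table, so an event-time temporal join could instead pin each fact row to the dimension version that was valid at the fact's event time. A sketch of that variant (not part of the original test):

INSERT INTO mysql_sink
SELECT b.name, COUNT(*) AS cnt
FROM sensor_source AS a
JOIN dim_source FOR SYSTEM_TIME AS OF a.o_time AS b
    ON a.id = b.id
WHERE a.id > 3
GROUP BY b.name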
3. Trial and error
3.1 Using a regular join
Kafka producer code
for (int i = 1; i < 10; i++) {
    //json.put("update_time", dtf.format(LocalDateTime.now()));
    JSONObject json = new JSONObject();
    json.put("id", i + "");
    json.put("name", "name555" + i);
    ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
            "flinksqldim2",
            i,
            json.toJSONString()
    );
    // Send the message
    Future<RecordMetadata> future = producer.send(record);
}
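For completeness, a sketch of the producer construction the loop above assumes; the bootstrap address is a placeholder, and the serializers match the Integer key and String value of the records:

Properties props = new Properties();
props.put("bootstrap.servers", "ip:port"); // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);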
FlinkSQL processing code
// Create the execution environment
//EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// Map the Kafka topic to an input temporary table (the fact stream)
tableEnv.executeSql(
"CREATE TABLE sensor_source(" +
"id STRING, " +
"name STRING, " +
"o_time TIMESTAMP(3), " +
" WATERMARK FOR o_time AS o_time " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'flinksqldemo'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlCount'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json')"
);
// Map the Kafka data to an input dimension table -- updated in real time, but a plain (non-compacted) topic this time
tableEnv.executeSql(
"CREATE TABLE dim_source ( " +
" id STRING, " +
" name STRING, " +
" update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " +
" WATERMARK FOR update_time AS update_time " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'flinksqldim2'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlDim'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json')"
);
// Map the MySQL table to an output temporary table
String mysql_sql = "CREATE TABLE mysql_sink (" +
" name STRING," +
" cnt BIGINT," +
" PRIMARY KEY (name) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://ip:port/kafka?serverTimezone=UTC'," +
" 'table-name' = 'count_info'," +
" 'username' = 'xxx'," +
" 'password' = 'xxx'" +
")";
tableEnv.executeSql(mysql_sql);
// Insert the joined, aggregated result
TableResult tableResult = tableEnv.executeSql(
"INSERT INTO mysql_sink " +
"SELECT b.name, count(*) as cnt " +
"FROM sensor_source a " +
"JOIN dim_source b " +
"on a.id = b.id " +
"where a.id > 3 " +
"group by b.name "
);
System.out.println(tableResult.getJobClient().get().getJobStatus());
After the dimension stream had been updated a few times, the data in the result table count_info was wrong. This is the expected behavior of a regular join over an append-only topic: every historical version of a dimension row stays in the join state, so each fact row matches every version of its key, and the aggregated counts end up inflated and split across old and new name values.
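One commonly documented way to get latest-version-only semantics out of an append-only topic like flinksqldim2 is Flink SQL's deduplication pattern: rank the rows of each key by time, keep only the newest one, and join against that. A sketch under that assumption, not something verified in this test:

SELECT id, name
FROM (
    SELECT id, name, update_time,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS rn
    FROM dim_source
) WHERE rn = 1

Using this subquery in place of dim_source in the join turns the dimension input into an updating (changelog) stream, much like the upsert-kafka mapping in section 2.1.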