Flink CDC Integration with Hudi
Introduction
I previously wrote about sinking Flink CDC data into Iceberg; this post walks through how to CDC data into Hudi instead.
What is Hudi?
Hudi is a rich platform to build streaming data lakes with incremental data pipelines
on a self-managing database layer, while being optimized for lake engines and regular batch processing.
What problems does Hudi mainly solve?
- HDFS scalability limits
- The need to surface data in Hadoop faster
- No direct support for updates and deletes on existing data
- Fast ETL and modeling
- Retrieving all changed records, whether they are new rows added to the latest date partition or updates to older data: Hudi lets consumers pull incrementally from the last checkpoint timestamp, without running a query that scans the entire source table (see the Flink SQL sketch after the feature list below).
Hudi's key features:
- Upserts, Deletes with fast, pluggable indexing.
- Incremental queries, Record level change streams
- Transactions, Rollbacks, Concurrency Control.
- SQL Read/Writes from Spark, Presto, Trino, Hive & more
- Automatic file sizing, data clustering, compactions, cleaning.
- Streaming ingestion, Built-in CDC sources & tools.
- Built-in metadata tracking for scalable storage access.
- Backwards compatible schema evolution and enforcement.
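The incremental-pull point above is easiest to see with a concrete query. Below is a minimal, hedged sketch of a streaming/incremental read with the Flink SQL Hudi connector (0.10, matching this post); the table mirrors the hudi_users2 table built later, the path and start commit are illustrative placeholders, and 'read.streaming.start-commit' is, to my understanding, the 0.10 option for consuming only commits after a given instant.

-- Hedged sketch: incremental read of the Hudi table defined later in this post.
-- Only commits after the given instant (format yyyyMMddHHmmss) are consumed,
-- so no full scan of the table is needed.
CREATE TABLE hudi_users2_incr (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
    'connector' = 'hudi',
    'table.type' = 'MERGE_ON_READ',
    'path' = 'hdfs://ip:8020/hudi/hudi_users2',
    'read.streaming.enabled' = 'true',
    'read.streaming.start-commit' = '20220101000000'   -- illustrative checkpoint instant
);

SELECT * FROM hudi_users2_incr;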
Flink CDC and Hudi Integration
Versions
Flink: 1.13.1
Hudi: 0.10.1
Environment Setup
The job runs in a local environment; Hadoop uses the setup installed earlier on the virtual machine.
Pull a MySQL Docker image, mainly to simulate data changes and generate binlog data:
docker pull mysql:latest
docker run -itd --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 mysql
Enter the container, then connect with the mysql client to verify:
docker exec -it 07e946b1fa9a /bin/bash
mysql -uroot -p123456
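The mysql-cdc connector reads the MySQL binlog, so it is worth confirming from the same mysql shell that the binlog is enabled and uses ROW format (recent official MySQL images default to both):

-- quick binlog sanity check inside the container
show variables like 'log_bin';        -- expect: ON
show variables like 'binlog_format';  -- expect: ROW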
Create the MySQL table:
create table users
(
id bigint auto_increment primary key,
name varchar(20) null,
birthday timestamp default CURRENT_TIMESTAMP not null,
ts timestamp default CURRENT_TIMESTAMP not null,
sex int
);
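The Flink source table further down connects as user aa. As a rough sketch (adjust host scope and password to your environment), that account can be created with the privileges a Debezium-based MySQL CDC connector typically needs:

-- hypothetical account matching the 'username'/'password' used in the Flink DDL below
CREATE USER 'aa'@'%' IDENTIFIED BY 'aa';
-- SELECT/RELOAD for snapshots, REPLICATION SLAVE/CLIENT for reading the binlog
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'aa'@'%';
FLUSH PRIVILEGES;
-- with the mysql-connector-java 5.1.x driver from the pom, MySQL 8 may additionally need:
-- ALTER USER 'aa'@'%' IDENTIFIED WITH mysql_native_password BY 'aa';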
Integration Code in Practice
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.chaplinthink</groupId>
<artifactId>flink-hudi</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.2.1</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.1</version>
        </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.13.1</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>1.13.1</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<!-- add the dependency matching your database -->
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
<version>2.2.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.alibaba.ververica</groupId>-->
<!-- <artifactId>flink-connector-mysql-cdc</artifactId>-->
<!-- <version>1.2.0</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink-bundle_2.11</artifactId>
<version>0.10.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
</dependencies>
</project>
Use Flink SQL to create the MySQL source table and the Hudi target table, then write the data into Hudi with INSERT INTO hudi_users2 SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users.
Core code:
final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
environment.enableCheckpointing(3000);
final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment, fsSettings);
tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);
// Source table: MySQL CDC
String sourceDDL = "CREATE TABLE mysql_users (\n" +
" id BIGINT PRIMARY KEY NOT ENFORCED ,\n" +
" name STRING,\n" +
" birthday TIMESTAMP(3),\n" +
" ts TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = '192.168.70.3',\n" +
" 'port' = '3306', " +
" 'username' = 'aa',\n" +
" 'password' = 'aa', " +
" 'server-time-zone' = 'Asia/Shanghai'," +
" 'database-name' = 'test',\n" +
" 'table-name' = 'users'\n" +
" )";
/**
 * The default trigger strategy runs compaction after five commits complete.
 */
// Sink table: Hudi
String sinkDDL = "CREATE TABLE hudi_users2\n" +
"(\n" +
" id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
" name STRING,\n" +
" birthday TIMESTAMP(3),\n" +
" ts TIMESTAMP(3),\n" +
" `partition` VARCHAR(20)\n" +
") PARTITIONED BY (`partition`) WITH (\n" +
" 'connector' = 'hudi',\n" +
" 'table.type' = 'MERGE_ON_READ',\n" +
" 'path' = 'hdfs://ip:8020/hudi/hudi_users2'\n " +
")";
String transformSQL = "INSERT INTO hudi_users2 SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users\n";
tableEnvironment.executeSql(sourceDDL);
tableEnvironment.executeSql(sinkDDL);
// executeSql submits the INSERT job asynchronously; await() keeps it running until cancelled,
// so no separate environment.execute() call is needed (it would fail on an empty topology)
tableEnvironment.executeSql(transformSQL).await();
Start the Flink program locally, then run some DML operations against MySQL:
insert into users (name) values ('hello');
insert into users (name) values ('world');
insert into users (name) values ('iceberg');
insert into users (name) values ('hudi');
update users set name = 'hello spark' where id = 4;
delete from users where id = 5;
Check the Hudi data path on HDFS:
By default, compaction is enabled for Hudi MERGE_ON_READ tables, and the trigger strategy runs compaction after five commits complete. Once insert, update and delete operations have run in MySQL, the table can be queried with Hive, Spark SQL or Presto.
If no Parquet files have been generated yet, the Parquet-backed table we created returns no data.
After five commits, the data files become visible:
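Those compaction defaults can also be made explicit (or tuned) in the sink DDL. A hedged sketch with the Hudi 0.10 Flink options that, to my knowledge, control this behaviour (worth double-checking against the Hudi docs for your exact version):

CREATE TABLE hudi_users2 (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
    'connector' = 'hudi',
    'table.type' = 'MERGE_ON_READ',
    'path' = 'hdfs://ip:8020/hudi/hudi_users2',
    'compaction.async.enabled' = 'true',            -- async compaction (the default)
    'compaction.trigger.strategy' = 'num_commits',  -- trigger by commit count (the default)
    'compaction.delta_commits' = '5'                -- compact after 5 delta commits (the default)
);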
Stop the Flink CDC program, then write a separate Flink SQL program to read the Hudi data from HDFS:
public static void main(String[] args) throws Exception {
    final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setParallelism(1);
    final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment, fsSettings);
    tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    String sourceDDL = "CREATE TABLE hudi_users2\n" +
            "(\n" +
            "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
            "    name STRING,\n" +
            "    birthday TIMESTAMP(3),\n" +
            "    ts TIMESTAMP(3),\n" +
            "    `partition` VARCHAR(20)\n" +
            ") PARTITIONED BY (`partition`) WITH (\n" +
            "    'connector' = 'hudi',\n" +
            "    'table.type' = 'MERGE_ON_READ',\n" +
            "    'path' = 'hdfs://ip:8020/hudi/hudi_users2',\n" +
            "    'read.streaming.enabled' = 'true',\n" +
            "    'read.streaming.check-interval' = '1'\n" +
            ")";
    tableEnvironment.executeSql(sourceDDL);
    // executeSql submits the streaming SELECT itself; print() blocks and emits rows as they arrive,
    // so no environment.execute() call is needed here
    TableResult result2 = tableEnvironment.executeSql("select * from hudi_users2");
    result2.print();
}
Data read and printed by the Flink SQL job:
Comparing it with the data in the MySQL table shows they are consistent:
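On the MySQL side the comparison is a plain query; given the DML above, id 4 should now read 'hello spark' and id 5 should be gone:

select id, name, birthday, ts, sex from users order by id;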
At this point the prototype of the Flink + Hudi lakehouse solution is complete.
Summary
This post walked through a hands-on integration of Flink CDC with Hudi and explored the new lakehouse architecture; the lakehouse architecture used by 37手遊 (37 Interactive Entertainment) can also serve as a reference:
For the pain point of frequently added table columns, the hope is that new columns are picked up automatically when syncing to downstream systems. There is no perfect solution for this yet; it remains to be seen whether the Flink CDC community will add Schema Evolution support.
For now, when a column is added in MySQL, the Flink program has to be modified and restarted, roughly as sketched below.
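For example, with a hypothetical new column age (the name and type are made up purely for illustration), the manual steps today look roughly like this:

-- 1. MySQL: the upstream table gains a column
alter table users add column age int;

-- 2. Flink: both DDLs (and, if columns are listed explicitly, the INSERT) must be edited by hand,
--    e.g. the source table becomes:
CREATE TABLE mysql_users (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    age INT                       -- newly added column
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.70.3',
    'port' = '3306',
    'username' = 'aa',
    'password' = 'aa',
    'server-time-zone' = 'Asia/Shanghai',
    'database-name' = 'test',
    'table-name' = 'users'
);
-- 3. Add the same column to the hudi_users2 sink DDL, then restart the job.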