1. 程式人生 > 其它 >Hudi-Flink CDC將MySQL資料寫入hudi

Hudi-Flink CDC將MySQL資料寫入hudi

CDC概念

CDC全稱是Change data Cpature,即變更資料捕獲,主要面向資料庫的變更,是資料庫領域非常常見的技術,主要用於捕獲資料庫的一些變更,然後可以把變更資料傳送到下游。    

CDC型別

1.基於查詢的,客戶端會通過SQL方式查詢源庫表變更資料,然後對外發送。 2.基於日誌的,這也是業界廣泛使用的一種方式,一般是通過binlog方式,變更的記錄會寫入binlog,解析binlog後會寫入訊息系統,或直接基於Flink CDC進行處理。    

CDC資料入湖

基於CDC資料的入湖,架構:上游各種各樣的資料來源,比如DB的變更資料、事件流,以及各種外部資料來源,都可以通過變更流的方式寫入表中,再進行外部的查詢分析   典型CDC入湖的鏈路: 鏈路1是大部分公司採取的鏈路,前面CDC的資料先通過CDC工具匯入kafka或者Pulsar,再通過Flink或者是spark流式消費寫到Hudi裡 鏈路2是通過Flink CDC直聯到MySQL上游資料來源,直接寫到下游hudi表。  

Flink CDC Hudi概述

基於Flink CDC技術,實時採集MySQL資料庫表資料,進行ETL轉換處理,最終儲存Hudi表      

實踐

MySQL資料庫建立表,實時新增資料,通過Flink CDC將資料寫入Hudi表,並且Hudi與Hive整合,自動在hive中建立表與新增分割槽資訊,最後hive終端beeline查詢分析資料。   hudi表與hive表自動關聯整合,需要重新編譯hudi原始碼,指定hive版本及編譯時包含hive依賴jar包   1.MySQL資料庫,建立表及開啟binlog 2.建立flink CDC表,關聯到MySQL資料庫表 3.建立檢視,資料來源輸入表,欄位與輸出表相同 4.建立輸出表,關聯到hudi表,自動同步到hive中,欄位與hudi表相同 5.查詢檢視資料,插入到輸出表(hudi表) 6.查詢hive表資料,ro型別(讀優化查詢)和rt型別(快照查詢)  

準備工作

1.編譯hudi原始碼

修改hudi整合flink和hive編譯依賴版本配置 原因:現在版本hudi,在編譯的時候後本身預設已經集成了flink-SQL-connector-hive的包,會和flink lib包下的flink-SQL-connector-hive衝突。所以,編譯的過程中只修改hive編譯版本 檔案: hudi-0.10.1/packaging/hudi-flink-bundle/pom.xml,將hive.version改為2.3.9 <include>org.apache.flink:flink-sql-connector-hive-2.3.9_${scala.binary.version}</include> 該問題從 0.10 版本已經解決 mvn clean install -DskipTests -Drat.skip=true -Dscala-2.12 -Dspark3 -Dhive.version=2.3.9 -Pflink-bundle-shade-hive2   編譯完成後,有2個jar包,非常重要 hudi-flink-bundle_2.12-0.10.1.jar位於hudi-0.10.1/packaging/hudi-flink-bundle/target,flink用來寫入和讀取資料,將其拷貝至$FLINK_HOME/lib目錄中,如果以前有同名jar包,先刪除再拷貝。 hudi-hadoop-mr-bundle-0.10.1.jar位於hudi-0.10.1/packaging/hudi-hadoop-mr-bundle/target,hive需要用來讀hudi資料,將其拷貝至$HIVE_HOME/lib目錄中。  

2.將Flink CDC MySQL對應jar包,放到$FLINK_HOME/lib目錄中

https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc/1.4.0 flink-sql-connector-mysql-cdc-1.4.0.jar  

實現

零、元件版本

hudi:0.10.1
flink:1.13.1
hive:2.3.9
Hadoop:2.8.0
Scala:2.12.4

一、MySQL配置

1.MySQL資料庫,建立表及開啟binlog vim /etc/my.cnf 在[mysqld]下面新增內容
server-id=2
log-bin=mysql-bin
binlog_format=rot
expire_logs_day=15
binlog_row_image=full
  2.重啟MySQL
service mysqld restart
檢視是否生效
show master logs
  3.建表
create database test_hudi;
create table test_hudi.tbl_users(
    id bigint auto_increment primary key,
    name varchar(20) null,
    birthday timestamp default CURRENT_TIMESTAMP NOT NULL,
    ts timestamp default CURRENT_TIMESTAMP NOT NULL
);
 
INSERT INTO test_hudi.tbl_users(name) VALUES(“測試”);
INSERT INTO test_hudi.tbl_users(name) VALUES(“張三”);
INSERT INTO test_hudi.tbl_users(name) VALUES(“李四”);
INSERT INTO test_hudi.tbl_users(name) VALUES(“王五”);
INSERT INTO test_hudi.tbl_users(name) VALUES(“趙六");
 

二、建立Flink CDC表,關聯到MySQL資料庫表

1.啟動相關服務
HDFS
    hadoop-daemon.sh start namenode
    hadoop-daemon.sh start datanode
Hive
     ./hive --service metastore &
   ./hive --service hiveserver2  &
Flink standalone
    start-cluster.sh
啟動Flink SQL Client客戶端
    sql-client.sh embedded -j /Users/FengZhen/Desktop/Hadoop/flink/flink-1.13.1/lib/flink-connector-mysql-cdc-1.4.0.jar shell
-j /Users/FengZhen/Desktop/Hadoop/flink/flink-1.13.1/lib/hudi-hive-sync-bundle-0.10.1.jar
    設定屬性
    set  sql-client.execution.result-mode=tableau;
    set execution.checkpointing.interval=3sec;
  2.建立輸入表,關聯MySQL表,採用MySQL CDC關聯
CREATE TABLE users_source_mysql(
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3)
)WITH(
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '1234qwer',
    'server-time-zone'= 'Asia/Shanghai',
    'debezium.snapshot.mode' = 'initial',
    'database-name' = 'test_hudi',
    'table-name' = 'tbl_users'
);
Flink SQL> select * from users_source_mysql;
+----+----------------------+--------------------------------+-------------------------+-------------------------+
| op |                   id |                           name |                birthday |                      ts |
+----+----------------------+--------------------------------+-------------------------+-------------------------+
| +I |                    1 |                           測試 | 2022-03-15 22:09:00.000 | 2022-03-15 22:09:00.000 |
 
 

三、建立檢視,查詢輸入表,欄位與輸出表相同

建立檢視,增加分割槽列part,方便後續同步hive分割槽表
CREATE VIEW view_users_cdc AS
SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') AS part FROM users_source_mysql;
Flink SQL> select * From view_users_cdc;
+----+----------------------+--------------------------------+-------------------------+-------------------------+--------------------------------+
| op |                   id |                           name |                birthday |                      ts |                           part |
+----+----------------------+--------------------------------+-------------------------+-------------------------+--------------------------------+
| +I |                    1 |                           測試 | 2022-03-15 22:09:00.000 | 2022-03-15 22:09:00.000 |                       20220315 |
| +I |                    2 |                           張三 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 |                       20220317 |
| +I |                    3 |                           李四 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 |                       20220317 |
| +I |                    4 |                           王五 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 |                       20220317 |
| +I |                    5 |                           趙六 | 2022-03-17 13:26:05.000 | 2022-03-17 13:26:05.000 |                       20220317 |
 

四、建立CDC Hudi Sink 表,並自動同步hive分割槽

 
CREATE TABLE users_sink_hudi_hive(
   id bigint ,
   name string,
   birthday TIMESTAMP(3),
   ts TIMESTAMP(3),
   part VARCHAR(20),
   primary key(id) not enforced
)
PARTITIONED BY (part)
with(
    'connector'='hudi',
    'path'= 'hdfs://localhost:9000/hudi-warehouse/users_sink_hudi_hive'
    , 'hoodie.datasource.write.recordkey.field'= 'id'-- 主鍵
    , 'write.precombine.field'= 'ts'-- 自動precombine的欄位
    , 'write.tasks'= '1'
    , 'compaction.tasks'= '1'
    , 'write.rate.limit'= '2000'-- 限速
    , 'table.type'= 'MERGE_ON_READ'-- 預設COPY_ON_WRITE,可選MERGE_ON_READ
    , 'compaction.async.enabled'= 'true'-- 是否開啟非同步壓縮
    , 'compaction.trigger.strategy'= 'num_commits'-- 按次數壓縮
    , 'compaction.delta_commits'= '1'-- 預設為5
    , 'changelog.enabled'= 'true'-- 開啟changelog變更
    , 'read.streaming.enabled'= 'true'-- 開啟流讀
    , 'read.streaming.check-interval'= '3'-- 檢查間隔,預設60s
    , 'hive_sync.enable'= 'true'-- 開啟自動同步hive
    , 'hive_sync.mode'= 'hms'-- 自動同步hive模式,預設jdbc模式, hms:hive metastore
    , 'hive_sync.metastore.uris'= 'thrift://localhost:9083'-- hive metastore地址
    -- , 'hive_sync.jdbc_url'= 'jdbc:hive2://localhost:10000'-- hiveServer地址
    , 'hive_sync.table'= 'users_sink_hudi_hive_sync'-- hive 新建表名
    , 'hive_sync.db'= 'db_hudi'-- hive 新建資料庫名
    , 'hive_sync.username'= ''-- HMS 使用者名稱
    , 'hive_sync.password'= ''-- HMS 密碼
    , 'hive_sync.support_timestamp'= 'true'-- 相容hive timestamp型別
);
 

五、檢視資料寫入hudi表

INSERT INTO users_sink_hudi_hive
SELECT id,name,birthday,ts,part FROM view_users_cdc;
 

六、Hive表查詢

需要將hudi-hadoop-mr-bundle-0.10.1.jar包,放到$HIVE_HOME/lib下 啟動beeline客戶端,連線hiveserver2
beeline -u jdbc:hive2://localhost:10000 -n root -p 123456
  我吐了,我的hive沒生成下邊這兩張表,我各種操作都TM不行,這個情況先保留,以後再看   會自動生成hudi MOR模式的兩張表 users_sink_hudi_hive_ro:ro表全稱read optimized table,對於MOR表同步的xxx_ro表,只暴露壓縮後的parquet。其查詢方式和COW表類似。設定完hiveInputformat之後和普通的hive表一樣查詢即可。 users_sink_hudi_rt:rt表示增量檢視,主要針對增量查詢的rt表;ro表只能查parquet檔案資料,rt表parquet檔案資料和log檔案資料都可查。   檢視hive表資料
set hive.exec.mode.local.auto = true;
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
set hive.mapred.mode = nonstrict;