1. 程式人生 > 其它 >37 手遊基於 Flink CDC + Hudi 湖倉一體方案實踐

37 手遊基於 Flink CDC + Hudi 湖倉一體方案實踐

簡介:介紹了 37 手遊為何選擇 Flink 作為計算引擎,並如何基於 Flink CDC + Hudi 構建新的湖倉一體方案。

本文作者是 37 手遊大資料開發徐潤柏,介紹了 37 手遊為何選擇 Flink 作為計算引擎,並如何基於 Flink CDC + Hudi 構建新的湖倉一體方案,主要內容包括:

  1. Flink CDC 基本知識介紹
  2. Hudi 基本知識介紹
  3. 37 手遊的業務痛點和技術方案選型
  4. 37 手遊湖倉一體介紹
  5. Flink CDC + Hudi 實踐
  6. 總結

一、Flink-CDC 2.0

Flink CDC Connectors是 Apache Flink 的一個 source 端的聯結器,目前 2.0 版本支援從 MySQL 以及 Postgres 兩種資料來源中獲取資料,2.1 版本社群確定會支援 Oracle,MongoDB 資料來源。

Fink CDC 2.0的核心 feature,主要表現為實現了以下三個非常重要的功能:

  • 全程無鎖,不會對資料庫產生需要加鎖所帶來的風險;
  • 多並行度,全量資料的讀取階段支援水平擴充套件,使億級別的大表可以通過加大並行度來加快讀取速度;
  • 斷點續傳,全量階段支援 checkpoint,即使任務因某種原因退出了,也可通過儲存的 checkpoint 對任務進行恢復實現資料的斷點續傳。
Flink CDC 2.0 詳解核心改進

二、Hudi

Apache Hudi目前被業內描述為圍繞資料庫核心構建的流式資料湖平臺 (Streaming Data Lake Platform)。

由於 Hudi 擁有良好的 Upsert 能力,並且 0.10 Master 對 Flink 版本支援至 1.13.x,因此我們選擇通過 Flink + Hudi 的方式為 37 手遊的業務場景提供分鐘級 Upsert 資料的分析查詢能力。

三、37 手遊的業務痛點和技術方案選型

1. 舊架構與業務痛點

1.1 資料實時性不夠

  • 日誌類資料通過 sqoop 每 30min 同步前 60min 資料到 Hive;
  • 資料庫類資料通過 sqoop 每 60min 同步當天全量資料到 Hive;
  • 資料庫類資料通過 sqoop 每天同步前 60 天資料到 Hive。

1.2 業務程式碼邏輯複雜且難維護

  • 目前 37 手遊還有很多的業務開發沿用 MySQL + PHP 的開發模式,程式碼邏輯複雜且很難維護;
  • 相同的程式碼邏輯,往往流處理需要開發一份程式碼,批處理則需要另開發一份程式碼,不能複用。

1.3 頻繁重刷歷史資料

  • 頻繁地重刷歷史資料來保證資料一致。

1.4 Schema 變更頻繁

  • 由於業務需求,經常需要新增表字段。

1.5 Hive 版本低

  • 目前 Hive 使用版本為 1.x 版本,並且升級版本比較困難;
  • 不支援 Upsert;
  • 不支援行級別的 delete。

由於 37 手遊的業務場景,資料 upsert、delete 是個很常見的需求。所以基於 Hive 數倉的架構對業務需求的滿足度不夠。

2. 技術選型

在同步工具的選型上考慮過 Canal 和 Maxwell。但 Canal 只適合增量資料的同步並且需要部署,維護起來相對較重。而 Maxwell 雖然比較輕量,但與 Canal 一樣需要配合 Kafka 等訊息佇列使用。對比之下,Flink CDC 可以通過配置 Flink connector 的方式基於 Flink-SQL 進行使用,十分輕巧,並且完美契合基於 Flink-SQL 的流批一體架構。

在儲存引擎的選型上,目前最熱門的資料湖產品當屬:Apache Hudi,Apache Iceberg 和 DeltaLake,這些在我們的場景下各有優劣。最終,基於 Hudi 對上下游生態的開放、對全域性索引的支援、對 Flink 1.13 版本的支援,以及對 Hive 版本的相容性 (Iceberg 不支援 Hive1.x 的版本) 等原因,選擇了 Hudi 作為湖倉一體和流批一體的儲存引擎。

針對上述存在的業務痛點以及選型對比,我們的最終方案為:以 Flink1.13.2 作為計算引擎,依靠 Flink 提供的流批統一的 API,基於 Flink-SQL 實現流批一體,Flink-CDC 2.0 作為 ODS 層的資料同步工具以及 Hudi-0.10 Master 作為儲存引擎的湖倉一體,解決維護兩套程式碼的業務痛點。

四、新架構與湖倉一體

37 手遊的湖倉一體方案,是 37 手遊流批一體架構的一部分。通過湖倉一體、流批一體,準實時場景下做到了:資料同源、同計算引擎、同儲存、同計算口徑。資料的時效性可以到分鐘級,能很好的滿足業務準實時數倉的需求。下面是架構圖:

MySQL 資料通過 Flink CDC 進入到 Kafka。之所以資料先入 Kafka 而不是直接入 Hudi,是為了實現多個實時任務複用 MySQL 過來的資料,避免多個任務通過 Flink CDC 接 MySQL 表以及 Binlog,對 MySQL 庫的效能造成影響。

通過 CDC 進入到 Kafka 的資料除了落一份到離線資料倉庫的 ODS 層之外,會同時按照實時資料倉庫的鏈路,從 ODS->DWD->DWS->OLAP 資料庫,最後供報表等資料服務使用。實時數倉的每一層結果資料會準實時的落一份到離線數倉,通過這種方式做到程式一次開發、指標口徑統一,資料統一。

從架構圖上,可以看到有一步資料修正 (重跑歷史資料) 的動作,之所以有這一步是考慮到:有可能存在由於口徑調整或者前一天的實時任務計算結果錯誤,導致重跑歷史資料的情況。

而儲存在 Kafka 的資料有失效時間,不會存太久的歷史資料,重跑很久的歷史資料無法從 Kafka 中獲取歷史源資料。再者,如果把大量的歷史資料再一次推到 Kafka,走實時計算的鏈路來修正歷史資料,可能會影響當天的實時作業。所以針對重跑歷史資料,會通過資料修正這一步來處理。

總體上說,37 手遊的資料倉庫屬於 Lambda 和 Kappa 混搭的架構。流批一體資料倉庫的各個資料鏈路有資料質量校驗的流程。第二天對前一天的資料進行對賬,如果前一天實時計算的資料無異常,則不需要修正資料,Kappa 架構已經足夠。

五、Flink CDC 2.0 + Kafka + Hudi 0.10 實踐

1. 環境準備

  • Flink 1.13.2
  • .../lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar (修改 Master 分支的 Hudi Flink 版本為 1.13.2 然後構建)
  • .../lib/hadoop-mapreduce-client-core-2.7.3.jar (解決 Hudi ClassNotFoundException)
  • ../lib/flink-sql-connector-mysql-cdc-2.0.0.jar
  • ../lib/flink-format-changelog-json-2.0.0.jar
  • ../lib/flink-sql-connector-kafka_2.11-1.13.2.jar

source 端 MySQL-CDC 表定義:

create table sy_payment_cdc (
  ID BIGINT,
  ...
  PRIMARY KEY(ID) NOT ENFORCED
) with(
  'connector' = 'mysql-cdc',
  'hostname' = '',
  'port' = '',
  'username' = '',
  'password' = '',
  'database-name' = '',
  'table-name' = '',
  'connect.timeout' = '60s',
  'scan.incremental.snapshot.chunk.size' = '100000',
  'server-id'='5401-5416'
);

值得注意的是:scan.incremental.snapshot.chunk.size 引數需要根據實際情況來配置,如果表資料量不大,使用預設值即可。

Sink 端 Kafka+Hudi COW 表定義:

create table sy_payment_cdc2kafka (
  ID BIGINT,
  ...
  PRIMARY KEY(ID) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = '',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = '',
  'key.format' = '',
  'key.fields' = '',
  'format' = 'changelog-json'
);

create table sy_payment2Hudi (
  ID BIGINT,
  ...
  PRIMARY KEY(ID) NOT ENFORCED
)
PARTITIONED BY (YMD)
WITH (
  'connector' = 'Hudi',
  'path' = 'hdfs:///data/Hudi/m37_mpay_tj/sy_payment',
  'table.type' = 'COPY_ON_WRITE',
  'partition.default_name' = 'YMD',
  'write.insert.drop.duplicates' = 'true',
  'write.bulk_insert.shuffle_by_partition' = 'false',
  'write.bulk_insert.sort_by_partition' = 'false',
  'write.precombine.field' = 'MTIME',
  'write.tasks' = '16',
  'write.bucket_assign.tasks' = '16',
  'write.task.max.size' = '',
  'write.merge.max_memory' = ''
);
針對歷史資料入 Hudi,可以選擇離線 bulk_insert 的方式入湖,再通過 Load Index Bootstrap 載入資料後接回增量資料。bulk_insert 方式入湖資料的唯一性依靠源端的資料本身,在接回增量資料時也需要做到保證資料不丟失。

這裡我們選擇更為簡單的調整任務資源的方式將歷史資料入湖。依靠 Flink 的 checkpoint 機制,不管是 CDC 2.0 入 Kafka 期間還是 Kafka 入 Hudi 期間,都可以通過指定 checkpoint 的方式對任務進行重啟並且資料不會丟失。

我們可以在配置 CDC 2.0 入 Kafka,Kafka 入 Hudi 任務時調大記憶體並配置多個並行度,加快歷史資料入湖,等到所有歷史資料入湖後,再相應的調小入湖任務的記憶體配置並且將 CDC 入 Kafka 的並行度設定為 1,因為增量階段 CDC 是單並行度,然後指定 checkpoint 重啟任務。

按照上面表定義的引數配置,配置 16 個並行度,Flink TaskManager 記憶體大小為 50G 的情況下,單表 15 億歷史資料入至 Hudi COW 表實際用時 10 小時,單表 9 億資料入至 Hudi COW 表實際用時 6 小時。當然這個耗時很大一部分是 COW 寫放大的特性,在大資料量的 upsert 模式下耗時較多。

目前我們的叢集由 200 多臺機器組成,線上的流計算任務總數有 200 多,總資料量接近 2PB。

如果叢集資源很有限的情況下,可以根據實際情況調整 Hudi 表以及 Flink 任務的記憶體配置,還可以通過配置 Hudi 的限流引數 write.rate.limit 讓歷史資料緩慢入湖。

之前 Flink CDC 1.x 版本由於全量 snapshot 階段單並行度讀取的原因,當時億級以上的表在全量 snapshot 讀取階段就需要耗費很長時間,並且 checkpoint 會失敗無法保證資料的斷點續傳。

所以當時入 Hudi 是採用先啟動一個 CDC 1.x 的程式將此刻開始的增量資料寫入 Kafka,之後再啟動另外一個 sqoop 程式拉取當前的所有資料至 Hive 後,通過 Flink 讀取 Hive 的資料寫 Hudi,最後再把 Kafka 的增量資料從頭消費接回 Hudi。由於 Kafka 與 Hive 的資料存在交集,因此資料不會丟失,加上 Hudi 的 upsert 能力保證了資料唯一。

但是,這種方式的鏈路太長操作困難,如今通過 CDC 2.0 在全量 snapshot 階段支援多並行度以及 checkpoint 的能力,確實大大降低了架構的複雜度。

2. 資料比對

  • 由於生產環境用的是 Hive1.x,Hudi 對於 1.x 還不支援資料同步,所以通過建立 Hive 外部表的方式進行查詢,如果是 Hive2.x 以上版本,可參考Hive 同步章節;
  • 建立 Hive 外部表 + 預建立分割槽;
  • auxlib 資料夾新增 Hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar。
CREATE EXTERNAL TABLE m37_mpay_tj.`ods_sy_payment_f_d_b_ext`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `ID` bigint,
  ...
  )
PARTITIONED BY (
  `dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.Hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.Hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.Hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs:///data/Hudi/m37_mpay_tj/sy_payment'
最終查詢 Hudi 資料 (Hive 外部表的形式) 與原來 sqoop 同步的 Hive 資料做比對得到:
  1. 總數一致;
  2. 按天分組統計數量一致;
  3. 按天分組統計金額一致。

六、總結

湖倉一體以及流批一體架構對比傳統數倉架構主要有以下幾點好處:

  • Hudi 提供了 Upsert 能力,解決頻繁 Upsert/Delete 的痛點;
  • 提供分鐘級的資料,比傳統數倉有更高的時效性;
  • 基於 Flink-SQL 實現了流批一體,程式碼維護成本低;
  • 資料同源、同計算引擎、同儲存、同計算口徑;
  • 選用 Flink CDC 作為資料同步工具,省掉 sqoop 的維護成本。

最後針對頻繁增加表字段的痛點需求,並且希望後續同步下游系統的時候能夠自動加入這個欄位,目前還沒有完美的解決方案,希望 Flink CDC 社群能在後續的版本提供 Schema Evolution 的支援。

Reference

[1] MySQL CDC 文件:MySQL CDC Connector — Flink CDC 2.0.0 documentation

[2] Hudi Flink 答疑解惑:HUDI FLINK 答疑解惑 · 語雀

[3] Hudi 的一些設計:Hudi 的一些設計 · 語雀

原文連結
本文為阿里雲原創內容,未經允許不得轉載。