基於Apache Hudi和Debezium構建CDC入湖管道
從 Hudi v0.10.0 開始,我們很高興地宣佈推出適用於 Deltastreamer 的 Debezium 源,它提供從 Postgres 和 MySQL 資料庫到資料湖的變更捕獲資料 (CDC) 的攝取。有關詳細資訊請參閱原始 RFC
1. 背景
當想要對來自事務資料庫(如 Postgres 或 MySQL)的資料執行分析時,通常需要通過稱為更改資料捕獲 CDC的過程將此資料引入資料倉庫或資料湖等 OLAP 系統。 Debezium 是一種流行的工具,它使 CDC 變得簡單,其提供了一種通過讀取更改日誌來捕獲資料庫中行級更改的方法,通過這種方式 Debezium 可以避免增加資料庫上的 CPU 負載,並確保捕獲包括刪除在內的所有變更。
現在
2. 總體設計
上面顯示了使用 Apache Hudi 的端到端 CDC 攝取流的架構,第一個元件是 Debezium 部署,它由 Kafka 叢集、schema registry(Confluent 或 Apicurio)和 Debezium 聯結器組成,Debezium 聯結器不斷輪詢資料庫中的更改日誌,並將每個資料庫行的更改寫入 AVRO 訊息到每個表的專用 Kafka 主題。
第二個元件是
為了近乎實時地將資料庫表中的資料提取到 Hudi 表中,我們實現了兩個可插拔的 Deltastreamer 類。首先我們實現了一個 Debezium 源。 Deltastreamer 在連續模式下執行,源源不斷地從給定表的 Kafka 主題中讀取和處理 Avro 格式的 Debezium 更改記錄,並將更新的記錄寫入目標 Hudi 表。 除了資料庫表中的列之外,我們還攝取了一些由 Debezium 新增到目標 Hudi 表中的元欄位,元欄位幫助我們正確地合併更新和刪除記錄,使用
其次我們實現了一個自定義的 Debezium Payload,它控制了在更新或刪除同一行時如何合併 Hudi 記錄,當接收到現有行的新 Hudi 記錄時,有效負載使用相應列的較高值(MySQL 中的 FILEID 和 POS 欄位以及 Postgres 中的 LSN 欄位)選擇最新記錄,在後一個事件是刪除記錄的情況下,有效負載實現確保從儲存中硬刪除記錄。 刪除記錄使用 op 欄位標識,該欄位的值 d 表示刪除。
3. Apache Hudi配置
在使用 Debezium 源聯結器進行 CDC 攝取時,請務必考慮以下 Hudi 部署配置。
- 記錄鍵 - 表的 Hudi 記錄鍵應設定為上游資料庫中表的主鍵。這可確保正確應用更新,因為記錄鍵唯一地標識 Hudi 表中的一行。
- 源排序欄位 - 對於更改日誌記錄的重複資料刪除,源排序欄位應設定為資料庫上發生的更改事件的實際位置。 例如我們分別使用 MySQL 中的 FILEID 和 POS 欄位以及 Postgres 資料庫中的 LSN 欄位來確保記錄在原始資料庫中以正確的出現順序進行處理。
- 分割槽欄位 - 不要將 Hudi 表的分割槽與與上游資料庫相同的分割槽欄位相匹配。當然也可以根據需要為 Hudi 表單獨設定分割槽欄位。
3.1 引導現有表
一個重要的用例可能是必須對現有資料庫表進行 CDC 攝取。在流式傳輸更改之前我們可以通過兩種方式獲取現有資料庫資料:
- 預設情況下,Debezium 在初始化時執行資料庫的初始一致快照(由 config snapshot.mode 控制)。在初始快照之後它會繼續從正確的位置流式傳輸更新以避免資料丟失。
- 雖然第一種方法很簡單,但對於大型表,Debezium 引導初始快照可能需要很長時間。或者我們可以執行 Deltastreamer 作業,使用 JDBC 源直接從資料庫引導表,這為使用者定義和執行引導資料庫表所需的更優化的 SQL 查詢提供了更大的靈活性。引導作業成功完成後,將執行另一個 Deltastreamer 作業,處理來自 Debezium 的資料庫更改日誌,使用者必須在 Deltastreamer 中使用檢查點來確保第二個作業從正確的位置開始處理變更日誌,以避免資料丟失。
3.2 例子
以下描述了使用 AWS RDS 例項 Postgres、基於 Kubernetes 的 Debezium 部署和在 Spark 叢集上執行的 Hudi Deltastreamer 實施端到端 CDC 管道的步驟。
3.3 資料庫
RDS 例項需要進行一些配置更改才能啟用邏輯複製。
SET rds.logical_replication to 1 (instead of 0)
psql --host=<aws_rds_instance> --port=5432 --username=postgres --password -d <database_name>;
CREATE PUBLICATION <publication_name> FOR TABLE schema1.table1, schema1.table2;
ALTER TABLE schema1.table1 REPLICA IDENTITY FULL;
3.4 Debezium 聯結器
Strimzi 是在 Kubernetes 叢集上部署和管理 Kafka 聯結器的推薦選項,或者可以選擇使用 Confluent 託管的 Debezium 聯結器。
kubectl create namespace kafka
kubectl create -f https://strimzi.io/install/latest?namespace=kafka -n kafka
kubectl -n kafka apply -f kafka-connector.yaml
kafka-connector.yaml 的示例如下所示:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-kafka-connect
annotations:
strimzi.io/use-connector-resources: "false"
spec:
image: debezium-kafka-connect:latest
replicas: 1
bootstrapServers: localhost:9092
config:
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1
可以使用以下包含 Postgres Debezium 聯結器的 Dockerfile 構建 docker 映像 debezium-kafka-connect
FROM confluentinc/cp-kafka-connect:6.2.0 as cp
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:6.2.0
FROM strimzi/kafka:0.18.0-kafka-2.5.0
USER root:root
RUN yum -y update
RUN yum -y install git
RUN yum -y install wget
RUN wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.1.Final/debezium-connector-postgres-1.6.1.Final-plugin.tar.gz
RUN tar xzf debezium-connector-postgres-1.6.1.Final-plugin.tar.gz
RUN mkdir -p /opt/kafka/plugins/debezium && mkdir -p /opt/kafka/plugins/avro/
RUN mv debezium-connector-postgres /opt/kafka/plugins/debezium/
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/lib /opt/kafka/plugins/avro/
USER 1001
一旦部署了 Strimzi 運算子和 Kafka 聯結器,我們就可以啟動 Debezium 聯結器。
curl -X POST -H "Content-Type:application/json" -d @connect-source.json http://localhost:8083/connectors/
以下是設定 Debezium 聯結器以生成兩個表 table1 和 table2 的更改日誌的配置示例。
connect-source.json 的內容如下
{
"name": "postgres-debezium-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "database",
"plugin.name": "pgoutput",
"database.server.name": "postgres",
"table.include.list": "schema1.table1,schema1.table2",
"publication.autocreate.mode": "filtered",
"tombstones.on.delete":"false",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "<schema_registry_host>",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<schema_registry_host>",
"slot.name": "pgslot"
}
}
3.5 Hudi Deltastreamer
接下來我們使用 Spark 執行 Hudi Deltastreamer,它將從 kafka 攝取 Debezium 變更日誌並將它們寫入 Hudi 表。 下面顯示了一個這樣的命令例項,它適用於 Postgres 資料庫。 幾個關鍵配置如下:
- 將源類設定為 PostgresDebeziumSource。
- 將有效負載類設定為 PostgresDebeziumAvroPayload。
- 為 Debezium Source 和 Kafka Source 配置模式登錄檔 URL。
- 將記錄鍵設定為資料庫表的主鍵。
- 將源排序欄位 (dedup) 設定為 _event_lsn
spark-submit \\
--jars "/home/hadoop/hudi-utilities-bundle_2.12-0.10.0.jar,/usr/lib/spark/external/lib/spark-avro.jar" \\
--master yarn --deploy-mode client \\
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/hadoop/hudi-packages/hudi-utilities-bundle_2.12-0.10.0-SNAPSHOT.jar \\
--table-type COPY_ON_WRITE --op UPSERT \\
--target-base-path s3://bucket_name/path/for/hudi_table1 \\
--target-table hudi_table1 --continuous \\
--min-sync-interval-seconds 60 \\
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \\
--source-ordering-field _event_lsn \\
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \\
--hoodie-conf schema.registry.url=https://localhost:8081 \\
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://localhost:8081/subjects/postgres.schema1.table1-value/versions/latest \\
--hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \\
--hoodie-conf hoodie.deltastreamer.source.kafka.topic=postgres.schema1.table1 \\
--hoodie-conf auto.offset.reset=earliest \\
--hoodie-conf hoodie.datasource.write.recordkey.field=”database_primary_key” \\
--hoodie-conf hoodie.datasource.write.partitionpath.field=partition_key \\
--enable-hive-sync \\
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \\
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \\
--hoodie-conf hoodie.datasource.hive_sync.database=default \\
--hoodie-conf hoodie.datasource.hive_sync.table=hudi_table1 \\
--hoodie-conf hoodie.datasource.hive_sync.partition_fields=partition_key
4. 總結
這篇文章介紹了用於 Hudi Deltastreamer 的 Debezium 源,以將 Debezium 更改日誌提取到 Hudi 表中。 現在可以將資料庫資料提取到資料湖中,以提供一種經濟高效的方式來儲存和分析資料庫資料。
請關注此 JIRA 以瞭解有關此新功能的更多資訊。