1. 程式人生 > 資料庫 >基於 Flink SQL CDC 的實時資料同步方案

基於 Flink SQL CDC 的實時資料同步方案

Flink 1.11 引入了 Flink SQL CDC,CDC 能給我們資料和業務間能帶來什麼變化?本文由 Apache Flink PMC,阿里巴巴技術專家伍翀 (雲邪)分享,內容將從傳統的資料同步方案,基於 Flink CDC 同步的解決方案以及更多的應用場景和 CDC 未來開發規劃等方面進行介紹和演示。

 

 

  1. 傳統資料同步方案

  2. 基於 Flink SQL CDC 的資料同步方案(Demo)

  3. Flink SQL CDC 的更多應用場景

  4. Flink SQL CDC 的未來規劃

 

傳統的資料同步方案與 

Flink SQL CDC 解決方案

 

 

業務系統經常會遇到需要更新資料到多個儲存的需求。例如:一個訂單系統剛剛開始只需要寫入資料庫即可完成業務使用。某天 BI 團隊期望對資料庫做全文索引,於是我們同時要寫多一份資料到 ES 中,改造後一段時間,又有需求需要寫入到 Redis 快取中。

          

 

很明顯這種模式是不可持續發展的,這種雙寫到各個資料儲存系統中可能導致不可維護和擴充套件,資料一致性問題等,需要引入分散式事務,成本和複雜度也隨之增加。我們可以通過 CDC(Change Data Capture)工具進行解除耦合,同步到下游需要同步的儲存系統。通過這種方式提高系統的穩健性,也方便後續的維護。

 

               

 

Flink SQL CDC 資料同步與原理解析

 

 

CDC 全稱是 Change Data Capture ,它是一個比較廣義的概念,只要能捕獲變更的資料,我們都可以稱為 CDC 。業界主要有基於查詢的 CDC 和基於日誌的 CDC ,可以從下面表格對比他們功能和差異點。

 

 

 

基於查詢的 CDC

基於日誌的 CDC

概念

每次捕獲變更發起 Select 查詢進行全表掃描,過濾出查詢之間變更的資料

讀取資料儲存系統的 log ,例如 MySQL 裡面的 binlog持續監控

開源產品

Sqoop, Kafka JDBC Source

Canal, Maxwell, Debezium

執行模式

Batch

Streaming

捕獲所有資料的變化

低延遲,不增加資料庫負載

不侵入業務(LastUpdated欄位)

 

 

捕獲刪除事件和舊記錄的狀態

捕獲舊記錄的狀態

 

 

經過以上對比,我們可以發現基於日誌 CDC 有以下這幾種優勢:

 

 

  • 能夠捕獲所有資料的變化,捕獲完整的變更記錄。在異地容災,資料備份等場景中得到廣泛應用,如果是基於查詢的 CDC 有可能導致兩次查詢的中間一部分資料丟失

  • 每次 DML 操作均有記錄無需像查詢 CDC 這樣發起全表掃描進行過濾,擁有更高的效率和效能,具有低延遲,不增加資料庫負載的優勢

  • 無需入侵業務,業務解耦,無需更改業務模型

  • 捕獲刪除事件和捕獲舊記錄的狀態,在查詢 CDC 中,週期的查詢無法感知中間資料是否刪除

        

 

基於日誌的 CDC 方案介紹

 

從 ETL 的角度進行分析,一般採集的都是業務庫資料,這裡使用 MySQL 作為需要採集的資料庫,通過 Debezium 把 MySQL Binlog 進行採集後傳送至 Kafka 訊息佇列,然後對接一些實時計算引擎或者 APP 進行消費後把資料傳輸入 OLAP 系統或者其他儲存介質。

 

Flink 希望打通更多資料來源,發揮完整的計算能力。我們生產中主要來源於業務日誌和資料庫日誌,Flink 在業務日誌的支援上已經非常完善,但是在資料庫日誌支援方面在 Flink 1.11 前還屬於一片空白,這就是為什麼要整合 CDC 的原因之一。

 

Flink SQL 內部支援了完整的 changelog 機制,所以 Flink 對接 CDC 資料只需要把CDC 資料轉換成 Flink 認識的資料,所以在 Flink 1.11 裡面重構了 TableSource 介面,以便更好支援和整合 CDC。

 

                        

 

重構後的 TableSource 輸出的都是 RowData 資料結構,代表了一行的資料。在RowData 上面會有一個元資料的資訊,我們稱為 RowKind 。RowKind 裡面包括了插入、更新前、更新後、刪除,這樣和資料庫裡面的 binlog 概念十分類似。通過 Debezium 採集的 JSON 格式,包含了舊資料和新資料行以及原資料資訊,op 的 u表示是 update 更新操作識別符號,ts_ms 表示同步的時間戳。因此,對接 Debezium JSON 的資料,其實就是將這種原始的 JSON 資料轉換成 Flink 認識的 RowData。

 

 

 

選擇 Flink 作為 ETL 工具

 

 

當選擇 Flink 作為 ETL 工具時,在資料同步場景,如下圖同步結構:

 

             

 

通過 Debezium 訂閱業務庫 MySQL 的 Binlog 傳輸至 Kafka ,Flink 通過建立 Kafka 表指定 format 格式為 debezium-json ,然後通過 Flink 進行計算後或者直接插入到其他外部資料儲存系統,例如圖中的 Elasticsearch 和 PostgreSQL。

 

             

 

但是這個架構有個缺點,我們可以看到採集端元件過多導致維護繁雜,這時候就會想是否可以用 Flink SQL 直接對接 MySQL 的 binlog 資料呢,有沒可以替代的方案呢?

 

答案是有的!經過改進後結構如下圖:

 

             

 

社群開發了 flink-cdc-connectors 元件,這是一個可以直接從 MySQL、PostgreSQL 等資料庫直接讀取全量資料和增量變更資料的 source 元件。目前也已開源,開源地址:

 

 

https://github.com/ververica/flink-cdc-connectors

 

 

flink-cdc-connectors 可以用來替換 Debezium+Kafka 的資料採集模組,從而實現 Flink SQL 採集+計算+傳輸(ETL)一體化,這樣做的優點有以下:

 

 

  • 開箱即用,簡單易上手

  • 減少維護的元件,簡化實時鏈路,減輕部署成本

  • 減小端到端延遲

  • Flink 自身支援 Exactly Once 的讀取和計算

  • 資料不落地,減少儲存成本

  • 支援全量和增量流式讀取

  • binlog 採集位點可回溯*

 

 

基於 Flink SQL CDC 的

資料同步方案實踐

 

 

 

下面給大家帶來 3 個關於 Flink SQL + CDC 在實際場景中使用較多的案例。在完成實驗時候,你需要 Docker、MySQL、Elasticsearch 等元件,具體請參考每個案例參考文件。

 

 

案例 1 :  Flink SQL CDC  + JDBC Connector

 

 

這個案例通過訂閱我們訂單表(事實表)資料,通過 Debezium 將 MySQL Binlog 傳送至 Kafka,通過維表 Join 和 ETL 操作把結果輸出至下游的 PG 資料庫。具體可以參考 Flink 公眾號文章:《》案例進行實踐操作。

 

 

https://www.bilibili.com/video/BV1bp4y1q78d

             

 

案例 2 :  CDC Streaming ETL

 

 

模擬電商公司的訂單表和物流表,需要對訂單資料進行統計分析,對於不同的資訊需要進行關聯後續形成訂單的大寬表後,交給下游的業務方使用 ES 做資料分析,這個案例演示瞭如何只依賴 Flink 不依賴其他元件,藉助 Flink 強大的計算能力實時把 Binlog 的資料流關聯一次並同步至 ES 。

 

          

 

例如如下的這段 Flink SQL 程式碼就能完成實時同步 MySQL 中 orders 表的全量+增量資料的目的。

CREATE TABLE orders (  order_id INT,  order_date TIMESTAMP(0),  customer_name STRING,  price DECIMAL(10, 5),  product_id INT,  order_status BOOLEAN) WITH (  'connector' = 'mysql-cdc',  'hostname' = 'localhost',  'port' = '3306',  'username' = 'root',  'password' = '123456',  'database-name' = 'mydb',  'table-name' = 'orders');
SELECT * FROM orders

 

 

 

為了讓讀者更好地上手和理解,我們還提供了 docker-compose 的測試環境,更詳細的案例教程請參考下文的視訊連結和文件連結。

 

 

視訊連結:

https://www.bilibili.com/video/BV1zt4y1D7kt

文件教程:

https://github.com/ververica/flink-cdc-connectors/wiki/中文教程

  

 

案例 3 : Streaming Changes to Kafka

 

 

下面案例就是對 GMV 進行天級別的全站統計。包含插入/更新/刪除,只有付款的訂單才能計算進入 GMV ,觀察 GMV 值的變化。

 

              

視訊連結:

https://www.bilibili.com/video/BV1zt4y1D7kt

文件教程:

https://github.com/ververica/flink-cdc-connectors/wiki/中文教程

 

 

Flink SQL CDC 的更多應用場景

 

Flink SQL CDC 不僅可以靈活地應用於實時資料同步場景中,還可以打通更多的場景提供給使用者選擇。

 

 

 

Flink 在資料同步場景中的靈活定位

 

  • 如果你已經有 Debezium/Canal + Kafka 的採集層 (E),可以使用 Flink 作為計算層 (T) 和傳輸層 (L)

  • 也可以用 Flink 替代 Debezium/Canal ,由 Flink 直接同步變更資料到 Kafka,Flink 統一 ETL 流程

  • 如果不需要 Kafka 資料快取,可以由 Flink 直接同步變更資料到目的地,Flink 統一 ETL 流程

 

 

Flink SQL CDC : 打通更多場景

 

  • 實時資料同步,資料備份,資料遷移,數倉構建

    優勢:豐富的上下游(E & L),強大的計算(T),易用的 API(SQL),流式計算低延遲

  • 資料庫之上的實時物化檢視、流式資料分析

  • 索引構建和實時維護

  • 業務 cache 重新整理

  • 審計跟蹤

  • 微服務的解耦,讀寫分離

  • 基於 CDC 的維表關聯

 

 

下面介紹一下為何用 CDC 的維表關聯會比基於查詢的維表查詢快。

 

■ 基於查詢的維表關聯

 

             

 

目前維表查詢的方式主要是通過 Join 的方式,資料從訊息佇列進來後通過向資料庫發起 IO 的請求,由資料庫把結果返回後合併再輸出到下游,但是這個過程無可避免的產生了 IO 和網路通訊的消耗,導致吞吐量無法進一步提升,就算使用一些快取機制,但是因為快取更新不及時可能會導致精確性也沒那麼高。

 

■ 基於 CDC 的維表關聯

 

             

 

我們可以通過 CDC 把維表的資料匯入到維表 Join 的狀態裡面,在這個 State 裡面因為它是一個分散式的 State ,裡面儲存了 Database 裡面實時的資料庫維表映象,當訊息佇列資料過來時候無需再次查詢遠端的資料庫了,直接查詢本地磁碟的 State ,避免了 IO 操作,實現了低延遲、高吞吐,更精準。

 

Tips:目前此功能在 1.12 版本的規劃中,具體進度請關注 FLIP-132 。

 

 

 

未來規劃

 

  • FLIP-132 :Temporal Table DDL(基於 CDC 的維表關聯)

  • Upsert 資料輸出到 Kafka

  • 更多的 CDC formats 支援(debezium-avro, OGG, Maxwell)

  • 批模式支援處理 CDC 資料

  • flink-cdc-connectors 支援更多資料庫

 

 

總結

 

本文通過對比傳統的資料同步方案與 Flink SQL CDC 方案分享了 Flink CDC 的優勢,與此同時介紹了 CDC 分為日誌型和查詢型各自的實現原理。後續案例也演示了關於 Debezium 訂閱 MySQL Binlog 的場景介紹,以及如何通過 flink-cdc-connectors 實現技術整合替代訂閱元件。除此之外,還詳細講解了 Flink CDC 在資料同步、物化檢視、多機房備份等的場景,並重點講解了社群未來規劃的基於 CDC 維表關聯對比傳統維表關聯的優勢以及 CDC 元件工作。

 

希望通過這次分享,大家對 Flink SQL CDC 能有全新的認識和了解,在未來實際生產開發中,期望 Flink CDC 能帶來更多開發的便捷和更豐富的使用場景。

 

 

 

Q & A

 

1、GROUP BY 結果如何寫到 Kafka ?

 

因為 group by 的結果是一個更新的結果,目前無法寫入 append only 的訊息佇列中裡面去。更新的結果寫入 Kafka 中將在 1.12 版本中原生地支援。在 1.11 版本中,可以通過 flink-cdc-connectors 專案提供的 changelog-json format 來實現該功能,具體見文件。

 

文件連結:

https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format

 

2、CDC 是否需要保證順序化消費?

 

 

是的,資料同步到 kafka ,首先需要 kafka 在分割槽中保證有序,同一個 key 的變更資料需要打入到同一個 kafka 的分割槽裡面。這樣 flink 讀取的時候才能保證順序。