1. 程式人生 > 其它 >Tapdata 肖貝貝:實時資料引擎系列(六)-從 PostgreSQL 實時資料整合看增量資料快取層的必要性

Tapdata 肖貝貝:實時資料引擎系列(六)-從 PostgreSQL 實時資料整合看增量資料快取層的必要性

 
摘要:對於 PostgreSQL 的實時資料採集, 業界經常遇到了包括:對源庫效能/儲存影響較大, 採集效能受限, 時間回退重新同步不支援, 資料型別較複雜等等問題。Tapdata 在解決 PostgreSQL 增量複製問題過程中,獲得了一些不錯的經驗和思考,本文將分享 Tapdata 自研的 TAP-CDC-CACHE,和其他幾種市面常見的解決方案的優勢和特性。

前言

TAPDATA 的資料複製產品裡, 提供了對於 PostgreSQL 的實時資料採集功能, 在客戶落地使用時, 遇到了包括 對源庫效能/儲存影響較大, 採集效能受限, 時間回退重新同步不支援, 資料型別較複雜 等等問題, 在解決這些問題的過程中, 我們逐漸對增量事件應該具備一個快取中介軟體有了清晰的認識, 並在之後的時間裡做了相應的實現   本文從我們在解決 PostgreSQL 增量複製的問題出發, 在一步步尋找解決方案的過程中, 分享一下我們最終解決方案的過程和對這個問題的一些思考

PG 增量資料捕獲的幾種常見方案

萬變不離其宗, PostgreSQL 捕獲增量事件的原理與 Mysql, MongoDB 等資料庫類似, 其本質都是基於事務日誌進行回放, 這種日誌在 PG 裡被稱為 Write-Ahead Logging(WAL), 通過對 WAL 的解析, 可以得到資料庫的邏輯事件變更, 下游的各種消費者可以在這個基礎上完成資料複製, 流計算等等各種需求   在具體的實現上, 通常有以下三種技術選型

基於複製槽的解碼與查詢

針對開發者進行資料邏輯複製的需求, PostgreSQL 開放了對於 WAL 的訂閱介面, 開發者需要建立一個名為複製槽的結構, 並指定其解碼外掛, 之後只需要輪詢這個複製槽, 即可獲取最新的以事務為最小粒度的資料變更   常見的解碼器有 decoderbufs, wal2json, pgoutput
等等, TAPDATA 支援的外掛, 其對應的資料庫版本與特點如下:   除此之外, 還有一些其他外掛, 比如: decoding-json, decoder_json, jsoncdc, wal2mongo, postgres-decoderbufs, Bottled Water, osm-logical, pglogical, transicator 等等各式各樣的輸出格式, 使用者可以按照自己的需求選擇合適的外掛, 也可以自己開發對應的解碼器     以 wal2json 為例, 具體的使用命令如下:  
## 建立一個 slot, 命名為 tapdata, 用來接收 CDC 事件, 並使用 wal2json 解析
select * from pg_create_logical_replication_slot('tapdata', 'wal2json'); ## 檢視 slot 基本資訊 select * from pg_replication_slots where slot_name='tapdata'; ## 從 slot 讀取資料, 並清理讀過的資料 ## 方法支援的引數的為: ## 1. slot 名字, 必選 ## 2. 一個 lsn 位置, 必選, 讀取到這個位置為止, 剩下的此次查詢不返回 ## 3. 一個 limit 數字 n, 必選, 最多讀取 n 條為止, 剩下的此次查詢不返回, 與 lsn 滿足任意一條即停止讀取 ## 4. options, 可選, 控制一些輸出的資料內容, 具體可以檢視: https://pgpedia.info/p/pg_logical_slot_get_changes.html select * from pg_logical_slot_get_changes('tapdata', NULL, NULL) ## 從 slot 讀取資料, 保留讀過的資料, 引數與 pg_logical_slot_get_changes 完全一致 select * from pg_logical_slot_peek_changes('tapdata', NULL, NULL) ## select 支援使用 xid, lsn 等條件進行過濾, 比如限制返回的條目數為 10, 並且 lsn > '1/47CB8450', 可如下寫 select * from pg_logical_slot_peek_changes('tapdata', NULL, NULL) where lsn > '1/47CB8450' limit 10 ## 由於 pg_logical_slot_peek_changes 不清理資料, 在需要清理 lsn 時, 可以使用 pg_replication_slot_advance ## 將 lsn 推進到指定位置, 並清理之前的記錄 select * from pg_replication_slot_advance('tapdata', '1/47CB8450')
  這個方案的優勢是使用便捷, 建立複製槽後, 可以方便使用 SQL 查詢增量資料 方案的問題有很多, 我們遇到的列舉在下面:
  1. 虛擬 CDC 表不包含任何索引, 使用 where 條件查詢效能很糟糕
  2. 使用 pg_logical_slot_get_changes 會清除已經讀取的資料, 無法實現多工的資料複用, 只能建立多個互不關聯的 slot 支援下游使用
  3. slot 數量受資料庫配置限制, 無法動態調整
  4. 遺忘的 slot 會持續膨脹, 佔用資料庫儲存資源
  5. slot 不支援過濾, 繁忙的資料庫上資料量巨大, 即使在下游進行邏輯過濾, 其佔用的頻寬也難以避免
  6. 只可以在 主節點 使用, 在發生主從切換時, 機制會失效
  7. 不支援 DDL(結構變更, 比如表字段增加) 事件捕獲, 只支援 DML(資料增刪改) 事件捕獲
  8. 不支援無唯一標記的 DML 事件捕獲, 唯一標記可以是主鍵, 也可以是唯一索引
  9. 需要源庫日誌開啟到 logic 級別, 增大了儲存佔用
  10. 不支援回溯獲取歷史資料變更, 只能獲取到開啟 slot 之後的變更
  即便問題如此之多, 但是由於其使用的便捷性, 對其進行二次開發的成本很低, 依然成為各大資料整合元件裡的首選方案, 這其中包括 debezium, flink-cdc, datax, flinkx 等等

手動管理日誌解析

為了解決這些問題, 我們需要能直接解析 WAL 的外掛方案 Oracle 資料庫有一個叫做 Logminer 的外掛, 可以方便對資料庫 Redo Log 進行邏輯解析, 對 PostgreSQL 也有一個類似的外掛叫 Walminer, 專案地址在: movead/WalMiner   在使用上, 與手動管理的 Oracle Logminer 基本一致, 其具體的使用命令如下:
## 列出 WAL 檔案
select walminer_wal_list()

## 新增 WAL 檔案或者 WAL 檔案目錄到待解析
select walminer_wal_add('/opt/test/wal')

## 解析日誌
select walminer_all()

## 解析指定時間的 WAL 日誌
select walminer_by_time(starttime, endtime)

## 解析指定 lsn 範圍的 WAL 日誌
select walminer_by_lsn(startlsn, endlsn)

## 檢視解析結果
select * from walminer_contents

## 銷燬解析任務
select walminer_stop()
  與基於複製槽的解碼方案相比, Walminer 有自己的一些優勢, 包括:
  1. 可以解析任意時間段的日誌, 不需要提前開啟任務
  2. 不需要將日誌級別設定為 logic, 節省空間
  3. 支援 DML/DDL 事件解析
  4. 可以對結果表建立索引, 進行基於時間和斷點的範圍查詢
  他的劣勢有:
  1. 結果表佔用了資料庫儲存資源
  2. 日誌解析佔用了資料庫計算資源
  3. 事件查詢佔用了資料庫計算與頻寬資源
  4. 不支援併發解析, 使用者需要自己進行細粒度資料管理
  相比複製槽解碼外掛, Walminer 從根本上解決了很多問題, 並引導我們思考這個方案的通用擴充套件性

原生裸日誌解析

pgwal_dump 是 PostgreSQL 官方提供的 WAL 解析工具, 與 Walminer 相比, 其優勢在於不需要安裝到資料庫中, 且解析不佔用資料庫資源, 解析後的內容可以輸出到檔案中供下游消費, 官方提供, 有較好的維護性, 其劣勢在於無法使用資料庫驅動進行任務管理, 需要額外安裝通訊 agent 進行任務管理, 且其輸出結果無法直接 SQL 查詢, 需要自行組織結果資料   除此之外, 其核心功能與 Walminer 基本相同, 可作為備用方案使用

WAL 日誌方案的反思

對資料庫的設計者來說, 提供資料庫事件的回放能力往往基於兩個目的:
  1. 故障恢復
  2. 主從同步
  故障恢復的場景使用低頻, 資料實時性要求低, 多手動操作, 對整合性要求不高, pgwal_dump 是一個典型的例子, 對這個工具的整合使用需要額外開發 agent 進行任務管理, 增加了使用成本   主從同步有一個典型的特點是從的數量往往不是很多, 因此所有基於此假設的方案在遇到較多的消費下游時, 會遇到比較嚴重的效能問題, slot 的方案即是如此, 除此之外, 主從同步往往需要全量資料保持一致, 因此往往不會針對庫, 表, 甚至更細緻的查詢條件進行特異性解析優化, 在使用時往往帶來較大的資源浪費   實時資料服務平臺的需求打破了上述兩個目的假設, 其場景既需要非常高的實時性, 又需要非常好的整合性, 同時對資料的消費數量與業務相關, 繁忙的資料庫其消費場景會達到數十, 甚至數百個, 這些資料消費任務對資料的要求各不相同, 具備精細的過濾條件   在實時任務的開發過程中, 將時間回退到某個時間點進行回放是非常常見的除錯需求, 已有的方案要麼無法實現, 要麼以佔用較多的資料庫資源進行折衷, 在技術上不優雅   針對各種資料庫, 以上的困難都不止一次出現在我們面前, 客戶在進行任務開發時, 需要小心翼翼設計任務過程, 避免對生產庫造成影響, 對使用者造成了較大的心智負擔   痛定思痛, 作為專注在實時資料開發的產品型公司, 這個問題被客戶反覆提起, 擺在研發團隊面前, 經過多次思考與嘗試, 我們使用了自研快取中介軟體, 提出了自己的解決方案

TAP-CDC-CACHE

在軟體開發領域有一個名言, "All problems in computer science can be solved by another level of indirection", 這個場景也不例外   為了解決這個問題, TAPDATA 對於各種來源的資料增量事件的寫入和消費需求, 針對性開發了一個高速大容量的快取層, 其具備以下基本特性:
  1. 分散式高可用: 基於 RAFT 的多副本同步機制, 可防止單點故障
  2. 無外部服務依賴: 部署便捷, 管理方便
  3. 豐富的儲存端資料過濾: 支援多欄位, 多級欄位, 欄位等於, 欄位範圍, IN Array, 多條件邏輯運算等過濾條件, 執行在服務端, 極大節省頻寬和消費端算力
  4. 支援多生產者/消費者, 支援自動推進, ACK 推進等消費方式
  5. 高效能: 極致資料吞吐能力, 單節點可滿足每秒數百萬的事件讀寫能力
  6. 大容量: 基於普通磁碟讀寫能力進行設計, 支援資料壓縮, 滿足常見業務場景極長時間的歷史增量事件儲存需求
  7. 嚴格順序保證: 針對同一個資料來源的資料, 不使用分割槽儲存, 保證資料的嚴格有序性, 雖然降低了部分處理效能, 但是對流計算場景來講, 資料的準確性比效能更為重要
  並針對 CDC 場景進行額外優化, 包括:
  1. 增量事件自動解析: 支援常見資料庫事務日誌格式, 原生寫入, 自動解析並規整輸出
  2. 事件補全: 基於全量資料 1:1 拷貝, 支援將部分不完整的增量事件, 比如沒有開啟 Full 的 Oracle Redo Log, MongoDB Oplog 缺少前值與完整後值的情況, 對資料進行自動補全, 方便下游進行各種計算處理
  3. 事件共享: 對一個確定的資料來源例項, 只需要對源庫進行一份增量事件讀取, 下游所有消費者從快取層獲取資料, 避免對源庫造成較大壓力
  4. 支援時間和斷點位置的雙向轉換: 通過大範圍二級索引查詢與精確查詢遍歷相結合的方式, 轉換速度快, 資源消耗少
  5. 統一資料標準檢測: 對 DML/DDL 描述抽象出一套異構資料庫通用的描述, 包括統一可擴充套件的資料型別, 事件標準描述等規則, 並支援在快取層進行檢測, 保證進入下游的資料符合質量要求
  6. 支援指定範圍的 全量+增量 自動合併結果返回, 在批流一體的精確一次資料輸出場景, 可以做到對源庫的無鎖併發資料讀取, 並極大簡化了聯結器的開發過程
  這個中介軟體工作在資料採集層與計算層的中間位置, 遮蔽了資料庫增量標準的差異性, 解決了之前方案遇到的各種問題, 為後續對資料的使用提供了足夠的功能與效能空間, 為產品提供了獨有的競爭力   幾個常見的工作模式流程圖如下:

典型工作模式

以 Oracle 為例, 開發者只需要將單併發例項級別無過濾的 Logminer Redolog 解析結果傳送到快取層, 後續的標準化, 有序性保證, 過濾器均可自動完成, 如下圖所示  

非標準日誌補齊

以 MongoDB 為例, MongoDB 的 Update 需要開啟反查才能獲取完整前值, Delete 操作不支援變更前值獲取, 在流計算場景, 只有一個變更主鍵是不滿足後續資料需求的, 比如對雙流 JOIN 場景, JOIN 鍵不為主鍵時, 一條記錄的刪除除了需要知曉主鍵之外, 他的關聯鍵和具體變更的資料也非常重要   針對這個場景, TAP-CDC-CACHE 的工作模式如下:  

全量增量資料合併

在進行包含全量 + 增量的計算場景時, 通常情況下為保證資料的精確一次性, 需要提供源端鎖表 + 快照讀實現, 鎖錶帶來了業務損失, 快照讀對於繁忙的資料庫負載很高, 為了解決這些問題, 基於源表無鎖範圍併發讀取 + 部分增量合併操作的 CDC 相比之下成為更優良的方案選擇   但是在具體實現上, 開發者的負擔更重, 為解決這個問題, TAP-CDC-CACHE 將複雜邏輯抽象在中介軟體中, 開發者只需要簡單將全量/增量資料, 按照不同的生產者灌入到儲存中, 後續的全部操作均由中介軟體完成, 其工作模式如下:  

後言

提到資料流儲存, 會有一些同學有為什麼不使用 kafka, pulsar, 或者 pravega 這種產品的疑問, 處於解決問題成本最低的考慮, 一開始確實有考慮使用流儲存, 與 Stream API 去開發一些處理運算元來實現需求, 但是流儲存這些開發介面, 本質上是對流做逐條變換, 一些核心的需求, 比如:
  1. 對不完整事件進行補全
  2. 合併增量全量資料
  3. 時間/斷點相互轉換等問題
  這幾個問題的技術抽象使用逐條讀取流已經很勉強, 實現出來的效果並不好, 我們不得不對一些特定的流做一些二級索引的維護, 這本身又需要單獨一個元件來做, 這引入了一些額外的複雜性, 再考慮到:
  1. 過濾器是非常消耗頻寬的操作, 而常見的流儲存產品不支援在 broker 進行計算
  2. 針對場景需求, 我們需要開發較多的 Stream 中介軟體
  我們認識到自己的需求可以被更優雅和專業地解決, 於是有了這個產品的雛形,本質上來講, TAP-CDC-CACHE 是一個特定場景下優化的資料庫。

關於 Tapdata:

Tapdata 是一款基於資料即服務(DaaS)架構理念,面向 OLTP 業務或場景的實時資料服務平臺,具備異構資料實時同步、批流一體資料融合、自助式 API 釋出等功能。Tapdata 目前已支援近百個資料來源和型別,包括市場主流的資料庫,API,佇列,物聯網等,所有操作均是低程式碼、視覺化方式,無需專業的程式設計能力就可完成資料實時同步、資料對映與合併、資料建模、資料服務 API 開發,資料實時入湖入倉等。申請試用:https://tapdata.net/tapdata-enterprise/demo.html   本文作者:Tapdata 技術合夥人肖貝貝,原文地址https://tapdata.net/TAP-CDC-CACHE.html