[離線計算-Spark|Hive] 資料近實時同步數倉方案設計
背景
最近閱讀了大量關於hudi相關文章, 下面結合對Hudi的調研, 設計一套技術方案用於支援 MySQL資料CDC同步至數倉中,避免繁瑣的ETL流程,藉助Hudi的upsert, delete 能力,來縮短資料的交付時間.
元件版本:
- Hadoop 2.6.0
- Hive 1.1.0
- hudi 0.7.0
- spark 2.4.6
架構設計
- 使用canal(阿里巴巴MySQL Binlog增量訂閱&消費元件)dump mysql binlog 資料
- 採集後將binlog 資料採集到kafka中, 按照庫名建立topic, 並按照表名將資料寫入topic 固定分割槽
- spark 消費資料將資料生成DF
- 將DF資料寫入hudi表
- 同步hudi元資料到hive中
寫入主要分成兩部分全量資料和增量資料:
-
歷史資料通過bulkinsert 方式 同步寫入hudi
-
增量資料直接消費寫入使用hudi的upsert能力,完成資料合併
寫入hudi在hdfs的格式如下:
hudi
hudi 如何處理binlog upsert,delete 事件進行資料的合併?
upsert好理解, 依賴本身的能力.
針對mysql binlog的delete 事件,使用記錄級別刪除:
-
需要在資料中新增 '_HOODIE_IS_DELETED' 且值為true的列
-
需要在dataFrame中新增此列,如果此值為false或者不存在則當作常規寫入記錄
如果此值為true則為刪除記錄
示例程式碼如下:
StructField(_HOODIE_IS_DELETED, DataTypes.BooleanType, true, Metadata.empty());
dataFrame.write.format("org.apache.hudi") .option("hoodie.table.name", "test123") .option("hoodie.datasource.write.operation", "upsert") .option("hoodie.datasource.write.recordkey.field", "uuid") .option("hoodie.datasource.write.partitionpath.field", "partitionpath") .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE") .option("hoodie.datasource.write.precombine.field", "ts") .mode(Append) .save(basePath)
寫入hudi及同步資料至hive,需要注意的事情和如何處理?
-
宣告為hudi表的path路徑, 非分割槽表 使用tablename/, 分割槽表根據分割槽路徑層次定義/個數
-
在建立表時需新增 TBLPROPERTIES 'spark.sql.sources.provider'='hudi' 宣告為datasource為hudi型別的表
hudi如何處理新增欄位?
當使用Spark查詢Hudi資料集時,當資料的schema新增時,會獲取單個分割槽的parquet檔案來推匯出schema,若變更schema後未更新該分割槽資料,那麼新增的列是不會顯示,否則會顯示該新增的列;若未更新該分割槽的記錄時,那麼新增的列也不會顯示,可通過 mergeSchema來控制合併不同分割槽下parquet檔案的schema,從而可達到顯示新增列的目的
hudi 寫入時指定mergeSchema引數 為true
spark如何實現hudi表資料的寫入和讀取?
Spark支援使用者自定義的format來讀取或寫入檔案,只需要實現對應的(RelationProvider、SchemaRelationProvider)等介面即可。而Hudi也自定義實現了 org.apache.hudi/ hudi來實現Spark對Hudi資料集的讀寫,Hudi中最重要的一個相關類為 DefaultSource,其實現了 CreatableRelationProvider#createRelation介面,並實現了讀寫邏輯
kyuubi
如何讀取hudi表資料?
使用網易開源的kyuubi
kyuubi架構圖:
支援HiveServer2 Thrift API協議,可以通過beeline 連線
hive: beeline -u jdbc:hive2://ip:10000 -n userName -p
kyuubi: beeline -u jdbc:hive2://ip:8333 -n userName -p
hudi 元資料使用hive metastore
spark來識別載入hudi表
實現hudi表與hive表關聯查詢
kyuubi 支援SparkContext的動態快取,讓使用者不需要每次查詢都動態建立SparkContext。作為一個應用在yarn 上一直執行,終止beeline 連線後,應用仍在執行,下次登入,使用SQL可以直接查詢
總結
本文主要針對hudi進行調研, 設計MySQL CDC 近實時同步至數倉中方案, 寫入主要利用hudi的upsert以及delete能力. 針對hudi 表的查詢,引入kyuubi 框架,除 了增強平臺 spark sql作為即席查詢服務的能力外,同時支援查詢hudi表,並可以實現hudi表與hive表的聯合查詢, 同時對原有hive相關服務沒有太大影響.
參考
- https://blog.csdn.net/weixin_38166318/article/details/111825032
- https://blog.csdn.net/qq_37933018/article/details/120864648
- https://cxymm.net/article/qq_37933018/120864648
- https://www.jianshu.com/p/a271524adcc3
- https://jishuin.proginn.com/p/763bfbd65b70