1. 程式人生 > 其它 >[離線計算-Spark|Hive] 資料近實時同步數倉方案設計

[離線計算-Spark|Hive] 資料近實時同步數倉方案設計

本文主要針對hudi進行調研, 設計MySQL CDC 近實時同步至數倉中方案, 寫入主要利用hudi的upsert以及delete能力. 針對hudi 表的查詢,引入kyuubi 框架,除 了增強平臺 spark sql 一些即席查詢服務的能力外,同時支援查詢hudi表,並可以實現hudi表與hive表的聯合查詢, 同時對原有hive相關服務沒有太大影響.

背景

最近閱讀了大量關於hudi相關文章, 下面結合對Hudi的調研, 設計一套技術方案用於支援 MySQL資料CDC同步至數倉中,避免繁瑣的ETL流程,藉助Hudi的upsert, delete 能力,來縮短資料的交付時間.

元件版本:

  • Hadoop 2.6.0
  • Hive 1.1.0
  • hudi 0.7.0
  • spark 2.4.6

架構設計

  1. 使用canal(阿里巴巴MySQL Binlog增量訂閱&消費元件)dump mysql binlog 資料
  2. 採集後將binlog 資料採集到kafka中, 按照庫名建立topic, 並按照表名將資料寫入topic 固定分割槽
  3. spark 消費資料將資料生成DF
  4. 將DF資料寫入hudi表
  5. 同步hudi元資料到hive中

寫入主要分成兩部分全量資料和增量資料:

  • 歷史資料通過bulkinsert 方式 同步寫入hudi

  • 增量資料直接消費寫入使用hudi的upsert能力,完成資料合併

寫入hudi在hdfs的格式如下:

hudi

hudi 如何處理binlog upsert,delete 事件進行資料的合併?

upsert好理解, 依賴本身的能力.

針對mysql binlog的delete 事件,使用記錄級別刪除:

  1. 需要在資料中新增 '_HOODIE_IS_DELETED' 且值為true的列

  2. 需要在dataFrame中新增此列,如果此值為false或者不存在則當作常規寫入記錄

如果此值為true則為刪除記錄

示例程式碼如下:

StructField(_HOODIE_IS_DELETED, DataTypes.BooleanType, true, Metadata.empty());

dataFrame.write.format("org.apache.hudi")
               .option("hoodie.table.name", "test123")
               .option("hoodie.datasource.write.operation", "upsert")
               .option("hoodie.datasource.write.recordkey.field", "uuid")
               .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
               .option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE")
               .option("hoodie.datasource.write.precombine.field", "ts")
               .mode(Append)
               .save(basePath)

寫入hudi及同步資料至hive,需要注意的事情和如何處理?

  1. 宣告為hudi表的path路徑, 非分割槽表 使用tablename/, 分割槽表根據分割槽路徑層次定義/個數

  2. 在建立表時需新增 TBLPROPERTIES 'spark.sql.sources.provider'='hudi' 宣告為datasource為hudi型別的表

hudi如何處理新增欄位?

當使用Spark查詢Hudi資料集時,當資料的schema新增時,會獲取單個分割槽的parquet檔案來推匯出schema,若變更schema後未更新該分割槽資料,那麼新增的列是不會顯示,否則會顯示該新增的列;若未更新該分割槽的記錄時,那麼新增的列也不會顯示,可通過 mergeSchema來控制合併不同分割槽下parquet檔案的schema,從而可達到顯示新增列的目的

hudi 寫入時指定mergeSchema引數 為true

spark如何實現hudi表資料的寫入和讀取?

Spark支援使用者自定義的format來讀取或寫入檔案,只需要實現對應的(RelationProvider、SchemaRelationProvider)等介面即可。而Hudi也自定義實現了 org.apache.hudi/ hudi來實現Spark對Hudi資料集的讀寫,Hudi中最重要的一個相關類為 DefaultSource,其實現了 CreatableRelationProvider#createRelation介面,並實現了讀寫邏輯

kyuubi

如何讀取hudi表資料?

使用網易開源的kyuubi

kyuubi架構圖:

支援HiveServer2 Thrift API協議,可以通過beeline 連線

hive: beeline -u jdbc:hive2://ip:10000 -n userName -p 

kyuubi: beeline -u jdbc:hive2://ip:8333 -n userName -p 

hudi 元資料使用hive metastore

spark來識別載入hudi表

實現hudi表與hive表關聯查詢

kyuubi 支援SparkContext的動態快取,讓使用者不需要每次查詢都動態建立SparkContext。作為一個應用在yarn 上一直執行,終止beeline 連線後,應用仍在執行,下次登入,使用SQL可以直接查詢

總結

本文主要針對hudi進行調研, 設計MySQL CDC 近實時同步至數倉中方案, 寫入主要利用hudi的upsert以及delete能力. 針對hudi 表的查詢,引入kyuubi 框架,除 了增強平臺 spark sql作為即席查詢服務的能力外,同時支援查詢hudi表,並可以實現hudi表與hive表的聯合查詢, 同時對原有hive相關服務沒有太大影響.

參考

  1. https://blog.csdn.net/weixin_38166318/article/details/111825032
  2. https://blog.csdn.net/qq_37933018/article/details/120864648
  3. https://cxymm.net/article/qq_37933018/120864648
  4. https://www.jianshu.com/p/a271524adcc3
  5. https://jishuin.proginn.com/p/763bfbd65b70
本文作者: chaplinthink, 關注領域:大資料、基礎架構、系統設計, 一個熱愛學習、分享的大資料工程師