1. 程式人生 > 實用技巧 >網易資料湖探索與實踐-範欣欣

網易資料湖探索與實踐-範欣欣

分享嘉賓:範欣欣網易大資料技術專家

編輯整理:劉閏豐

出品平臺:DataFunTalk

導讀:今天主要和大家交流的是網易在資料湖Iceberg的一些思考與實踐。從網易在資料倉庫建設中遇到的痛點出發,介紹對資料湖Iceberg的探索以及實踐之路。

主要內容包括:

  • 資料倉庫平臺建設的痛點

  • 資料湖Iceberg的核心原理

  • 資料湖Iceberg社群現狀

  • 網易資料湖Iceberg實踐之路

01

資料倉庫平臺建設的痛點

痛點一:

我們凌晨一些大的離線任務經常會因為一些原因出現延遲,這種延遲會導致核心報表的產出時間不穩定,有些時候會產出比較早,但是有時候就可能會產出比較晚,業務很難接受。

為什麼會出現這種現象的發生呢?目前來看大致有這麼幾點要素:

  • 任務本身要請求的資料量會特別大。通常來說一天原始的資料量可能在幾十TB。幾百個分割槽,甚至上千個分割槽,五萬+的檔案數這樣子。如果說全量讀取這些檔案的話,幾百個分割槽就會向NameNode傳送幾百次請求,我們知道離線任務在凌晨執行的時候,NameNode的壓力是非常大的。所以就很有可能出現Namenode響應很慢的情況,如果請求響應很慢就會導致任務初始化時間很長。

  • 任務本身的ETL效率是相對低效的,這個低效並不是說Spark引擎低效,而是說我們的儲存在這塊支援的不是特別的好。比如目前我們查一個分割槽的話是需要將所有檔案都掃描一遍然後進行分析,而實際上我可能只對某些檔案感興趣。所以相對而言這個方案本身來說就是相對低效的。

  • 這種大的離線任務一旦遇到磁碟壞盤或者機器宕機,就需要重試,重試一次需要耗費很長的時間比如幾十分鐘。如果說重試一兩次的話這個延遲就會比較大了。

痛點二:

針對一些細瑣的一些問題而言的。這裡簡單列舉了三個場景來分析:

  • 不可靠的更新操作。我們經常在ETL過程中執行一些insert overwrite之類的操作,這類操作會先把相應分割槽的資料刪除,再把生成的檔案載入到分割槽中去。在我們移除檔案的時候,很多正在讀取這些檔案的任務就會發生異常,這就是不可靠的更新操作。

  • 表Schema變更低效。目前我們在對錶做一些加欄位、更改分割槽的操作其實是非常低效的操作,我們需要把所有的原始資料讀出來,然後在重新寫回去。這樣就會非常耗時,並且低效。

  • 資料可靠性缺乏保障。主要是我們對於分割槽的操作,我們會把分割槽的資訊分為兩個地方,HDFS和Metastore,分別儲存一份。在這種情況下,如果進行更新操作,就可能會出現一個更新成功而另一個更新失敗,會導致資料不可靠。

痛點三:

基於Lambda架構建設的實時數倉存在較多的問題。如上圖的這個架構圖,第一條鏈路是基於kafka中轉的一條實時鏈路(延遲要求小於5分鐘),另一條是離線鏈路(延遲大於1小時),甚至有些公司會有第三條準實時鏈路(延遲要求5分鐘~一小時),甚至更復雜的場景。

  • 兩條鏈路對應兩份資料,很多時候實時鏈路的處理結果和離線鏈路的處理結果對不上。

  • Kafka無法儲存海量資料, 無法基於當前的OLAP分析引擎高效查詢Kafka中的資料。

  • Lambda維護成本高。程式碼、資料血緣、Schema等都需要兩套。運維、監控等成本都非常高。

痛點四:

不能友好地支援高效更新場景。大資料的更新場景一般有兩種,一種是CDC ( Change Data Capture ) 的更新,尤其在電商的場景下,將binlog中的更新刪除同步到HDFS上。另一種是延遲資料帶來的聚合後結果的更新。目前HDFS只支援追加寫,不支援更新。因此業界很多公司引入了Kudu。但是Kudu本身是有一些侷限的,比如計算儲存沒有做到分離。這樣整個數倉系統中引入了HDFS、Kafka以及Kudu,運維成本不可謂不大。

上面就是針對目前數倉所涉及到的四個痛點的大致介紹,因此我們也是通過對資料湖的調研和實踐,希望能在這四個方面對數倉建設有所幫助。接下來重點講解下對資料湖的一些思考。

02

資料湖Iceberg核心原理

1. 資料湖開源產品調研

資料湖大致是從19年開始慢慢火起來的,目前市面上核心的資料湖開源產品大致有這麼幾個:

  • DELTA LAKE,在17年的時候DataBricks就做了DELTA LAKE的商業版。主要想解決的也是基於Lambda架構帶來的儲存問題,它的初衷是希望通過一種儲存來把Lambda架構做成kappa架構。

  • Hudi ( Uber開源 ) 可以支援快速的更新以及增量的拉取操作。這是它最大的賣點之一。

  • Iceberg的初衷是想做標準的Table Format以及高效的ETL。

上圖是來自阿里Flink團體針對資料湖方案的一些調研對比,總體來看這些方案的基礎功能相對都還是比較完善的。我說的基礎功能主要包括:

  • 高效Table Schema的變更,比如針對增減分割槽,增減欄位等功能

  • ACID語義保證

  • 同時支援流批讀寫,不會出現髒讀等現象

  • 支援OSS這類廉價儲存

2. 當然還有一些不同點:

  • Hudi的特性主要是支援快速的更新刪除和增量拉取。

  • Iceberg的特性主要是程式碼抽象程度高,不繫結任何的Engine。它暴露出來了非常核心的表層面的介面,可以非常方便的與Spark/Flink對接。然而Delta和Hudi基本上和spark的耦合很重。如果想接入flink,相對比較難。

3. 我們選擇Iceberg的原因:

  • 現在國內的實時數倉建設圍繞flink的情況會多一點。所以能夠基於flink擴充套件生態,是我們選擇iceberg一個比較重要的點。

  • 國內也有很多基於Iceberg開發的重要力量,比如騰訊團隊、阿里Flink官方團隊,他們的資料湖選型也是Iceberg。目前他們在社群分別主導update以及flink的生態對接。

4. 接下來我們重點介紹一下Iceberg:

這是來自官方對於Iceberg的一段介紹,大致就是Iceberg是一個開源的基於表格式的資料湖。關於table format再給大家詳細介紹下:

左側圖是一個抽象的資料處理系統,分別由SQL引擎、table format、檔案集合以及分散式檔案系統構成。右側是對應的現實中的元件,SQL引擎比如HiveServer、Impala、Spark等等,table format比如Metastore或者Iceberg,檔案集合主要有Parquet檔案等,而分散式檔案系統就是HDFS。

對於table format,我認為主要包含4個層面的含義,分別是表schema定義(是否支援複雜資料型別),表中檔案的組織形式,表相關統計資訊、表索引資訊以及表的讀寫API實現。詳述如下:

  • 表schema定義了一個表支援欄位型別,比如int、string、long以及複雜資料型別等。

  • 表中檔案組織形式最典型的是Partition模式,是Range Partition還是Hash Partition。

  • Metadata資料統計資訊。

  • 封裝了表的讀寫API。上層引擎通過對應的API讀取或者寫入表中的資料。

和Iceberg差不多相當的一個元件是Metastore。不過Metastore是一個服務,而Iceberg就是一個jar包。這裡就Metastore 和 Iceberg在表格式的4個方面分別進行一下對比介紹:

① 在schema層面上沒有任何區別:

都支援int、string、bigint等型別。

② partition實現完全不同:

兩者在partition上有很大的不同:

metastore中partition欄位不能是表字段,因為partition欄位本質上是一個目錄結構,不是使用者表中的一列資料。基於metastore,使用者想定位到一個partition下的所有資料,首先需要在metastore中定位出該partition對應的所在目錄位置資訊,然後再到HDFS上執行list命令獲取到這個分割槽下的所有檔案,對這些檔案進行掃描得到這個partition下的所有資料。

iceberg中partition欄位就是表中的一個欄位。Iceberg中每一張表都有一個對應的檔案元資料表,檔案元資料表中每條記錄表示一個檔案的相關資訊,這些資訊中有一個欄位是partition欄位,表示這個檔案所在的partition。

很明顯,iceberg表根據partition定位檔案相比metastore少了一個步驟,就是根據目錄資訊去HDFS上執行list命令獲取分割槽下的檔案。

試想,對於一個二級分割槽的大表來說,一級分割槽是小時時間分割槽,二級分割槽是一個列舉欄位分割槽,假如每個一級分割槽下有30個二級分割槽,那麼這個表每天就會有24 * 30 = 720個分割槽。基於Metastore的partition方案,如果一個SQL想基於這個表掃描昨天一天的資料的話,就需要向Namenode下發720次list請求,如果掃描一週資料或者一個月資料,請求數就更是相當誇張。這樣,一方面會導致Namenode壓力很大,一方面也會導致SQL請求響應延遲很大。而基於Iceberg的partition方案,就完全沒有這個問題。

③ 表統計資訊實現粒度不同:

Metastore中一張表的統計資訊是表/分割槽級別粒度的統計資訊,比如記錄一張表中某一列的記錄數量、平均長度、為null的記錄數量、最大值\最小值等。

Iceberg中統計資訊精確到檔案粒度,即每個資料檔案都會記錄所有列的記錄數量、平均長度、最大值\最小值等。

很明顯,檔案粒度的統計資訊對於查詢中謂詞(即where條件)的過濾會更有效果。

讀寫API實現不同:

metastore模式下上層引擎寫好一批檔案,呼叫metastore的add partition介面將這些檔案新增到某個分割槽下。

Iceberg模式下上層業務寫好一批檔案,呼叫iceberg的commit介面提交本次寫入形成一個新的snapshot快照。這種提交方式保證了表的ACID語義。同時基於snapshot快照提交可以實現增量拉取實現。

總結下Iceberg相對於Metastore的優勢:

  • 新partition模式:避免了查詢時n次呼叫namenode的list方法,降低namenode壓力,提升查詢效能

  • 新metadata模式:檔案級別列統計資訊可以用來根據where欄位進行檔案過濾,很多場景下可以大大減少掃描檔案數,提升查詢效能

  • 新API模式:儲存批流一體

    1. 流式寫入-增量拉取(基於Iceberg統一儲存模式可以同時滿足業務批量讀取以及增量訂閱需求)

    2. 支援批流同時讀寫同一張表,統一表schema,任務執行過程中不會出現FileNotFoundException

Iceberg的提升體現在:

03

資料湖Iceberg社群現狀

目前Iceberg主要支援的計算引擎包括Spark2.4.5、Spark 3.x以及Presto。同時,一些運維工作比如snapshot過期、小檔案合併、增量訂閱消費等功能都可以實現。

在此基礎上,目前社群正在開發的功能主要有Hive整合、Flink整合以及支援Update/Delete功能。相信下一個版本就可以看到Hive/Flink整合的相關功能。

04

網易資料湖Iceberg實踐之路

Iceberg針對目前的大數量的情況下,可以大大提升ETL任務執行的效率,這主要得益於新Partition模式下不再需要請求NameNode分割槽資訊,同時得益於檔案級別統計資訊模式下可以過濾很多不滿足條件的資料檔案。

當前iceberg社群僅支援Spark2.4.5,我們在這個基礎上做了更多計算引擎的適配工作。主要包括如下:

  • 整合Hive。可以通過Hive建立和刪除iceberg表,通過HiveSQL查詢Iceberg表中的資料。

  • 整合Impala。使用者可以通過Impala新建iceberg內表\外表,並通過Impala查詢Iceberg表中的資料。目前該功能已經貢獻給Impala社群。

  • 整合Flink。已經實現了Flink到Iceberg的sink實現,業務可以消費kafka中的資料將結果寫入到Iceberg中。同時我們基於Flink引擎實現了小檔案非同步合併的功能,這樣可以實現Flink一邊寫資料檔案,一邊執行小檔案的合併。基於Iceberg的小檔案合併通過commit的方式提交,不需要刪除合併前的小檔案,也就不會引起讀取任務的任何異常。