1. 程式人生 > 實用技巧 >Apache Hudi重磅特性解讀之全域性索引

Apache Hudi重磅特性解讀之全域性索引

1. 摘要

Hudi表允許多種型別操作,包括非常常用的upsert,當然為支援upsert,Hudi依賴索引機制來定位記錄在哪些檔案中。

當前,Hudi支援分割槽和非分割槽的資料集。分割槽資料集是將一組檔案(資料)放在稱為分割槽的桶中的資料集。一個Hudi資料集可能由N個分割槽和M個檔案組成,這種組織結構也非常方便hive/presto/spark等引擎根據分割槽欄位過濾以返回有限的資料量。而分割槽的值絕大多數情況下是從資料中得來,這個要求一旦一條記錄對映到分割槽/桶,那麼這個對映應該 a) 被Hudi知道;b) 在Hudi資料集生命週期裡保持不變。

在一個非分割槽資料上Hudi需要知道recordKey -> fileId的對映以便對記錄進行upsert

操作,現有解決方案如下:a) 使用者/客戶端通過payload提供正確的分割槽值;b) 實現GlobalBloomIndex索引來掃描指定路徑下的所有檔案。上述兩個場景下,要麼需要使用者提供對映資訊,要麼會導致掃描所有檔案的效能開銷。

這個方案擬實現一種新的索引型別,維護(recordKey <-> partition, fileId) 對映或者((recordKey, partitionPath) → fileId)對映,這種對映由Hudi儲存和維護,可以解決上述提到的兩個限制。

2. 背景

資料集型別

Hudi儲存抽象主要有兩部分組成:1) 實際儲存的資料;2) 用於定位記錄位置(fileId)的索引,如果沒有這個資訊,Hudi不能處理upserts

。我們可以將資料湖中攝取的所有資料集大致分為兩類。

  • 插入/事件資料

    插入或事件資料表示新寫入表的資料和之前寫入的資料沒有任何交集,更具體點就是表中每一行資料都是新的一行並且和之前寫入的資料沒有重疊。比如從App中攝取日誌到表中,每一行日誌都是新的一行,和之前寫入的日誌沒有關係,因此新的寫入不需要任何之前寫入的上下文來決定新資料應該寫入到哪裡。

  • 更新/變更日誌資料

    更新/變更日誌處理是另外一個挑戰,寫入表的資料可能依賴之前寫入的資料。更具體點就是表中每一行資料不是新行並且可能和之前寫入的行會重疊,在這種場景下,系統需要決定哪一行需要被更新,因此需要找到需要更新哪個fileId。

Hudi提供了3種供使用者使用的方案

  • 資料組織結構為分割槽結構,每個分割槽包含N個檔案,客戶端維護recordKey<->fileId的對映用於表的更新,在將記錄傳遞至Hudi處理之前需要提供分割槽資訊。HoodieBloomIndex實現會掃描分割槽下所有檔案中的BloomIndex,如果匹配,則繼續在檔案中確認,這個過程稱為tag,即將記錄定位到具體的fileId。
  • 資料組織結構為扁平結構,即單個目錄包含了表中所有檔案。GlobalHoodieBloomIndex實現會掃描所有檔案中的BloomIndex,如果匹配,則繼續在檔案中確認,這個過程同上,但與第一個不同點在於如果檔案資料非常大,那麼進行tag的時間會非常耗時。
  • 針對append-only的資料集,即不需要更新,只需要使用payload中的分割槽,如當前的timestamp。

無論是何種型別資料集(append或者append + update),tag過程對寫和讀的效能影響都非常大。如果我們能夠提供記錄(record)級別的索引(recordKey -> FileId, partition)而不增加太多延遲的話,這將會讓Hudi效能更快。

因此這個RFC旨在提供記錄(record)級別的索引來加快Hudi的查詢過程。

注意:為方便解釋說明,下面我們考慮非分割槽資料集,因此對映中的鍵為recordKey,值為(PartitionPath, FileId)。

3. 實現方案

3.1 基於Hash的索引

索引條目被hash至不同的bucket(桶)中,每個桶中存放recordKey -> (PartitionPath, FileId)的對映,桶總數量需提前定義好,並且不能更新,但每個桶可載入不止一個FileGroup,後面會詳細介紹FileGroup。1000個桶,每個桶100W個條目,那麼可索引10億個條目,所以只有當獨立條目大於10億個時,才需要在一個桶中放多個FileGroup。

每個桶對外暴露兩個API,getRecordLocation(JavaRDD<RecordKeys>)insertRecords(JavaPairRDD<RecordKey, Tuple2<PatitionPath, FileId>>)

3.2 儲存

使用HFile(link1, link2) 進行儲存,因為HFile有非常好的隨機讀取效能,這裡有關於HFile的基準測試,簡要概括如下,如果HFile包含100W個條目,查詢10W個目標在95%情況下只需要~600ms,如果在實際中可以達到這個效能,那麼將會進一步提升Hudi效能。

3.3 索引寫路徑

對於寫路徑,一旦確定所有寫入記錄的HoodieRecordLocation,那麼這些記錄就被對映為(RecordKey, <PartitionPath, FileId>)。基於RecordKey進行hash,並對映到桶。桶和RecordKey的對映一旦確定後就不會變化。每個Bucket包含N個HFile,另外,所有寫入單個HFile的記錄需要進行排序,每批新寫入會在對應桶中建立新的HFile,因此每個桶會包含N個HFile。同時為限制HFile的數目,也會對HFile做Compaction。因為Hudi中Record對應的FileId永遠不變,因此索引的值也不會再變化,這個特性也會在讀路徑起到作用。

並行度:寫入時並行度最好等於分割槽總數,每個批次在一個桶中最多建立一個HFile。

需要注意的是資料寫入和索引寫入過程是繫結的,需要在一個ACID內完成,即要麼一起提交,要麼一起回滾。

3.3.1 更新

現在Hudi中記錄的位置資訊是不可變的,但是不能確保之後一直是不可變的,因此索引應該能處理對映的更新,在這種情況下,多個值將會被返回(例如,如果HFile1為Record1返回FileId1,HFile3為Record1返回FileId5,我們會選取HFile5的值,因此Record1的位置就是FileId5)。對於提交時間戳,我們要麼依賴檔名要麼依賴提交元資料,而不是值裡包含的時間,因為這樣會讓索引的大小爆炸。

3.4 索引讀路徑

對於讀和更新路徑,在讀或寫之前需要知道每條記錄的位置,所以getRecordLocations(JavaRDD<RecordKeys>)方法將會被呼叫,這些記錄將會被hash到對應的桶,對應的桶將會在HFile中查詢記錄。

並行度:如前所述,因為暫時不存在對索引的更新,單條記錄在一個Bucket中只能存在於一個HFile,所以所有的HFile可並行查詢,例如如果我們有100個桶,每個桶有10個HFile,那麼可以設定並行度為1000。

3.5 索引刪除

可以使用特殊值,如新增一個對應null值的條目,所以在索引查詢時,可以繼續使用相同的併發度,但是如果返回多個值時選擇最新的值,例如HFile1為Record1返回FileId1,HFile3為Record1返回null,那麼會選取HFile3的值並且知道Record1已經被刪除了。對於提交時間戳,我們要麼依賴檔名要麼依賴提交元資料,而不是值裡包含的時間,因為這樣會讓索引的大小爆炸。

支援刪除會讓Compaction變得相對複雜,由於刪除操作存在,在Compaction寫入新檔案時 ,可能需要讀取所有待進行Compaction的HFile的所有內容,以便找到最新的值,這可能不會帶來太多的開銷。另外,Compaction也會忽略被刪除的條目以便節省空間。所以可能無法判定一條記錄是否從來都未被插入,或者在插入後被刪除。

注意:對於刪除的條目,還需要支援重新插入。上面介紹的工作流即可支援而無需任何修改。

3.6 Hashing

作為預設實現,我們可以使用Java原生的Hash演演算法對RecordKey進行Hash,但是可支援開發者自定義Hash演演算法。

3.7 HFile scan vs seek

通過benchmark可知,對於包含100W個條目的HFile,隨機seek在30W ~ 40W的查詢時表現較好,否則全檔案scan(讀取整個HFile到記憶體進行查詢)表現更好。所以在查詢時可以利用這個實驗結果。我們可以儲存每個HFile的所有條目,在查詢時,如果查詢 < 30%條目,可以使用隨機seek,否則進行全表掃描。我們可以引入兩個配置,record.level.index.dynamically.choose.scan.vs.seekrecord.level.index.do.random.lookups,如果第一個配置設定為true,那麼會動態選擇scan和seek,如果設定為false,對於流式應用,會考慮第二個配置。

3.8 未來擴充套件

通常,一個好的做法是留出30%的Buffer,以避免超出初始儲存桶數。因為在嘗試擴充套件到超出初始化的儲存桶的初始數量時,會有一些權衡或開銷。第一個實現版本可能不會考慮這個問題,希望由使用者自行處理。

3.8.1 選項1-桶中新增多個FileGroup

在一個Bucket中建立多個FileGroup,一個FileGroup代表多個HFile,多個HFile構成一個Group,這些HFile可以被壓縮成一個基礎HFile,所以一個FileGroup擁有一個基礎HFile檔案。

若預先分配1000個桶,每個桶100W個條目。對於壓縮而言,一個FileGroup中的所有HFile將會被壓縮成一個HFile,所以如果不擴充套件到其他FileGroup,那麼同一時間一個HFile檔案中可能包含200W個條目,這會導致效能下降,所以當達到100W大小時,應該新建一個FileGroup,這意味著一個桶的權重等於兩個虛擬桶,所以hash和桶個數保持相同,但是索引能夠擴充套件多個條目。但新的FileGroup被建立時,老的FileGroup將會被密封(sealed),即不再寫入新的條目。新的寫入將寫入新的FileGroup,讀取也不會變化,可以併發查詢所有HFile檔案。

3.8.2 選項2-多個hash查詢和桶組

第一個hash可索引到1 ~ 1000的桶(稱為一個桶組),一旦達到桶組的80%時,需要選取一個新的hash,新的hash可索引到1001 ~ 2000,所以在索引查詢時,所有記錄會進行兩次查詢,如果查詢存在,那麼每個桶組只會返回一個值,新的寫入將進入桶1001 ~ 2000。

4. 實現說明

如上面章節所述,我們需要對給定桶中的所有HFile進行Compaction(壓縮)。為了複用現在程式碼中的Compaction邏輯,我們引入了 Inline FileSystem ,可以在給定檔案中Inline(內聯)任何型別(Parquet、HFIle等),有了Inline FileSystem,我們可以在任何通用檔案中內聯HFile。會為每個內聯的HFile生成一個URL路徑,這個URL路徑可被HFile Reader作為單獨的HFile讀取裡面的內容,下面展示檔案中內聯HFile的結構。

對於雲上物件儲存,如OSS、S3(不支援append),那麼一個資料檔案中只會內聯一個HFile。

考慮索引方案中的每個桶都是Hudi分割槽中的一個檔案組(包含實際資料)。MOR資料集中的典型分割槽可能有一個基礎檔案和N個小增量檔案,假設在這個索引中每個桶都有一個相似的結構。每個桶應該有一個基本檔案和N個較小的delta檔案,每個檔案都有一個內聯HFile。每一批新的攝取要麼將新的HFile作為新的資料塊附加到現有的delta檔案中,要麼建立一個新的增量檔案並將新的HFile作為第一個資料塊寫入。每隔一段時間,壓縮將提取基礎HFile和所有delta HFile檔案,以建立一個新的基本檔案(內聯HFile)作為壓縮版本。

下面是一個例子,說明在壓縮前和壓縮後,索引在單個桶中的結構

在物件儲存上的結構如下

上述結構會帶來很多好處。由於非同步壓縮已經進行過非常多的測試,只需做一些小的變更就可以重用Compaction。在本例中,它不是資料檔案,而是內聯的HFile檔案。使用這種佈局,回滾和提交也很容易處理。上面結構得到與Hudi分割槽相同的檔案系統檢視(基礎HFile和增量HFile)。基於上面結構也很容易讀取所有在給定提交時間後的索引,在兩個時間間隔內提交的索引等。

5. 總結

記錄級別全域性索引將極大提升Hudi的寫入效能,有望在0.6.0版本釋出。