Databricks 企業版 Spark&Delta Lake 引擎助力 Lakehouse 高效訪問
簡介:本文介紹了Databricks企業版Delta Lake的效能優勢,藉助這些特效能夠大幅提升Spark SQL的查詢效能,加快Delta表的查詢速度。
作者:
李錦桂(錦犀) 阿里雲開源大資料平臺開發工程師
王曉龍(筱龍) 阿里雲開源大資料平臺技術專家
背景介紹
Databricks是全球領先的Data+AI企業,是Apache Spark的創始公司,也是Spark的最大程式碼貢獻者,核心圍繞Spark、Delta Lake、MLFlow等開源生態打造企業級Lakehouse產品。2020年,Databricks 和阿里雲聯手打造了基於Apache Spark的雲上全託管大資料分析&AI平臺——Databricks資料洞察(DDI,Databricks DataInsight),為使用者提供資料分析、資料工程、資料科學和人工智慧等方面的服務,構建一體化的Lakehouse架構。
針對小檔案問題的優化解法
在Delta Lake中頻繁執行merge, update, insert操作,或者在流處理場景下不斷往Delta表中插入資料,會導致Delta表中產生大量的小檔案。小檔案數量的增加一方面會使得Spark每次序列讀取的資料量變少,降低讀取效率,另一方面,使得Delta表的元資料增加,元資料獲取變慢,從另一個維度降低表的讀取效率。
為了解決小檔案問題,Databricks提供了三個優化特性,從避免小檔案的產生和自動/手動合併小檔案兩個維度來解決Delta Lake的小檔案問題。
特性1:優化Delta表的寫入,避免小檔案產生
在開源版Spark中,每個executor向partition中寫入資料時,都會建立一個表文件進行寫入,最終會導致一個partition中產生很多的小檔案。Databricks對Delta表的寫入過程進行了優化,對每個partition,使用一個專門的executor合併其他executor對該partition的寫入,從而避免了小檔案的產生。
- 可以在建立表時指定
CREATE TABLE student (id INT, name STRING)
TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);
- 也可以修改表屬性
ALTER TABLE table_name
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);
該特性有兩個優點:
- 通過減少被寫入的表文件數量,提高寫資料的吞吐量;
- 避免小檔案的產生,提升查詢效能。
其缺點也是顯而易見的,由於使用了一個executor來合併表文件的寫入,從而降低了表文件寫入的並行度,此外,多引入的一層executor需要對寫入的資料進行shuffle,帶來額外的開銷。因此,在使用該特性時,需要對場景進行評估:
- 該特性適用的場景:頻繁使用MERGE,UPDATE,DELETE,INSERT INTO,CREATE TABLE AS SELECT等SQL語句的場景;
- 該特性不適用的場景:寫入TB級以上資料。
特性2:自動合併小檔案
在流處理場景中,比如流式資料入湖場景下,需要持續的將到達的資料插入到Delta表中,每次插入都會建立一個新的表文件用於儲存新到達的資料,假設每10s觸發一次,那麼這樣的流處理作業一天產生的表文件數量將達到8640個,且由於流處理作業通常是long-running的,執行該流處理作業100天將產生上百萬個表文件。這樣的Delta表,僅元資料的維護就是一個很大的挑戰,查詢效能更是急劇惡化。
為了解決上述問題,Databricks提供了小檔案自動合併功能,在每次向Delta表中寫入資料之後,會檢查Delta表中的表文件數量,如果Delta表中的小檔案(size < 128MB的視為小檔案)數量達到閾值,則會執行一次小檔案合併,將Delta表中的小檔案合併為一個新的大檔案。
該特性由表屬性delta.autoOptimize.autoCompact
控制,和特性delta.autoOptimize.optimizeWrite
相同,可以在建立表時指定,也可以對已建立的表進行修改。自動合併的閾值由spark.databricks.delta.autoCompact.minNumFiles
控制,預設為50,即小檔案數量達到50會執行表文件合併;合併後產生的檔案最大為128MB,如果需要調整合並後的目標檔案大小,可以通過調整配置spark.databricks.delta.autoCompact.maxFileSize
實現。
特性3:手動合併小檔案
自動小檔案合併會在對Delta表進行寫入,且寫入後表中小檔案達到閾值時被觸發。除了自動合併之外,Databricks還提供了Optimize命令使使用者可以手動合併小檔案,優化表結構,使得表文件的結構更加緊湊。在實現上Optimize使用bin-packing演算法,該演算法不但會合並表中的小檔案,且合併後生成的表文件也更均衡(表文件大小相近)。例如,我們要對Delta表student的表文件進行優化,僅需執行如下命令即可實現:
OPTIMIZE student;
Optimize命令不但支援全表小檔案的合併,還支援特定的分割槽的表文件的合併,例如,我們可以僅對date大於2017-01-01的分割槽中的小檔案進行合併:
OPTIMIZE student WHERE date >= '2017-01-01'
從Databricks資料洞察產品上的試驗資料看,Optimize能使查詢效能達到8x以上的提升。
媲美企業級資料庫的查詢優化技術
Databricks在資料查詢方面也做了諸多優化,包括:
特性1:Data Skipping
在資料查詢系統中,有兩種經典的查詢優化 技術:一種是以更快的速度處理資料,另一種是通過跳過不相關的資料,減少需要掃描的資料量。Data Skipping屬於後一種優化技術,通過表文件的統計資訊跳過不相關的表文件,從而提升查詢效能。
在向Delta表中新增表文件時,Delta Lake會在Delta表的元資料中儲存該表文件中的資料前32列的統計資訊,包括資料列的最大最小值,以及為null的行的數量,在查詢時,Databricks會利用這些統計資訊提升查詢效能。例如:對於一張Delta表的x列,假設該表的一個表文件x列的最小值為5,最大值為10,如果查詢條件為 where x < 3,則根據表文件的統計資訊,我們可以得出結論:該表文件中一定不包含我們需要的資料,因此我們可以直接跳過該表文件,減少掃描的資料量,進而提升查詢效能。
Data Skipping的實現原理和布隆過濾器類似,通過查詢條件判斷表文件中是否可能存在需要查詢的資料,從而減少需要掃描的資料量。如果表文件不可能存在查詢的資料,則可以直接跳過,如果表文件可能存在被查詢的資料,則需要掃描表文件。
為了能儘可能多的跳過和查詢無關的表文件,我們需要縮小表文件的min-max的差距,使得相近的資料儘可能在檔案中聚集。舉一個簡單的例子,假設一張表包含10個表文件,對於表中的x列,它的取值為[1, 10],如果每個表文件的x列的分佈均為[1, 10],則對於查詢條件:where x < 3,無法跳過任何一個表文件,因此,也無法實現效能提升,而如果每個表文件的min-max均為0,即在表文件1的x列分佈為[1, 1],表文件2的x列分佈為[2, 2]...,則對於查詢條件:where x < 3,可以跳過80%的表文件。受該思想的啟發,Databricks支援使用Z-Ordering來對資料進行聚集,縮小表文件的min-max差距,提升查詢效能。下面我們介紹Z-Ordering優化的原理和使用。
特性2:Z-Ordering優化
如上一節所解釋的,為了能儘可能多的跳過無關的表文件,表文件中作為查詢條件的列應該儘可能緊湊(即min-max的差距儘可能小)。Z-Ordering就可以實現該功能,它可以在多個維度上將關聯的資訊儲存到同一組檔案中,因此確切來說,Z-Ordering實際是一種資料佈局優化演算法,但結合Data Skipping,它可以顯著提升查詢效能。
Z-Ordering的使用非常簡單,對於表events
,如果經常使用列eventType
和generateTime
作為查詢條件,那麼執行命令:
OPTIMIZE events ZORDER BY (eventType, generateTime)
Delta表會使用列eventType
和generateTime
調整資料佈局,使得表文件中eventType
和generateTime
儘可能緊湊。
根據我們在Databricks DataInsight上的試驗,使用Z-Ordering優化能達到40倍的效能提升,具體的試驗案例參考文末Databricks資料洞察的官方文件。
特性3:布隆過濾器索引
布隆過濾器也是一項非常有用的Data-skipping技術。該技術可以快速判斷表文件中是否包含要查詢的資料,如果不包含就及時跳過該檔案,從而減少掃描的資料量,提升查詢效能。
如果在表的某列上建立了布隆過濾器索引,並且使用where col = "something"
作為查詢條件,那麼在掃描表中檔案時,我們可以使用布隆過濾器索引得出兩種結論:檔案中肯定不包含col = "something"
的行,或者檔案有可能包含col = "something"
的行。
- 當得出文件中肯定不包含
col = "something"
的行的結論時,就可以跳過該檔案,從而減少掃描的資料量,提升查詢效能。 - 當得出文件中可能包含
col = "something"
的行的結論時,引擎才會去處理該檔案。注意,這裡僅僅是判斷該檔案中可能包含目標資料。布隆過濾器定義了一個指標,用於描述出現判斷失誤的概率,即判斷檔案中包含需要查詢的資料,而實際上該檔案並不包含目標資料的概率,並稱之為FPP(False Positive Probability: 假陽性概率)。
Databricks支援檔案級Bloom過濾器,如果在表的某些列建立了布隆過濾器索引,那麼該表的每個表文件都會關聯一個 Bloom 篩選器索引檔案,索引檔案儲存在表文件同級目錄下的 _delta_index 子目錄中。在讀取表文檔案之前,Databricks會檢查索引檔案,根據上面的步驟判斷表文件中是否包含需要查詢的資料,如果不包含則直接跳過,否則再進行處理。
布隆過濾器索引的建立和傳統資料庫索引的建立類似,但需要指定假陽性概率和該列可能出現的值的數量:
CREATE BLOOMFILTER INDEX ON TABLE table_name
FOR COLUMNS(col_name OPTIONS (fpp=0.1, numItems=50000000))
根據我們在Databricks DataInsight上的試驗,使用布隆過濾器索引能達到3倍以上的效能提升,試驗案例參考文末Databricks資料洞察的官方文件。
特性4:動態檔案剪枝
動態檔案剪枝(Dynamic File Pruning, DFP)和動態分割槽剪枝(Dynamic Partition Pruning)相似,都是在維表和事實表的Join執行階段進行剪枝,減少掃描的資料量,提升查詢效率。
下面我們以一個簡單的查詢為例來介紹DFP的原理:
SELECT sum(ss_quantity) FROM store_sales
JOIN item ON ss_item_sk = i_item_sk
WHERE i_item_id = 'AAAAAAAAICAAAAAA'
在該查詢中,item為維表(資料量很少),store_sales為事實表(資料量非常大),where查詢條件作用於維表上。如果不開啟DFP,那麼該查詢的邏輯執行計劃如下:
總結
前文概括介紹了Databricks企業版Delta Lake的效能優勢,藉助這些特效能夠大幅提升Spark SQL的查詢效能,加快Delta表的查詢速度。
本文為阿里雲原創內容,未經允許不得轉載。