1. 程式人生 > 其它 >Databricks 企業版 Spark&Delta Lake 引擎助力 Lakehouse 高效訪問

Databricks 企業版 Spark&Delta Lake 引擎助力 Lakehouse 高效訪問

簡介:本文介紹了Databricks企業版Delta Lake的效能優勢,藉助這些特效能夠大幅提升Spark SQL的查詢效能,加快Delta表的查詢速度。

作者:

李錦桂(錦犀) 阿里雲開源大資料平臺開發工程師

王曉龍(筱龍) 阿里雲開源大資料平臺技術專家

背景介紹

Databricks是全球領先的Data+AI企業,是Apache Spark的創始公司,也是Spark的最大程式碼貢獻者,核心圍繞Spark、Delta Lake、MLFlow等開源生態打造企業級Lakehouse產品。2020年,Databricks 和阿里雲聯手打造了基於Apache Spark的雲上全託管大資料分析&AI平臺——Databricks資料洞察(DDI,Databricks DataInsight),為使用者提供資料分析、資料工程、資料科學和人工智慧等方面的服務,構建一體化的Lakehouse架構。

Delta Lake是Databricks從2016年開始在內部研發的一款支援事務的資料湖產品,於2019年正式開源。除了社群主導的開源版Delta Lake OSS,Databricks商業產品裡也提供了企業版Spark&Detla Lake引擎,本文將介紹企業版提供的產品特性如何優化效能,助力高效訪問Lakehouse。

針對小檔案問題的優化解法

在Delta Lake中頻繁執行merge, update, insert操作,或者在流處理場景下不斷往Delta表中插入資料,會導致Delta表中產生大量的小檔案。小檔案數量的增加一方面會使得Spark每次序列讀取的資料量變少,降低讀取效率,另一方面,使得Delta表的元資料增加,元資料獲取變慢,從另一個維度降低表的讀取效率。

為了解決小檔案問題,Databricks提供了三個優化特性,從避免小檔案的產生和自動/手動合併小檔案兩個維度來解決Delta Lake的小檔案問題。

特性1:優化Delta表的寫入,避免小檔案產生

在開源版Spark中,每個executor向partition中寫入資料時,都會建立一個表文件進行寫入,最終會導致一個partition中產生很多的小檔案。Databricks對Delta表的寫入過程進行了優化,對每個partition,使用一個專門的executor合併其他executor對該partition的寫入,從而避免了小檔案的產生。

該特性由表屬性delta.autoOptimize.optimizeWrite
來控制:
  • 可以在建立表時指定
CREATE TABLE student (id INT, name STRING)
TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);
  • 也可以修改表屬性
ALTER TABLE table_name
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

該特性有兩個優點:

  1. 通過減少被寫入的表文件數量,提高寫資料的吞吐量;
  2. 避免小檔案的產生,提升查詢效能。

其缺點也是顯而易見的,由於使用了一個executor來合併表文件的寫入,從而降低了表文件寫入的並行度,此外,多引入的一層executor需要對寫入的資料進行shuffle,帶來額外的開銷。因此,在使用該特性時,需要對場景進行評估:

  • 該特性適用的場景:頻繁使用MERGE,UPDATE,DELETE,INSERT INTO,CREATE TABLE AS SELECT等SQL語句的場景;
  • 該特性不適用的場景:寫入TB級以上資料。

特性2:自動合併小檔案

在流處理場景中,比如流式資料入湖場景下,需要持續的將到達的資料插入到Delta表中,每次插入都會建立一個新的表文件用於儲存新到達的資料,假設每10s觸發一次,那麼這樣的流處理作業一天產生的表文件數量將達到8640個,且由於流處理作業通常是long-running的,執行該流處理作業100天將產生上百萬個表文件。這樣的Delta表,僅元資料的維護就是一個很大的挑戰,查詢效能更是急劇惡化。

為了解決上述問題,Databricks提供了小檔案自動合併功能,在每次向Delta表中寫入資料之後,會檢查Delta表中的表文件數量,如果Delta表中的小檔案(size < 128MB的視為小檔案)數量達到閾值,則會執行一次小檔案合併,將Delta表中的小檔案合併為一個新的大檔案。

該特性由表屬性delta.autoOptimize.autoCompact控制,和特性delta.autoOptimize.optimizeWrite相同,可以在建立表時指定,也可以對已建立的表進行修改。自動合併的閾值由spark.databricks.delta.autoCompact.minNumFiles控制,預設為50,即小檔案數量達到50會執行表文件合併;合併後產生的檔案最大為128MB,如果需要調整合並後的目標檔案大小,可以通過調整配置spark.databricks.delta.autoCompact.maxFileSize實現。

特性3:手動合併小檔案

自動小檔案合併會在對Delta表進行寫入,且寫入後表中小檔案達到閾值時被觸發。除了自動合併之外,Databricks還提供了Optimize命令使使用者可以手動合併小檔案,優化表結構,使得表文件的結構更加緊湊。在實現上Optimize使用bin-packing演算法,該演算法不但會合並表中的小檔案,且合併後生成的表文件也更均衡(表文件大小相近)。例如,我們要對Delta表student的表文件進行優化,僅需執行如下命令即可實現:

OPTIMIZE student;

Optimize命令不但支援全表小檔案的合併,還支援特定的分割槽的表文件的合併,例如,我們可以僅對date大於2017-01-01的分割槽中的小檔案進行合併:

OPTIMIZE student WHERE date >= '2017-01-01'

從Databricks資料洞察產品上的試驗資料看,Optimize能使查詢效能達到8x以上的提升。

媲美企業級資料庫的查詢優化技術

Databricks在資料查詢方面也做了諸多優化,包括:

特性1:Data Skipping

在資料查詢系統中,有兩種經典的查詢優化 技術:一種是以更快的速度處理資料,另一種是通過跳過不相關的資料,減少需要掃描的資料量。Data Skipping屬於後一種優化技術,通過表文件的統計資訊跳過不相關的表文件,從而提升查詢效能。

在向Delta表中新增表文件時,Delta Lake會在Delta表的元資料中儲存該表文件中的資料前32列的統計資訊,包括資料列的最大最小值,以及為null的行的數量,在查詢時,Databricks會利用這些統計資訊提升查詢效能。例如:對於一張Delta表的x列,假設該表的一個表文件x列的最小值為5,最大值為10,如果查詢條件為 where x < 3,則根據表文件的統計資訊,我們可以得出結論:該表文件中一定不包含我們需要的資料,因此我們可以直接跳過該表文件,減少掃描的資料量,進而提升查詢效能。

Data Skipping的實現原理和布隆過濾器類似,通過查詢條件判斷表文件中是否可能存在需要查詢的資料,從而減少需要掃描的資料量。如果表文件不可能存在查詢的資料,則可以直接跳過,如果表文件可能存在被查詢的資料,則需要掃描表文件。

為了能儘可能多的跳過和查詢無關的表文件,我們需要縮小表文件的min-max的差距,使得相近的資料儘可能在檔案中聚集。舉一個簡單的例子,假設一張表包含10個表文件,對於表中的x列,它的取值為[1, 10],如果每個表文件的x列的分佈均為[1, 10],則對於查詢條件:where x < 3,無法跳過任何一個表文件,因此,也無法實現效能提升,而如果每個表文件的min-max均為0,即在表文件1的x列分佈為[1, 1],表文件2的x列分佈為[2, 2]...,則對於查詢條件:where x < 3,可以跳過80%的表文件。受該思想的啟發,Databricks支援使用Z-Ordering來對資料進行聚集,縮小表文件的min-max差距,提升查詢效能。下面我們介紹Z-Ordering優化的原理和使用。

特性2:Z-Ordering優化

如上一節所解釋的,為了能儘可能多的跳過無關的表文件,表文件中作為查詢條件的列應該儘可能緊湊(即min-max的差距儘可能小)。Z-Ordering就可以實現該功能,它可以在多個維度上將關聯的資訊儲存到同一組檔案中,因此確切來說,Z-Ordering實際是一種資料佈局優化演算法,但結合Data Skipping,它可以顯著提升查詢效能。

Z-Ordering的使用非常簡單,對於表events,如果經常使用列eventTypegenerateTime作為查詢條件,那麼執行命令:

OPTIMIZE events ZORDER BY (eventType, generateTime)

Delta表會使用列eventTypegenerateTime調整資料佈局,使得表文件中eventTypegenerateTime儘可能緊湊。

根據我們在Databricks DataInsight上的試驗,使用Z-Ordering優化能達到40倍的效能提升,具體的試驗案例參考文末Databricks資料洞察的官方文件。

特性3:布隆過濾器索引

布隆過濾器也是一項非常有用的Data-skipping技術。該技術可以快速判斷表文件中是否包含要查詢的資料,如果不包含就及時跳過該檔案,從而減少掃描的資料量,提升查詢效能。

如果在表的某列上建立了布隆過濾器索引,並且使用where col = "something"作為查詢條件,那麼在掃描表中檔案時,我們可以使用布隆過濾器索引得出兩種結論:檔案中肯定不包含col = "something"的行,或者檔案有可能包含col = "something"的行。

  • 當得出文件中肯定不包含col = "something"的行的結論時,就可以跳過該檔案,從而減少掃描的資料量,提升查詢效能。
  • 當得出文件中可能包含col = "something"的行的結論時,引擎才會去處理該檔案。注意,這裡僅僅是判斷該檔案中可能包含目標資料。布隆過濾器定義了一個指標,用於描述出現判斷失誤的概率,即判斷檔案中包含需要查詢的資料,而實際上該檔案並不包含目標資料的概率,並稱之為FPP(False Positive Probability: 假陽性概率)。

Databricks支援檔案級Bloom過濾器,如果在表的某些列建立了布隆過濾器索引,那麼該表的每個表文件都會關聯一個 Bloom 篩選器索引檔案,索引檔案儲存在表文件同級目錄下的 _delta_index 子目錄中。在讀取表文檔案之前,Databricks會檢查索引檔案,根據上面的步驟判斷表文件中是否包含需要查詢的資料,如果不包含則直接跳過,否則再進行處理。

布隆過濾器索引的建立和傳統資料庫索引的建立類似,但需要指定假陽性概率和該列可能出現的值的數量:

CREATE BLOOMFILTER INDEX ON TABLE table_name
FOR COLUMNS(col_name OPTIONS (fpp=0.1, numItems=50000000))

根據我們在Databricks DataInsight上的試驗,使用布隆過濾器索引能達到3倍以上的效能提升,試驗案例參考文末Databricks資料洞察的官方文件。

特性4:動態檔案剪枝

動態檔案剪枝(Dynamic File Pruning, DFP)和動態分割槽剪枝(Dynamic Partition Pruning)相似,都是在維表和事實表的Join執行階段進行剪枝,減少掃描的資料量,提升查詢效率。
下面我們以一個簡單的查詢為例來介紹DFP的原理:

SELECT sum(ss_quantity) FROM store_sales 
JOIN item ON ss_item_sk = i_item_sk
WHERE i_item_id = 'AAAAAAAAICAAAAAA'

在該查詢中,item為維表(資料量很少),store_sales為事實表(資料量非常大),where查詢條件作用於維表上。如果不開啟DFP,那麼該查詢的邏輯執行計劃如下:

從上圖可以看出,先對store_sales進行全表掃描,然後再和過濾後的item表的行進行join,雖然結果僅有4萬多條資料,但卻掃描了表store_sales中的80多億條資料。針對該查詢,很直觀的優化是:先查詢出表item中i_item_id = 'AAAAAAAAICAAAAAA'資料行,然後將這些資料行的i_item_sk值作為表store_sales的ss_item_sk的查詢條件在表store_sales的SCAN階段進行過濾,結合我們在上面介紹的Data Skipping技術,可以大幅減少表文件的掃描。這一思路正是DFP的根本原理,啟動DFP後的邏輯執行計劃如下圖所示:

可以看到,在啟用DFP之後,過濾條件被下推到SCAN操作中,僅掃描了600多萬條store_sales中的資料,從結果上看,啟動DFP後,該條查詢實現了10倍的效能提升,此外,Databricks還針對該特性對TPC-DS測試,測試發現啟用DFP後,TPC-DS的第15條查詢達到了8倍的效能提升,有36條查詢實現了2倍及以上的效能提升。

總結

前文概括介紹了Databricks企業版Delta Lake的效能優勢,藉助這些特效能夠大幅提升Spark SQL的查詢效能,加快Delta表的查詢速度。

原文連結

本文為阿里雲原創內容,未經允許不得轉載。