【詳談 Delta Lake 】系列技術專題 之 特性(Features)
前言
本文翻譯自大資料技術公司 Databricks 針對資料湖 Delta Lake 系列技術文章。眾所周知,Databricks 主導著開源大資料社群 Apache Spark、Delta Lake 以及 ML Flow 等眾多熱門技術,而 Delta Lake 作為資料湖核心儲存引擎方案給企業帶來諸多的優勢。
此外,阿里雲和 Apache Spark 及 Delta Lake 的原廠 Databricks 引擎團隊合作,推出了基於阿里雲的企業版全託管 Spark 產品——Databricks 資料洞察,該產品原生整合企業版 Delta Engine 引擎,無需額外配置,提供高效能運算能力。有興趣的同學可以搜尋` Databricks 資料洞察`或`阿里雲 Databricks `進入官網,或者直接訪問
譯者:張鵬(卓昇),阿里雲端計算平臺事業部技術專家
Delta Lake 技術系列 - 特性(Features)
——使用 Delta Lake 穩定的特性來可靠的管理您的資料
目錄
- Chapter-01 為什麼使用 Delta Lake 的 MERGE 功能?
- Chapter-02 使用 Python API 在 Delta Lake 資料表上進行簡單,可靠的更新和刪除操作
- Chapter-03 大型資料湖的 Time Travel 功能
- Chapter-04 輕鬆克隆您的 Delta Lake 以方便測試,資料共享以及進行重複的機器學習
- Chapter-05 在 Apache Spark 上的 Delta Lake 中啟用 Spark SQL 的 DDL 和 DML 語句
本文介紹內容
Delta Lake 系列電子書由 Databricks 出版,阿里雲端計算平臺事業部大資料生態企業團隊翻譯,旨在幫助領導者和實踐者瞭解 Delta Lake 的全部功能以及它所處的場景。在本文 Delta Lake 系列 - 特性( Features )中,重點介紹 Delta Lake 的特性。
後續
讀完本文後,您不僅可以瞭解 Delta Lake 提供了那些特性,還可以理解這些的特性是如何帶來實質性的效能改進的。
什麼是 Delta Lake?
Delta Lake 是一個統一的資料管理系統,為雲上資料湖帶來資料可靠性和快速分析。Delta Lake 執行在現有資料湖之上,並且與 Apache Spark 的 API 完全相容。
在 Databricks 中,我們看到了 Delta Lake 如何為資料湖帶來可靠性、高效能和生命週期管理。我們的客戶已經驗證,Delta Lake 解決了以下挑戰:從複雜的資料格式中提取資料、很難刪除符合要求的資料、以及為了進行資料捕獲從而修改資料所帶來的問題。
通過使用 Delta Lake,您可以加快高質量資料匯入資料湖的速度,團隊也可以在安全且可擴充套件雲服務上快速使用這些資料。
Chapter-01 為什麼使用 Delta Lake 的 MERGE 功能?
Delta Lake 是在 Apache Spark 之上構建的下一代引擎,支援 MERGE 命令,該命令使您可以有效地在資料湖中上傳和刪除記錄。
MERGE 命令大大簡化了許多通用資料管道的構建方式-所有重寫整個分割槽的低效且複雜的多跳步驟現在都可以由簡單的 MERGE 查詢代替。
這種更細粒度的更新功能簡化了如何為各種用例(從變更資料捕獲到 GDPR )構建大資料管道的方式。您不再需要編寫複雜的邏輯來覆蓋表同時克服快照隔離的不足。
隨著資料的變化,另一個重要的功能是在發生錯誤寫入時能夠進行回滾。 Delta Lake 還提供了帶有時間旅行特性的回滾功能,因此如果您合併不當,則可以輕鬆回滾到早期版本。
在本章中,我們將討論需要更新或刪除現有資料的常見用例。我們還將探討新增和更新固有的挑戰,並說明 MERGE 如何解決這些挑戰。
什麼時候需要 upserts?
在許多常見場景中,都需要更新或刪除資料湖中的現有資料:
- 遵守通用資料保護法規(GDPR):隨著 GDPR 中資料遺忘規則(也稱為資料擦除)的推出,組織必須根據要求刪除使用者的資訊。資料擦除還包括刪除資料湖中的使用者資訊。
- 更改傳統資料庫中獲得的資料:在面向服務的體系結構中,典型的 web 和移動應用程式採用微服務架構,這些微服務架構一般是基於具有低延遲效能的傳統 SQL/NoSQL 資料庫搭建的。組織面臨的最大挑戰之一是將許多孤立的資料系統建立連線,因此資料工程師建立了管道,可以將所有資料來源整合到中央資料湖中以加快分析。這些管道必須定期讀取傳統 SQL/NoSQL 表所做的更改,並將其應用於資料湖中的對應表中。此類更改可以支援多種形式:變化緩慢的表,所有插入/更新/刪除資料的資料變更等。
- 會話化:從產品分析,到目標廣告,再到預測性維護的許多領域,將多個事件分組為一個會話是常見的例子。建立連續的應用來跟蹤會話並記錄寫入資料湖的結果是非常困難的,因為資料湖經常因為追加的資料而進行優化。
- 重複資料刪除:常見的資料管道用例是通過追加資料的方式來將系統日誌收集到 Delta Lake 表中。但是資料來源通常會生成重複記錄,並且需要下游刪除重複資料來處理它們。
為什麼對資料湖的 upserts 在傳統上具有挑戰性
由於資料湖基本上是基於檔案的,它們經常針對新增資料而不是更改現有資料進行優化。因此構建上述用例一直是具有挑戰性的。
使用者通常會讀取整個表(或分割槽的子集),然後將其覆蓋。因此,每個組織都嘗試通過編寫複雜的查詢 SQL,Spark 等方式來重新造輪子,來滿足他們的需求。這種方法的特點是:
- 低效:為了更新很少的記錄而讀取和重寫整個分割槽(或整個表)會導致管道執行緩慢且成本高昂。手動調整表佈局以及優化查詢是很繁瑣的,而且需要深厚的領域知識。
- 有可能出錯:手寫程式碼來修改資料很容易出現邏輯和人為錯誤。例如,多個管道在沒有任何事務支援的情況下同時修改同一張表可能會導致不可預測的資料不一致,在最壞的情況下有可能會導致資料丟失。通常,即使是單一的手寫管道也可能由於業務邏輯中的錯誤,從而導致資料損壞。
- 難以維護:從根本上來說,這類手寫程式碼難以理解,跟蹤和維護。從長遠來看,僅此一項就會顯著增加組織和基礎設施成本。
介紹 Delta Lake 中 MERGE 命令
使用 Delta Lake,您可以使用以下 MERGE 命令輕鬆解決上述用例,並且不會遇到任何上述問題:
讓我們通過一個簡單的示例來了解如何使用 MERGE。 假設您有一個變化緩慢的使用者資料表,該表維護著諸如地址之類的使用者資訊。 此外您還有一個現有使用者和新使用者的新地址表。 要將所有新地址合併到主使用者表中,可以執行以下命令:
MERGE INTO users
USING updates
ON users.userId = updates.userId
WHEN MATCHED THEN
UPDATE SET address = updates.addresses
WHEN NOT MATCHED THEN
INSERT (userId, address) VALUES (updates.userId, updates.address)
這完全符合語法的要求-對於現有使用者(即 MATCHED 子句),它將更新 address 列,對於新使用者(即 NOT MATCHED 子句),它將插入所有列。 對於具有 TB 規模的大型資料表,Delta Lake MERGE 操作比覆蓋整個分割槽或表要快N個數量級,因為 Delta Lake 僅讀取相關檔案並更新它們。 具體來說,Delta Lake 的 MERGE 命令具有以下優勢:
- 細粒度:該操作以檔案而不是分割槽的粒度重寫資料,這樣解決了重寫分割槽,使用 MSCK 更新 Hive 元資料庫等所有複雜問題。
- 高效:Delta Lake 的資料 skip 功能使 MERGE 在查詢要重寫的檔案方面更高效,從而無需手動優化管道。 此外 Delta Lake 對所有 I/O 和處理過程進行了優化,使得 MERGE 進行所有資料的讀寫速度明顯快於 Apache Spark 中的類似操作。
- 事務性:Delta Lake 使用樂觀併發控制來確保併發寫入程式使用 ACID 事務來正確更新資料,同時併發讀取程式始終會看到一致的資料快照。
下圖是 MERGE 與手寫管道的直觀對比。
使用 MERGE 簡化用例
遵守 GDPR 而刪除資料
遵守 GDPR 的“被遺忘權”條款對資料湖中的資料進行任何處理都不容易。您可以使用示例程式碼來設定一個簡單的定時計劃作業,如下所示,刪除所有選擇退出服務的使用者。
MERGE INTO users
USING opted_out_users
ON opted_out_users.userId = users.userId
WHEN MATCHED THEN DELETE
資料庫中的資料變更應用
您可以使用 MERGE 語法輕鬆地將外部資料庫的所有資料更改(更新,刪除,插入)應用到 Delta Lake 表中,如下所示:
MERGE INTO users
USING (
SELECT userId, latest.address AS address, latest.deleted AS deleted FROM
(
SELECT userId, MAX(struct(TIME, address, deleted)) AS latest
FROM changes GROUP BY userId
)
) latestChange
ON latestChange.userId = users.userId
WHEN MATCHED AND latestChange.deleted = TRUE THEN
DELETE
WHEN MATCHED THEN
UPDATE SET address = latestChange.address
WHEN NOT MATCHED AND latestChange.deleted = FALSE THEN
INSERT (userId, address) VALUES (userId, address)
從 streaming 管道更新會話資訊
如果您有流事件的資料流入,並且想要對流事件資料進行會話化,同時增量更新會話並將其儲存在 Delta Lake 表中,則可以使用結構化資料流和 MERGE 中的 foreachBatch 來完成此操作。 例如,假設您有一個結構化流資料框架,該框架為每個使用者計算更新的 session 資訊。 您可以在所有會話應用中啟動流查詢,更新資料到 Delta Lake 表中,如下所示(Scala 語言)。
streamingSessionUpdatesDF.writeStream
.foreachBatch { (microBatchOutputDF: DataFrame, batchId: Long) =>
microBatchOutputDF.createOrReplaceTempView(“updates”) microBatchOutputDF.sparkSession.sql(s”””
MERGE INTO sessions
USING updates
ON sessions.sessionId = updates.sessionId
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT * “””)
}.start()
Chapter-02 使用Python API在Delta Lake資料表上進行簡單,可靠的更新和刪除操作
在本章中,我們將演示在飛機時刻表的場景中,如何在 Delta Lake 中使用 Python 和新的 Python API。 我們將展示如何新增,更新和刪除資料,如何使用 time travle 功能來查詢舊版本資料,以及如何清理較舊的版本。
Delta Lake 使用入門
Delta Lake 軟體包可以通過 PySpark 的--packages 選項來進行安裝。在我們的示例中,我們還將演示在 VACUUM 檔案和 Apache Spark 中執行 Delta Lake SQL 命令的功能。 由於這是一個簡短的演示,因此我們還將啟用以下配置:
spark.databricks.delta.retentionDurationCheck.enabled=false
允許我們清理檔案的時間短於預設的保留時間7天。 注意,這僅是對於 SQL 命令 VACUUM 是必需的。
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
在 Apache Spark 中啟用 Delta Lake SQL 命令;這對於 Python 或 Scala API 呼叫不是必需的。
# Using Spark Packages
./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf “spark. databricks.delta.retentionDurationCheck.enabled=false” --conf “spark. sql.extensions=io.delta.sql.DeltaSparkSessionExtension”
Delta Lake 資料的載入和儲存
這次將使用準時飛行資料或離港延誤資料,這些資料是從 RITA BTS 航班離崗統計中心生成的;這些資料的一些示例包括 2014 Flight Departure Performance via d3.js Crossfilter 和 針對Apache Spark的具有圖形化結構的準時飛行資料。 在 PySpark 中,首先讀取資料集。
# Location variables
tripdelaysFilePath = “/root/data/departuredelays.csv”
pathToEventsTable = “/root/deltalake/departureDelays.delta”
# Read flight delay data
departureDelays = spark.read \
.option(“header”, “true”) \
.option(“inferSchema”, “true”) \
.csv(tripdelaysFilePath)
接下來,我們將離港延遲資料儲存到 Delta Lake 表中。 在儲存的過程中,我們能夠利用它的優勢功能,包括 ACID 事務,統一批處理,streaming 和 time travel。
# Save flight delay data into Delta Lake format
departureDelays \
.write \
.format(“delta”) \
.mode(“overwrite”) \
.save(“departureDelays.delta”)
注意,這種方法類似於儲存 Parquet 資料的常用方式。 現在您將指定格式(“delta”)而不是指定格式(“parquet”)。如果要檢視基礎檔案系統,您會注意到為 Delta Lake 的離港延遲表建立了四個檔案。
/departureDelays.delta$ ls -l
.
..
_delta_log
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
Part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
現在,讓我們重新載入資料,但是這次我們的資料格式將由 Delta Lake 支援。
# Load flight delay data in Delta Lake format
delays_delta = spark \
.read \
.format(“delta”) \
.load(“departureDelays.delta”)
# Create temporary view
delays_delta.createOrReplaceTempView(“delays_delta”)
# How many flights are between Seattle and San Francisco
spark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()
執行結果:
最後,我們確定了從西雅圖飛往舊金山的航班數量;在此資料集中,有1698個航班。
立馬轉換到 Delta Lake
如果您有現成的 Parquet 表,則可以將它們轉換為 Delta Lake 格式,從而無需重寫表。 如果要轉換表,可以執行以下命令。
from delta.tables import *
# Convert non partitioned parquet table at path ‘/path/to/table’
deltaTable = DeltaTable.convertToDelta(spark, “parquet.`/path/to/ table`”)
# Convert partitioned parquet table at path ‘/path/to/table’ and
partitioned by integer column named ‘part’
partitionedDeltaTable = DeltaTable.convertToDelta(spark, “parquet.`/path/to/table`”, “part int”)
刪除我們的航班資料
要從傳統的資料湖表中刪除資料,您將需要:
- 從表中選擇所有資料,排除要刪除的行
- 根據上面的查詢建立一個新表
- 刪除原始表
- 將新表重新命名為原始表名,以獲取下游依賴關係來代替執行所有這些步驟。使用 Delta Lake,我們可以通過執行 DELETE 語句來簡化此過程。 為了展示這一點,讓我們刪除所有早點或準點抵達的航班(即,延誤<0)。
from delta.tables import *
from pyspark.sql.functions import *
# Access the Delta Lake table
deltaTable = DeltaTable.forPath(spark, pathToEventsTable )
# Delete all on-time and early flights
deltaTable.delete(“delay < 0”)
# How many flights are between Seattle and San Francisco
spark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()
從上面的查詢中可以看到,我們刪除了所有準時航班和早班航班(更多資訊,請參見下文),從西雅圖到舊金山的航班有837班延誤。 如果您檢視檔案系統,會注意到即使刪除了一些資料,還是有更多檔案。
/departureDelays.delta$ ls -l
_delta_log
part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet
在傳統的資料湖中,刪除是通過重寫整個表(不包括要刪除的值)來執行的。 使用 Delta Lake,可以通過有選擇地寫入包含要刪除資料的檔案的新版本來執行刪除操作,同時僅將以前的檔案標記為已刪除。 這是因為 Delta Lake 使用多版本併發控制(MVCC)對錶執行原子操作:例如,當一個使用者正在刪除資料時,另一使用者可能正在查詢之前的版本。這種多版本模型還使我們能夠回溯時間(即 time travel)並查詢以前的版本,這個功能稍後我們將看到。
更新我們的航班資料
要更新傳統資料湖表中的資料,您需要:
- 從表中選擇所有資料,不包括想要修改的行。
- 修改需要更新/更改的行
- 合併這兩個表以建立一個新表
- 刪除原始表
- 將新表重新命名為原始表名,以實現下游依賴
代替上面的步驟,使用 Delta Lake 我們可以通過執行 UPDATE 語句來簡化此過程。 為了顯示這一點,讓我們更新所有從底特律到西雅圖的航班。
# Update all flights originating from Detroit to now be
originating from Seattle
deltaTable.update(“origin = ‘DTW’”, { “origin”: “’SEA’” } )
# How many flights are between Seattle and San Francisco
spark.sql(“select count(1) from delays_delta where origin = ‘SEA’
and destination = ‘SFO’”).show()
如今底特律航班已被標記為西雅圖航班,現在我們有986航班從西雅圖飛往舊金山。如果您要列出您的離崗延遲檔案系統(即 $ ../departureDelays/ls -l),您會注意到現在有11個檔案(而不是刪除檔案後的8個檔案和表建立後的4個檔案)。
合併我們的航班資料
使用資料湖時,常見的情況是將資料連續追加到表中。這通常會導致資料重複(您不想再次將其插入表中),需要插入的新行以及一些需要更新的行。 使用 Delta Lake,所有這些都可以通過使用合併操作(類似於 SQL MERGE 語句)來實現。
讓我們從一個樣本資料集開始,您將通過以下查詢對其進行更新,插入或刪除重複資料。
# What flights between SEA and SFO for these date periods
spark.sql(“select * from delays_delta where origin = ‘SEA’ and
destination = ‘SFO’ and date like ‘1010%’ limit 10”).show()
該查詢的輸出如下表所示。 請注意,已新增顏色編碼以清楚地標識哪些行是已刪除的重複資料(藍色),已更新的資料(黃色)和已插入的資料(綠色)。
接下來,讓我們生成自己的 merge_table,其中包含將插入,更新或刪除重複的資料。具體看以下程式碼段
items = [(1010710, 31, 590, ‘SEA’, ‘SFO’), (1010521, 10, 590, ‘SEA’, ‘SFO’),
(1010822, 31, 590, ‘SEA’, ‘SFO’)]
cols = [‘date’, ‘delay’, ‘distance’, ‘origin’, ‘destination’]
merge_table = spark.createDataFrame(items, cols)
merge_table.toPandas()
在上表(merge_table)中,有三行不同的日期值:
- 1010521:此行需要使用新的延遲值(黃色)更新排期表。
- 1010710:此行是重複的(藍色)
- 1010832:這是要插入的新行(綠色)
使用 Delta Lake,可以通過合併語句輕鬆實現,具體看下面程式碼片段。
# Merge merge_table with flights
deltaTable.alias(“flights”) \
.merge(merge_table.alias(“updates”),”flights.date = updates.date”) \
.whenMatchedUpdate(set = { “delay” : “updates.delay” } ) \ .whenNotMatchedInsertAll() \
.execute()
# What flights between SEA and SFO for these date periods
spark.sql(“select * from delays_delta where origin = ‘SEA’ and destination = ‘SFO’ and date like ‘1010%’ limit 10”).show()
一條語句即可有效完成刪除重複資料,更新和插入這三個操作。
檢視資料表歷史記錄
如前所述,在我們進行每個事務(刪除,更新)之後,在檔案系統中建立了更多檔案。 這是因為對於每個事務,都有不同版本的 Delta Lake 表。
這可以通過使用 DeltaTable.history() 方法看到,如下所示。
注意,您還可以使用 SQL 執行相同的任務:
spark.sql(“DESCRIBE HISTORY ‘” + pathToEventsTable + “’”).show()
如您所見,對於每個操作(建立表,刪除和更新),都有三行代表表的不同版本(以下為簡化版本,以幫助簡化閱讀):
回溯資料錶的歷史
藉助 Time Travel,您可以檢視帶有版本或時間戳的 Delta Lake 表。要檢視歷史資料,請指定版本或時間戳選項。 在以下程式碼段中,我們將指定版本選項。
# Load DataFrames for each version
dfv0 = spark.read.format(“delta”).option(“versionAsOf”, 0).load(“departureDelays.delta”)
dfv1 = spark.read.format(“delta”).option(“versionAsOf”, 1).load(“departureDelays.delta”)
dfv2 = spark.read.format(“delta”).option(“versionAsOf”, 2).load(“departureDelays.delta”)
# Calculate the SEA to SFO flight counts for each version of history
cnt0 = dfv0.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count()
cnt1 = dfv1.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count()
cnt2 = dfv2.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count()
# Print out the value
print(“SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s” % (cnt0, cnt1, cnt2))
## Output
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986
無論是用於治理,風險管理,合規(GRC)還是錯誤時進行回滾,Delta Lake 表都包含元資料(例如,記錄操作員刪除的事實)和資料(例如,實際刪除的行)。但是出於合規性或大小原因,我們如何刪除資料檔案?
使用 vacuum 清理舊版本的資料表
預設情況下,Delta Lake vacuum 方法將刪除所有超過7天參考時間的行(和檔案)。如果要檢視檔案系統,您會注意到表的11個檔案。
/departureDelays.delta$ ls -l _delta_log
part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet
part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet
part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet
part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet
Part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
要刪除所有檔案,以便僅保留當前資料快照,您可以 vacuum 方法指定一個較小的值(而不是預設保留7天)。
# Remove all files older than 0 hours old.
deltaTable.vacuum(0)
Note, you perform the same task via SQL syntax: ̧
# Remove all files older than 0 hours old
spark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 0 HOURS”)
清理完成後,當您檢視檔案系統時,由於歷史資料已被刪除,您會看到更少的檔案。
/departureDelays.delta$ ls -l
_delta_log
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
請注意,執行 vacuum 之後,回溯到比保留期更早的版本的功能將會失效。
Chapter-03 大型資料湖的 Time Travel 功能
Delta Lake 提供 Time Travel 功能。 Delta Lake 是一個開源儲存層,可為資料湖帶來可靠性。 Delta Lake 提供 ACID 事務,可伸縮的元資料處理,以及批流一體資料處理。 Delta Lake 在您現有的資料湖之上執行,並且與 Apache Spark API 完全相容。
使用此功能,Delta Lake 會自動對您儲存在資料湖中的大資料進行版本控制,同時您可以訪問該資料的任何歷史版本。這種臨時資料管理可以簡化您的資料管道,包括簡化稽核,在誤寫入或刪除的情況下回滾資料以及重現實驗和報告。
您的組織最終可以在一個乾淨,集中化,版本化的雲上大資料儲存庫上實現標準化,以此進行分析。
更改資料的常見挑戰
- 稽核資料更改:稽核資料更改對於資料合規性以及簡單的除錯(以瞭解資料如何隨時間變化)都至關重要。在這種情況下,傳統資料系統都轉向大資料技術和雲服務。
- 重現實驗和報告:在模型訓練期間,資料科學家對給定的資料集執行不同引數的各種實驗。當科學家在一段時間後重新訪問實驗以重現模型時,通常源資料已被上游管道修改。很多時候他們不知道這些上游資料發生了更改,因此很難重現他們的實驗。一些科學家和最好的工程師通過建立資料的多個副本來進行實踐,從而增加了儲存量的費用。對於生成報告的分析師而言,情況也是如此。
- 回滾:資料管道有時會向下遊消費者寫入髒資料。發生這種情況的原因可能是基礎架構不穩定或者混亂的資料或者管道中的 Bug 等問題。對目錄或表進行簡單追加的管道,可以通過基於日期的分割槽輕鬆完成回滾。隨著更新和刪除,這可能變得非常複雜,資料工程師通常必須設計複雜的管道來應對這種情況。
使用Time Travel功能
Delta Lake 的 time travel 功能簡化了上述用例的資料管道構建。Delta Lake 中的 Time Travel 極大地提高了開發人員的生產力。它有助於:
- 資料科學家可以更好地管理實驗
- 資料工程師簡化了管道同時可以回滾髒資料
- 資料分析師可以輕鬆地分析報告
企業最終可以在乾淨,集中化,版本化的雲端儲存中的大資料儲存庫上建立標準化,在此基礎上進行資料分析。我們很高興看到您將能夠使用此功能完成工作。
當您寫入 Delta Lake 表或目錄時,每個操作都會自動進行版本控制。您可以通過兩種不同的方式訪問資料的不同版本:
使用時間戳
Scala 語法
您可以將時間戳或日期字串作為 DataFrame 閱讀器的選項來提供:
val df = spark.read
.format(“delta”) .
option(“timestampAsOf”, “2019-01-01”)
.load(“/path/to/my/table”)
df = spark.read \
.format(“delta”) \
.option(“timestampAsOf”, “2019-01-01”) \
.load(“/path/to/my/table”)
SQL語法
SELECT count(*) FROM my_table TIMESTAMP AS OF “2019-01-01”
SELECT count(*) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
SELECT count(*) FROM my_table TIMESTAMP AS OF “2019-01-01 01:30:00.000”
如果您無權訪問閱讀器的程式碼庫,您可以將輸入引數傳遞給該庫以讀取資料,通過將 yyyyMMddHHmmssSSS 格式的時間戳傳遞給表來進行資料回滾:
val inputPath = “/path/to/my/table@20190101000000000”
val df = loadData(inputPath)
// Function in a library that you don’t have access to def loadData(inputPath : String) : DataFrame = {
spark.read
.format(“delta”)
.load(inputPath)
}
inputPath = “/path/to/my/table@20190101000000000”
df = loadData(inputPath)
# Function in a library that you don’t have access to
def loadData(inputPath):
return spark.read \
.format(“delta”) \
.load(inputPath)
}
使用版本號
在 Delta Lake 中,每次寫入都有一個版本號,您也可以使用該版本號來進行回溯。
Scala語法
val df = spark.read
.format(“delta”)
.option(“versionAsOf”, “5238”)
.load(“/path/to/my/table”)
val df = spark.read
.format(“delta”)
.load(“/path/to/my/table@v5238”)
Python語法
df = spark.read \
.format(“delta”) \
.option(“versionAsOf”, “5238”) \
.load(“/path/to/my/table”)
df = spark.read \
.format(“delta”) \
.load(“/path/to/my/table@v5238”)
SQL語法
SELECT count(*) FROM my_table VERSION AS OF 5238
稽核資料變更
您可以使用 DESCRIBE HISTORY 命令或通過 UI 來查看錶更改的歷史記錄。
重做實驗和報告
Time travel 在機器學習和資料科學中也起著重要作用。模型和實驗的可重複性是資料科學家的關鍵考慮因素,因為他們通常在投入生產之前會建立數百個模型,並且在那個耗時的過程中,有可能想回到之前早期的模型。 但是由於資料管理通常與資料科學工具是分開的,因此確實很難實現。
Databricks 將 Delta Lake 的 Time Travel 功能與 MLflow(機器學習生命週期的開源平臺)相整合來解決可重複實驗的問題。 為了重新進行機器學習培訓,您只需將帶有時間戳的 URL 路徑作為 MLflow 引數來跟蹤每個訓練作業的資料版本。
這使您可以返回到較早的設定和資料集以重現較早的模型。 您無需與上游團隊就資料進行協調,也不必擔心為不同的實驗克隆資料。 這就是統一分析的力量,資料科學與資料工程緊密結合在一起。
回滾
Time travel 可以在產生髒資料的情況下方便回滾。 例如,如果您的 GDPR 管道作業有一個意外刪除使用者資訊的 bug,您可以用下面方法輕鬆修復管道:
INSERT INTO my_table
SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
WHERE userId = 111
You can also fix incorrect updates as follows:
MERGE INTO my_table target
USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET *
如果您只想回滾到表的之前版本,則可以使用以下任一命令來完成:
RESTORE TABLE my_table VERSION AS OF [version_number]
RESTORE TABLE my_table TIMESTAMP AS OF [timestamp]
固定檢視的不斷更新跨多個下游作業的 Delta Lake 表
通過 AS OF 查詢,您現在可以為多個下游作業固定不斷更新的 Delta Lake 表的快照。考慮一種情況,其中 Delta Lake 表正在不斷更新,例如每15秒更新一次,並且有一個下游作業會定期從此 Delta Lake 表中讀取資料並更新不同的目標表。 在這種情況下,通常需要一個源 Delta Lake 表的一致檢視,以便所有目標表都反映相同的狀態。
現在,您可以按照下面的方式輕鬆處理這種情況:
version = spark.sql(“SELECT max(version) FROM (DESCRIBE HISTORY my_table)”).collect()
# Will use the latest version of the table for all operations below
data = spark.table(“my_table@v%s” % version[0][0]
data.where(“event_type = e1”).write.jdbc(“table1”)
data.where(“event_type = e2”).write.jdbc(“table2”)
...
data.where(“event_type = e10”).write.jdbc(“table10”)
時間序列分析查詢變得簡單
Time travel 還簡化了時間序列分析。例如,如果您想了解上週添加了多少新客戶,則查詢可能是一個非常簡單的方式,如下所示:
SELECT count(distinct userId) - (
SELECT count(distinct userId)
FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
FROM my_table
Chapter-04 輕鬆克隆您的 Delta Lake 以方便測試,資料共享以及重複進行機器學習
Delta Lake 有一個表克隆的功能,可以輕鬆進行測試,共享和重新建立表以實現 ML 的多次訓練。在資料湖或資料倉庫中建立表的副本有幾種實際用途。但是考慮到資料湖中表的資料量及其增長速度,進行表的物理副本是一項昂貴的操作。
藉助表克隆,Delta Lake 現在使該過程更簡單且更省成本。
什麼是克隆?
克隆是源表在給定時間點的副本。它們具有與源表相同的元資料:相同表結構,約束,列描述,統計資訊和分割槽。但是它們是一個單獨的表,具有單獨的體系或歷史記錄。對克隆所做的任何更改只會影響克隆表,而不會影響源表。由於快照隔離,在克隆過程中或之後發生的源表更改也不會反映到克隆表中。在 Delta Lake 中,我們有兩種克隆方式:淺克隆或深克隆。
淺克隆
淺克隆(也稱為零拷貝)僅複製要克隆的表的元資料;表本身的資料檔案不會被複制。這種型別的克隆不會建立資料的另一物理副本,從而將儲存成本降至最低。淺克隆很便宜,而且建立起來非常快。
這些克隆表自己不作為資料來源,而是依賴於它們的原始檔作為資料來源。如果刪除了克隆表所依賴的原始檔,例如使用 VACUUM,則淺克隆可能會變得不可用。因此,淺克隆通常用於短期使用案例,例如測試和實驗。
深克隆
淺克隆非常適合短暫的用例,但某些情況下需要表資料的獨立副本。深克隆會複製源表的元資料和資料檔案全部資訊。從這個意義上講,它的功能類似於使用 CTAS 命令(CREATE TABLE .. AS ... SELECT ...)進行復制。但是由於它可以按指定版本複製原始表,因此複製起來更簡單,同時您無需像使用 CTAS 一樣重新指定分割槽,約束和其他資訊。此外它更快,更健壯,也可以針對故障使用增量方式進行工作。
使用深克隆,我們將複製額外的元資料,例如 streaming 應用程式事務和 COPY INTO 事務。因此您可以在深克隆之後繼續執行 ETL 應用程式。
克隆的適用場景?
有時候我希望有一個克隆人來幫助我做家務或魔術。但是我們這裡不是在談論人類克隆。在許多情況下,您需要資料集的副本-用於探索,共享或測試 ML 模型或分析查詢。以下是一些客戶用例的示例。
用生產表進行測試和試驗
當用戶需要測試其資料管道的新版本時,他們通常依賴一些測試資料集,這些測試資料跟其生產環境中的資料還是有很大不同。資料團隊可能也想嘗試各種索引技術,以提高針對海量表的查詢效能。這些實驗和測試想在生產環境進行,就得冒影響線上資料和使用者的風險。
為測試或開發環境拷貝線上資料表可能需要花費數小時甚至數天的時間。此外,開發環境儲存所有重複的資料會產生額外的儲存成本-設定反映生產資料的測試環境會產生很大的開銷。 對於淺克隆,這是微不足道的:
-- SQL
CREATE TABLE delta.`/some/test/location` SHALLOW CLONE prod.events
# Python
DeltaTable.forName(“spark”, “prod.events”).clone(“/some/test/location”, isShallow=True)
// Scala
DeltaTable.forName(“spark”, “prod.events”).clone(“/some/test/location”, isShallow=true)
在幾秒鐘內建立完表的淺克隆之後,您可以開始執行管道的副本以測試新程式碼,或者嘗試在不同維度上優化表,可以看到查詢效能提高了很多很多。 這些更改只會影響您的淺克隆,而不會影響原始表。
暫存對生產表的重大更改
有時,您可能需要對生產表進行一些重大更改。 這些更改可能包含許多步驟,並且您不希望其他使用者看到您所做的更改,直到您完成所有工作。 淺克隆可以在這裡為您提供幫助:
-- SQL
CREATE TABLE temp.staged_changes SHALLOW CLONE prod.events;
DELETE FROM temp.staged_changes WHERE event_id is null;
UPDATE temp.staged_changes SET change_date = current_date()
WHERE change_date is null;
...
-- Perform your verifications
對結果滿意後,您有兩種選擇。 如果未對源表進行任何更改,則可以用克隆替換源表。如果對源表進行了更改,則可以將更改合併到源表中。
-- If no changes have been made to the source
REPLACE TABLE prod.events CLONE temp.staged_changes;
-- If the source table has changed
MERGE INTO prod.events USING temp.staged_changes
ON events.event_id <=> staged_changes.event_id
WHEN MATCHED THEN UPDATE SET *;
-- Drop the staged table
DROP TABLE temp.staged_changes;
機器學習結果的可重複性
訓練出有效的 ML 模型是一個反覆的過程。在調整模型不同部分的過程中,資料科學家需要根據固定的資料集來評估模型的準確性。
這是很難做到的,特別是在資料不斷被載入或更新的系統中。 在訓練和測試模型時需要一個數據快照。 此快照支援了 ML 模型的重複訓練和模型治理。
我們建議利用 Time Travel 在一個快照上執行多個實驗;在 Machine Learning Data Lineage With MLflow and Delta Lake 中可以看到一個實際的例子。
當您對結果感到滿意並希望將資料存檔以供以後檢索時(例如,下一個黑色星期五),可以使用深克隆來簡化歸檔過程。 MLflow 與 Delta Lake 的整合非常好,並且自動記錄功能(mlflow.spark.autolog()方法)將告訴您使用哪個資料表版本進行了一組實驗。
# Run your ML workloads using Python and then
DeltaTable.forName(spark, “feature_store”).cloneAtVersion(128, “feature_ store_bf2020”)
資料遷移
出於效能或管理方面的原因,可能需要將大量表移至新的專用儲存系統。原始表將不再接收新的更新,並且將在以後的某個時間點停用和刪除。深度克隆使海量表的複製更加健壯和可擴充套件。
-- SQL
CREATE TABLE delta.`zz://my-new-bucket/events` CLONE prod.events;
ALTER TABLE prod.events SET LOCATION ‘zz://my-new-bucket/events’;
由於藉助深克隆,我們複製了流應用程式事務和 COPY INTO 事務,因此您可以從遷移後停止的確切位置繼續ETL應用程式!
資料共享
在一個組織中,來自不同部門的使用者通常都在尋找可用於豐富其分析或模型的資料集。您可能希望與組織中的其他使用者共享資料。 但不是建立複雜的管道將資料移動到另一個裡,而是建立相關資料集的副本通常更加容易和經濟。這些副本以供使用者瀏覽和測試資料來確認其是否適合他們的需求而不影響您自己生產系統的資料。在這裡深克隆再次起到關鍵作用。
-- The following code can be scheduled to run at your convenience CREATE OR REPLACE TABLE data_science.events CLONE prod.events;
資料存檔
出於監管或存檔的目的,表中的所有資料需要保留一定的年限,而活動表則將資料保留幾個月。如果您希望儘快更新資料,但又要求將資料儲存幾年,那麼將這些資料儲存在一個表中並進行 time travel 可能會變得非常昂貴。
在這種情況下,每天,每週,每月歸檔資料是一個更好的解決方案。深克隆的增量克隆功能將在這裡為您提供真正的幫助。
-- The following code can be scheduled to run at your convenience CREATE OR REPLACE TABLE archive.events CLONE prod.events;
請注意,與源表相比此表將具有獨立的歷史記錄,因此根據您的存檔頻率,源表和克隆表上的 time travel 查詢可能會返回不同的結果。
看起來真棒!有問題嗎?
這裡只是重申上述一些陷阱,請注意以下幾點:
- 克隆是在你的快照上進行的。對克隆開始後的源表變化不會反映在克隆中。
- 淺克隆不像深克隆那樣是自包含的表。如果在源表中刪除了資料(例如通過 VACUUM),那麼您的淺克隆可能無法使用。
- 克隆與源表具有獨立的歷史記錄。在源表和克隆表上的 time travel 查詢可能不會返回相同的結果。
- 淺克隆不復制流事務或將副本複製到元資料。使用深層克隆來遷移表,可以從上次暫停的地方繼續進行 ETL 處理。
我該如何使用?
淺克隆和深克隆支援資料團隊在測試和管理其新型雲資料湖和倉庫如何開展新功能。表克隆可以幫助您的團隊對其管道實施生產級別的測試,微調索引以實現最佳查詢效能,建立表副本以進行共享-所有這些都以最小的開銷和費用實現。如果您的組織需要這樣做,我們希望您能嘗試克隆表並提供反饋意見-我們期待聽到您將來的新用例和擴充套件。
Chapter-05 在 Apache Spark 3.0 上的 Delta Lake 中啟用 Spark SQL DDL 和 DML 功能
Delta Lake 0.7.0 的釋出與 Apache Spark 3.0 的釋出相吻合,從而啟用了一組新功能,這些功能使用了 Delta Lake 的 SQL 功能進行了簡化。以下是一些關鍵功能。
在 Hive Metastore 中定義表支援 SQL DDL 命令
現在,您可以在 Hive Metastore 中定義 Delta 表,並在建立(或替換)表時在所有 SQL 操作中使用表名。
建立或替換表
-- Create table in the metastore
CREATE TABLE events (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING DELTA
PARTITIONED BY (date)
LOCATION ‘/delta/events’
-- If a table with the same name already exists, the table is replaced with
the new configuration, else it is created
CREATE OR REPLACE TABLE events (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING DELTA
PARTITIONED BY (date)
LOCATION ‘/delta/events’
顯式更改表架構
-- Alter table and schema
ALTER TABLE table_name ADD COLUMNS (
col_name data_type
[COMMENT col_comment]
[FIRST|AFTER colA_name],
...)
您還可以使用 Scala / Java / Python API:
- DataFrame.saveAsTable(tableName) 和 DataFrameWriterV2 APIs。
- DeltaTable.forName(tableName) 這個 API 用於建立 io.delta.tables.DeltaTable 例項,對於在 Scala/Java/Python 中執行 Update/Delete/Merge 操作是非常有用。
支援 SQL 插入,刪除,更新和合並
通過 Delta Lake Tech Talks,最常見的問題之一是何時可以在 Spark SQL 中使用 DML 操作(如刪除,更新和合並)?不用再等了,這些操作現在已經可以在 SQL 中使用了! 以下是有關如何編寫刪除,更新和合並(使用 Spark SQL 進行插入,更新,刪除和重複資料刪除操作)的示例。
-- Using append mode, you can atomically add new data to an existing
Delta table
INSERT INTO events SELECT * FROM newEvents
-- To atomically replace all of the data in a table, you can use overwrite mode
INSERT OVERWRITE events SELECT * FROM newEvents
-- Delete events
DELETE FROM events WHERE date < ‘2017-01-01’
-- Update events
UPDATE events SET eventType = ‘click’ WHERE eventType = ‘click’
-- Upsert data to a target Delta
-- table using merge
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN UPDATE
SET events.data = updates.data
WHEN NOT MATCHED THEN INSERT (date, eventId, data)
VALUES (date, eventId, data)
值得注意的是,Delta Lake 中的合併操作比標準 ANSI SQL 語法支援更高階的語法。例如,合併支援
- 刪除操作-刪除與源資料行匹配的目標。 例如,“...配對後刪除...”
- 帶有子句條件的多個匹配操作-當目標和資料行匹配時具有更大的靈活性。 例如:
...
WHEN MATCHED AND events.shouldDelete THEN DELETE
WHEN MATCHED THEN UPDATE SET events.data = updates.data
- 星形語法-用於使用名稱相似的源列來設定目標列值的簡寫。 例如:
WHEN MATCHED THEN SET *
WHEN NOT MATCHED THEN INSERT *
-- equivalent to updating/inserting with event.date = updates.date,
events.eventId = updates.eventId, event.data = updates.data
自動和增量式 Presto/Athena 清單生成
正如 Query Delta Lake Tables From Presto and Athena, Improved Operations Concurrency,andMergePerformance 文章中所述,Delta Lake 支援其他處理引擎通過 manifest 檔案來讀取 Delta Lake。manifest 檔案包含清單生成時的最新版本。如上一章所述,您將需要:
- 生成 Delta Lake 清單檔案
- 配置 Presto 或 Athena 讀取生成的清單
- 手動重新生成(更新)清單檔案
Delta Lake 0.7.0的新增功能是使用以下命令自動更新清單檔案:
ALTER TABLE delta.`pathToDeltaTable`
SET TBLPROPERTIES(
delta.compatibility.symlinkFormatManifest.enabled=true
)
通過表屬性檔案來配置表
通過使用 ALTER TABLE SET TBLPROPERTIES,您可以在表上設定表屬性,可以啟用,禁用或配置 Delta Lake 的許多功能,就像自動清單生成那樣。例如使用表屬性,您可以使用 delta.appendOnly=true 阻止 Delta 表中資料的刪除和更新。
您還可以通過以下屬性輕鬆控制 Delta Lake 表保留的歷史記錄:
- delta.logRetentionDuration:控制錶的歷史記錄(即事務日誌歷史記錄)保留的時間。預設情況下會保留30天的歷史記錄,但是您可能需要根據自己的要求(例如GDPR歷史記錄上下文)更改此值。
- delta.deletedFileRetentionDuration:控制檔案成為 VACUUM 的候選時必須在多久被刪除。預設情況下會刪除7天以上的資料檔案。
從 Delta Lake 0.7.0 開始,您可以使用 ALTER TABLE SET TBLPROPERTIES 來配置這些屬性。
ALTER TABLE delta.`pathToDeltaTable` SET TBLPROPERTIES(
delta.logRetentionDuration = “interval “
delta.deletedFileRetentionDuration = “interval “
)
在 Delta Lake 表中提交支援新增使用者定義的元資料
您可以指定自定義的字串來作為元資料,通過 Delta Lake 表操作進行的提交,也可以使用DataFrameWriter選項userMetadata,或者 SparkSession 的配置spark.databricks.delta.commitInfo。 userMetadata。
在以下示例中,我們將根據每個使用者請求從資料湖中刪除一個使用者(1xsdf1)。為確保我們將使用者的請求與刪除相關聯,我們還將 DELETE 請求 ID 新增到了 userMetadata中。
SET spark.databricks.delta.commitInfo.userMetadata={
“GDPR”:”DELETE Request 1x891jb23”
};
DELETE FROM user_table WHERE user_id = ‘1xsdf1’
當檢視使用者表(user_table)的歷史記錄操作時,可以輕鬆地在事務日誌中標識關聯的刪除請求。
其他亮點
Delta Lake 0.7.0 版本的其他亮點包括:
- 支援 Azure Data Lake Storage Gen2-Spark 3.0 已經支援 Hadoop 3.2 庫,也被 Azure Data Lake Storage Gen2 支援。
- 改進了對流式一次觸發的支援-使用 Spark 3.0,我們確保一次觸發(Trigger.Once)在單個微批處理中處理 Delta Lake 表中的所有未完成資料,即使使用 DataStreamReader 選項 maxFilesPerTriggers 速度受限。
在 AMA 期間,關於結構化流和使用 trigger.once 的問題又很多。
有關更多資訊,一些解釋此概念的有用資源包括:
- 每天執行一次流作業,可節省10倍的成本
- 超越 Lambda:引入Delta架構:特別是成本與延遲的對比
後續
您已經瞭解了 Delta Lake 及其特性,以及如何進行效能優化,本系列還包括其他內容:
- Delta Lake 技術系列-基礎和效能
- Delta Lake 技術系列-Lakehouse
- Delta Lake 技術系列-Streaming
- Delta Lake 技術系列-客戶用例(Use Case)
本文為阿里雲原創內容,未經允許不得轉載。