
論文解析 -- Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores

 

INTRODUCTION 

The object store is proposed as the storage layer for data systems: low cost, with compute and storage scaled independently.

Cloud object stores such as Amazon S3 [4] and Azure Blob Storage [17] have become some of the largest and most widely used storage systems on the planet,
holding exabytes of data for millions of customers [46].

Apart from the traditional advantages of clouds services, such as pay-as-you-go billing, economies of scale, and expert management [15],
cloud object stores are especially attractive because they allow users to scale computing and storage resources separately: for example, a user can store a petabyte of data but only run a cluster to execute a query over it for a few hours. As a result, many organizations now use cloud object stores to manage large structured datasets in data warehouses and data lakes. 

 

The major open-source “big data” systems, including Apache Spark, Hive and Presto [45, 52, 42], support reading and writing to cloud object stores using file formats such as Apache Parquet and ORC [13, 12].

Commercial services including AWS Athena, Google BigQuery and Redshift Spectrum [1, 29, 39] can also query directly against these systems and these open file formats. 

A comparison of AWS Athena, Google BigQuery and Redshift Spectrum: https://www.jianshu.com/p/fefa6c665ff9?utm_source=hacpai.com

 

The problem statement:

Object stores have two core problems:

- Data mutation: an object store is a key-value store and provides no cross-key consistency guarantees;

- Performance.

Traditional data warehouses cannot carry data at this scale. Object stores can hold massive datasets but lack certain capabilities, and these gaps are exactly what the data lake has to address.

Unfortunately, although many systems support reading and writing to cloud object stores,
achieving performant and mutable table storage

over these systems is challenging, making it difficult to implement data warehousing capabilities over them.
Unlike distributed filesystems such as HDFS [5], or custom storage engines in a DBMS, most cloud object stores are merely key-value stores, with no cross-key consistency guarantees.
Their performance characteristics also differ greatly from distributed filesystems and require special care. 

A more detailed analysis of the problems:

- Multi-object operations are not atomic, so while one query is mid-write, other queries can read the intermediate state, and there is no effective way to roll back.

- Metadata operations are expensive: data skipping, for example, requires reading each Parquet footer, which costs far more than it does on HDFS.

The most common way to store relational datasets in cloud object stores is using columnar file formats such as Parquet and ORC, where each table is stored as a set of objects (Parquet or ORC “files”), possibly clustered into “partitions” by some fields (e.g., a separate set of objects for each date) [45].
This approach can offer acceptable performance for scan workloads as long as the object files are moderately large.
However, it creates both correctness and performance challenges for more complex workloads.
First, because multi-object updates are not atomic, there is no isolation between queries: for example, if a query needs to update multiple objects in the table (e.g., remove the records about one user across all the table’s Parquet files), readers will see partial updates as the query updates each object individually. Rolling back writes is also difficult: if an update query crashes, the table is in a corrupted state.
Second, for large tables with millions of objects, metadata operations are expensive. For example, Parquet files include footers with min/max statistics that can be used to skip reading them in selective queries.
Reading such a footer on HDFS might take a few milliseconds, but the latency of cloud object stores is so much higher that these data skipping checks can take longer than the actual query. 

In our experience working with cloud customers, these consistency and performance issues create major challenges for enterprise data teams.
Most enterprise datasets are continuously updated, so they require a solution for atomic writes; most datasets about users require table-wide updates to implement privacy policies such as GDPR compliance [27];
and even purely internal datasets may require updates to repair incorrect data, incorporate late records, etc.
Anecdotally, in the first few years of Databricks’ cloud service (2014–2016), around half the support escalations we received were due to data corruption, consistency or performance issues due to cloud storage strategies (e.g., undoing the effect of a crashed update job, or improving the performance of a query that reads tens of thousands of objects). 

 

To address these challenges, we designed Delta Lake, an ACID table storage layer over cloud object stores that we started providing to customers in 2017 and open sourced in 2019 [26].
The core idea of Delta Lake is simple: we maintain information about which objects are part of a Delta table in an ACID manner, using a write-ahead log that is itself stored in the cloud object store.
The objects themselves are encoded in Parquet, making it easy to write connectors from engines that can already process Parquet.
This design allows clients to update multiple objects at once, replace a subset of the objects with another, etc., in a serializable manner while still achieving high parallel read and write performance from the objects themselves (similar to raw Parquet).
The log also contains metadata such as min/max statistics for each data file, enabling order of magnitude faster metadata searches than the “files in object store” approach.

The metadata lives in the object store, and transactions use optimistic concurrency control against the object store, so the server side is stateless and can be spun up on demand: full separation of compute and storage.

Crucially, we designed Delta Lake so that all the metadata is in the underlying object store, and transactions are achieved using optimistic concurrency protocols against the object store (with some details varying by cloud provider). This means that no servers need to be running to maintain state for a Delta table; users only need to launch servers when running queries, and enjoy the benefits of separately scaling compute and storage. 

 

Contribution

Based on this transactional design, we were also able to add multiple other features in Delta Lake that are not available in traditional cloud data lakes to address common customer pain points, including:

  • Time travel to let users query point-in-time snapshots or roll back erroneous updates to their data.

  • UPSERT, DELETE and MERGE operations, which efficiently rewrite the relevant objects to implement updates to archived data and compliance workflows (e.g., for GDPR [27]).

  • Efficient streaming I/O, by letting streaming jobs write small objects into the table at low latency, then transactionally coalescing them into larger objects later for performance. Fast

    “tailing” reads of the new data added to a table are also supported, so that jobs can treat a Delta table as a message bus.

  • Caching: Because the objects in a Delta table and its log are immutable, cluster nodes can safely cache them on local storage. We leverage this in the Databricks cloud service to implement a transparent SSD cache for Delta tables.

  • Data layout optimization: Our cloud service includes a feature that automatically optimizes the size of objects in a table and the clustering of data records (e.g., storing records in Z-order to achieve locality along multiple dimensions) without impacting running queries.

  • Schema evolution, allowing Delta to continue reading old Parquet files without rewriting them if a table’s schema changes.

  • Audit logging based on the transaction log. 

 

Here the paper introduces the "lakehouse" paradigm, under which Delta Lake can simultaneously replace the data lake, the data warehouse, and the streaming storage system.

As the figure below shows, using Delta Lake eliminates the message queue and the data warehouses.

Together, these features improve both the manageability and performance of working with data in cloud object stores, and enable a “lakehouse” paradigm that combines the key features of data warehouses and data lakes: standard DBMS management functions usable directly against low-cost object stores. In fact, we found that many Databricks customers could simplify their overall data architectures with Delta Lake, by replacing previously separate data lake, data warehouse and streaming storage systems with Delta tables that provide appropriate features for all these use cases.

Figure 1 shows an extreme example, where a data pipeline that includes object storage, a message queue and two data warehouses for different business intelligence teams (each running their own computing resources) is replaced with just Delta tables on object storage, using Delta’s streaming I/O and performance features to run ETL and BI. The new pipeline uses only low-cost object storage and creates fewer copies of the data, reducing both storage cost and maintenance overheads. 

MOTIVATION: CHARACTERISTICS AND CHALLENGES OF OBJECT STORES 

What is an object store?

Metadata APIs such as LIST are expensive.

Byte-range requests are efficient, so storage formats that cluster related content together are efficient: a single range read can fetch all the needed data.

Even running Delta Lake over HDFS instead of an object store, problems such as small files and atomic operations across directories remain.

Cloud object stores, such as Amazon S3 [4], Azure Blob Storage [17], Google Cloud Storage [30], and OpenStack Swift [38], offer a simple but easy-to-scale key-value store interface.
These systems allow users to create buckets that each store multiple objects, each of which is a binary blob ranging in size up to a few TB (for example, on S3, the limit on object sizes is 5 TB [4]).
Each object is identified by a string key. It is common to model keys after file system paths (e.g., warehouse/table1/part1.parquet),
but unlike file systems, cloud object stores do not provide cheap renames of objects or of “directories”.
Cloud object stores also provide metadata APIs, such as S3’s LIST operation [41], that can generally list the available objects in a bucket by lexicographic order of key, given a start key.
This makes it possible to efficiently list the objects in a “directory” if using file-system-style paths, by starting a LIST request at the key that represents that directory prefix (e.g., warehouse/table1/).
Unfortunately, these metadata APIs are generally expensive: for example, S3’s LIST only returns up to 1000 keys per call, and each call takes tens to hundreds of milliseconds,
so it can take minutes to list a dataset with millions of objects using a sequential implementation.
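The cost described above follows directly from the API shape. Below is a minimal pure-Python sketch of S3-style paginated, lexicographic listing; the in-memory `bucket` dict and `list_objects` helper are hypothetical stand-ins for the real SDK.

```python
# Minimal sketch of S3-style paginated, lexicographic LIST semantics.
# The in-memory `bucket` dict and page size of 1000 are stand-ins for
# the real service; production code would use the provider's SDK.

def list_objects(bucket, prefix="", start_after="", page_size=1000):
    """Yield keys under `prefix` in lexicographic order, one page at a time."""
    keys = sorted(k for k in bucket if k.startswith(prefix) and k > start_after)
    for i in range(0, len(keys), page_size):
        yield keys[i:i + page_size]

# 2,500 objects -> 3 sequential round trips; a million objects would need
# 1,000 calls, which at tens to hundreds of ms each takes minutes.
bucket = {f"warehouse/table1/part-{i:07d}.parquet": b"" for i in range(2500)}
pages = list(list_objects(bucket, prefix="warehouse/table1/"))
```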

When reading an object, cloud object stores usually support byte-range requests, so it is efficient to read just a range within a large object (e.g., bytes 10,000 to 20,000).
This makes it possible to leverage storage formats that cluster commonly accessed values.

Updating objects usually requires rewriting the whole object at once. These updates can be made atomic, so that readers will either see the new object version or the old one. Some systems also support appends to an object [48].

Some cloud vendors have also implemented distributed filesystem interfaces over blob storage, such as Azure’s ADLS Gen2 [18], which offer similar semantics to Hadoop’s HDFS (e.g., directories and atomic renames). Nonetheless, many of the problems that Delta Lake tackles, such as small files [36] and atomic updates across multiple directories, are still present even when using a distributed filesystem—indeed, multiple users run Delta Lake over HDFS.

 

What problems do object stores have?

First, consistency: most object stores provide only eventual consistency, so an object newly written by one client is not necessarily immediately readable or listable by other clients.

The most popular cloud object stores provide eventual consistency for each key and no consistency guarantees across keys,
which creates challenges when managing a dataset that consists of multiple objects, as described in the Introduction.
In particular, after a client uploads a new object, other clients are not necessarily guaranteed to see the object in LIST or read operations right away.
Likewise, updates to an existing object may not immediately be visible to other clients. Moreover, depending on the object store, even the client doing a write may not immediately see the new objects.

The exact consistency model differs by cloud provider, and can be fairly complex.
As a concrete example, Amazon S3 provides read-after-write consistency for clients that write a new object, meaning that read operations such as S3’s GET will return the object contents after a PUT.
However, there is one exception: if the client writing the object issued a GET to the (nonexistent) key before its PUT,
then subsequent GETs might not read the object for a period of time, most likely because S3 employs negative caching.
Moreover, S3’s LIST operations are always eventually consistent, meaning that a LIST after a PUT might not return the new object [40].
Other cloud object stores offer stronger guarantees [31], but still lack atomic operations across multiple keys. 

Then there are the performance problems.

Read performance is improved through large sequential reads and high parallelism.

Write performance is balanced against read performance by controlling object size, or via a log-structured approach: write small objects, then compact them into large objects to satisfy read performance. One issue here is that updates to cold data become relatively inefficient.

In our experience, achieving high throughput with object stores requires a careful balance of large sequential I/Os and parallelism.
For reads, the most granular operation available is reading a sequential byte range, as described earlier.
Each read operation usually incurs at least 5–10 ms of base latency, and can then read data at roughly 50–100 MB/s,
so an operation needs to read at least several hundred kilobytes to achieve at least half the peak throughput for sequential reads, and multiple megabytes to approach the peak throughput.
Moreover, on typical VM configurations, applications need to run multiple reads in parallel to maximize throughput.
For example, the VM types most frequently used for analytics on AWS have at least 10 Gbps network bandwidth, so they need to run 8–10 reads in parallel to fully utilize this bandwidth.

LIST operations also require significant parallelism to quickly list large sets of objects. For example, S3’s LIST operations can only return up to 1000 objects per request, and take tens to hundreds of milliseconds, so clients need to issue hundreds of LISTs in parallel to list large buckets or “directories”. In our optimized runtime for Apache Spark in the cloud, we sometimes parallelize LIST operations over the worker nodes in the Spark cluster in addition to threads in the driver node to have them run faster.
In Delta Lake, the metadata about available objects (including their names and data statistics) is stored in the Delta log instead, but we also parallelize reads from this log over the cluster.

Write operations generally have to replace a whole object (or append to it), as discussed in Section 2.1.
This implies that if a table is expected to receive point updates, then the objects in it should be kept small, which is at odds with supporting large reads.
Alternatively, one can use a log-structured storage format. 

These performance characteristics lead to three design considerations:

sequential reads favor columnar formats; object sizes should be moderate, neither too large nor too small; LIST should be avoided, and where possible requests should use lexicographic key ranges.

Implications for Table Storage. The performance characteristics of object stores lead to three considerations for analytical workloads:

1. Keep frequently accessed data close-by sequentially, which generally leads to choosing columnar formats.

2. Make objects large, but not too large. Large objects increase the cost of updating data (e.g., deleting all data about one user) because they must be fully rewritten.

3. Avoid LIST operations, and make these operations request lexicographic key ranges when possible. 

 

Now for the existing approaches to table storage.

The first is simply splitting data into directories, with no dedicated metadata management.

Partitioning by field into separate directories can partially improve retrieval efficiency.

The problems are those mentioned earlier: atomicity and eventual consistency,

plus read performance: even with partitioning, finding the objects relevant to a query is slow, repeatedly reading the metadata of every Parquet or ORC file.

Directories of Files.

The most common approach, supported by the open source big data stack as well as many cloud services, is to store the table as a collection of objects, typically in a columnar format such as Parquet.
As a refinement, the records may be “partitioned” into directories based on one or more attributes.
For example, for a table with a date field, we might create a separate directory of objects for each date, e.g., mytable/date=2020-01-01/obj1 and mytable/date=2020-01-01/obj2 for data from Jan 1st,
then mytable/date=2020-01-02/obj1 for Jan 2nd, etc., and split incoming data into multiple objects based on this field.
Such partitioning reduces the cost of LIST operations and reads for queries that only access a few partitions. 

This approach is attractive because the table is “just a bunch of objects” that can be accessed from many tools without running any additional data stores or systems.
It originated in Apache Hive on HDFS [45] and matches working with Parquet, Hive and other big data software on filesystems.

Challenges with this Approach. As described in the Introduction, the “just a bunch of files” approach suffers from both performance and consistency problems on cloud object stores.
The most common challenges customers encountered are: 

  • No atomicity across multiple objects: Any transaction that needs to write or update multiple objects risks having partial writes visible to other clients. Moreover, if such a transaction fails, data is left in a corrupt state.

  • Eventual consistency: Even with successful transactions, clients may see some of the updated objects but not others.

  • Poor performance: Listing objects to find the ones relevant for a query is expensive, even if they are partitioned into directories by a key. Moreover, accessing per-object statistics stored in Parquet or ORC files is expensive because it requires additional high-latency reads for each feature.

  • No management functionality: The object store does not implement standard utilities such as table versioning or audit logs that are familiar from data warehouses. 

 

The second approach uses a dedicated service to manage the metadata. The paper mentions Snowflake here; Snowflake manages its metadata with FoundationDB, though the paper does not say so explicitly.

The problems with this approach:

First, every operation must go through the metadata service, which easily becomes a single point of failure and a bottleneck.

Developing connectors is costly, and users become strongly dependent on the service provider.

The reasoning here is somewhat strained, since the scenarios differ: for a query-latency-sensitive system like Snowflake, managing the metadata in a high-performance database is clearly the more sensible choice.

Similarly, Hive ACID manages metadata through a metastore, such as MySQL, which easily becomes a performance bottleneck.

Custom Storage Engines.

“Closed-world” storage engines built for the cloud, such as the Snowflake data warehouse [23],
can bypass many of the consistency challenges with cloud object stores by managing metadata themselves in a separate, strongly consistent service,
which holds the “source of truth” about what objects comprise a table.
In these engines, the cloud object store can be treated as a dumb block device and standard techniques can be used to implement efficient metadata storage, search, updates, etc. over the cloud objects.
However, this approach requires running a highly available service to manage the metadata, which can be expensive,
can add overhead when querying the data with an external computing engine, and can lock users into one provider.

Challenges with this Approach.
Despite the benefits of a clean-slate “closed-world” design, some specific challenges we encountered with this approach are:

  • All I/O operations to a table need to contact the metadata service, which can increase its resource cost and reduce performance and availability. For example, when accessing a Snowflake dataset in Spark, the reads from Snowflake’s Spark connector stream data through Snowflake’s services, reducing performance compared to direct reads from cloud object stores.

  • Connectors to existing computing engines require more engineering work to implement than an approach that reuses existing open formats such as Parquet. In our experience, data teams wish to use a wide range of computing engines on their data (e.g. Spark, TensorFlow, PyTorch and others), so making connectors easy to implement is important.

  • The proprietary metadata service ties users to a specific service provider, whereas an approach based on directly accessing objects in cloud storage enables users to always access their data using different technologies. 

Apache Hive ACID [32] implements a similar approach over HDFS or object stores by using the Hive Metastore (a transactional RDBMS such as MySQL) to keep track of multiple files that hold updates for a table stored in ORC format. However, this approach is limited by the performance of the metastore, which can become a bottleneck for tables with millions of objects in our experience. 

 

The third approach, the one Delta Lake uses, stores the transaction log and metadata in the object store itself, with the data stored as Parquet. Hudi and Iceberg adopt the same design, but Delta Lake offers some unique features.

Metadata in Object Stores.

Delta Lake’s approach is to store a transaction log and metadata directly within the cloud object store, and use a set of protocols over object store operations to achieve serializability.
The data within a table is then stored in Parquet format, making it easy to access from any software that already supports Parquet, as long as a minimal connector is available to discover the set of objects to read. Although we believe that Delta Lake was the first system to use this design (starting in 2016), two other software packages also support it now — Apache Hudi [8] and Apache Iceberg [10].
Delta Lake offers a number of unique features not supported by these systems, such as Z-order clustering, caching, and background optimization.
We discuss the similarities and differences between these systems in more detail in Section 8. 

 

DELTA LAKE STORAGE FORMAT AND ACCESS PROTOCOLS 

First, the storage format.

A table lives in a single directory, e.g., mytable.

Similar to Hive data partitioning, the table here is partitioned by date into subdirectories.

1. Data objects use the Parquet format, for the reasons below. ORC was not chosen because Parquet has better support in Spark.

We chose Parquet as our underlying data format because it was column-oriented, offered diverse compression schemes,
supported nested data types for semi-structured data, and already had performant implementations in many engines.
Building on an existing, open file format also ensured that Delta Lake can continue to take advantage of newly released updates to Parquet libraries and simplified developing connectors to other engines (Section 4.8). Other open-source formats, such as ORC [12], would likely have worked similarly, but Parquet had the most mature support in Spark. 

Each data object has a unique name, typically a GUID generated by the writer. Which data objects belong to each version of the table is recorded in the transaction log.

Each data object in Delta has a unique name, typically chosen by the writer by generating a GUID. However, which objects are part of each version of the table is determined by the transaction log.

2. The _delta_log directory holds the transaction log,

consisting of log files, checkpoint files, and the _last_checkpoint marker.

First, the log files. Each contains an array of actions:

Each log record object (e.g., 000003.json) contains an array of actions to apply to the previous version of the table in order to generate the next one.  
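As a sketch, such a record might contain the following; the actions are newline-delimited JSON, one per line, and the field names here loosely follow the open Delta protocol for illustration only, not as a complete schema.

```python
import json

# A hypothetical log record (e.g. _delta_log/00000000000000000003.json):
# one JSON action per line, applied to version 2 of the table to produce
# version 3. Field names are illustrative, not a complete schema.
record_lines = [
    json.dumps({"commitInfo": {"operation": "WRITE", "user": "alice"}}),
    json.dumps({"add": {"path": "date=2020-01-01/part-a.parquet",
                        "size": 1024, "dataChange": True,
                        "stats": json.dumps({"numRecords": 100})}}),
    json.dumps({"remove": {"path": "date=2020-01-01/part-old.parquet",
                           "deletionTimestamp": 1577836800000,
                           "dataChange": True}}),
]
actions = [json.loads(line) for line in record_lines]
```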

What kinds of actions are there?

Change Metadata. Updates the table's metadata.
The metaData action changes the current metadata of the table.
The first version of a table must contain a metaData action.
Subsequent metaData actions completely overwrite the current metadata of the table.
The metadata is a data structure containing the schema, partition column names (i.e., date in our example) if the column is partitioned,
the storage format of data files (typically Parquet, but this provides extensibility), and other configuration options, such as marking a table as append-only. 

Add or Remove Files.
Data is modified by adding and removing files, with these actions recorded in the log; finding which files make up the table from the log is far more efficient than a LIST.
An add carries data statistics, and statistics are updated when an add names a file already in the table.
A remove is lazy, so stale snapshots remain queryable for a period of time.
The dataChange flag marks whether an action truly changed data; actions that only update statistics or re-sort can set it to false, letting readers later filter for the actions that actually changed data.

The add and remove actions are used to modify the data in a table by adding or removing individual data objects respectively.
Clients can thus search the log to find all added objects that have not been removed to determine the set of objects that make up the table.

The add record for a data object can also include data statistics, such as the total record count and per-column min/max values and null counts.
When an add action is encountered for a path that is already present in the table, statistics from the latest version replace that from any previous version.
This can be used to “upgrade” old tables with more types of statistics in new versions of Delta Lake.

The remove action includes a timestamp that indicates when the removal occurred.
Physical deletion of the data object can happen lazily after a user-specified retention time threshold.
This delay allows concurrent readers to continue to execute against stale snapshots of the data.
A remove action should remain in the log and any log checkpoints as a tombstone until the underlying data object has been deleted.

The dataChange flag on either add or remove actions can be set to false to indicate that this action, when combined with other actions in the same log record object,
only rearranges existing data or adds statistics. For example, streaming queries that are tailing the transaction log can use this flag to skip actions that would not affect their results,
such as changing the sort order in earlier data files. 
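Putting the add/remove semantics above together, here is a hedged sketch of how a client might replay log records into the current object set; the record structure is simplified relative to real log records.

```python
# Sketch: replay log records in version order to determine which objects
# currently make up the table. The record structure is simplified; real
# records carry full add/remove payloads as described above.

def replay(log_records):
    live, tombstones = {}, {}
    for record in log_records:
        for action in record:
            if "add" in action:
                a = action["add"]
                live[a["path"]] = a        # a later add replaces earlier ones
            elif "remove" in action:
                r = action["remove"]
                live.pop(r["path"], None)
                tombstones[r["path"]] = r  # kept until retention expires
    return live, tombstones

log = [
    [{"add": {"path": "p1.parquet"}}, {"add": {"path": "p2.parquet"}}],
    [{"remove": {"path": "p1.parquet", "deletionTimestamp": 0}},
     {"add": {"path": "p3.parquet"}}],
]
live, tombstones = replay(log)
```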

Protocol Evolution. For version compatibility.

The protocol action is used to increase the version of the Delta protocol that is required to read or write a given table.
We use this action to add new features to the format while indicating which clients are still compatible.

Add Provenance Information.

Each log record object can also include provenance information in a commitInfo action, e.g., to log which user did the operation.

Update Application Transaction IDs. Application-supplied transaction IDs, used for exactly-once stream processing.

Delta Lake also provides a means for applications to include their own data inside log records, which can be useful for implementing end-to-end transactional applications.
For example, stream processing systems that write to a Delta table need to know which of their writes have previously been committed in order to achieve “exactly-once” semantics:
if the streaming job crashes, it needs to know which of its writes have previously made it into the table, so that it can replay subsequent writes starting at the correct offset in its input streams.
To support this use case, Delta Lake allows applications to write a custom txn action with appId and version fields in their log record objects that can track application-specific information,
such as the corresponding offset in the input stream in our example.
By placing this information in the same log record as the corresponding Delta add and remove operations, which is inserted into the log atomically, the application can ensure that Delta Lake adds the new data and stores its version field atomically. Each application can simply generate its appId randomly to receive a unique ID. We use this facility in the Delta Lake connector for Spark Structured Streaming [14]. 

 

3. Log checkpoints

A checkpoint records, in Parquet format, all the non-redundant actions from the log up to a given record.

The raw log is scattered across JSON files, which is inconvenient to search.

Checkpointing merges every log record (and any earlier checkpoint) up to the current record ID into one summary of the table's metadata: all added files, all removed-but-not-yet-deleted files, and so on,

including the statistics for each file,

in Parquet format, which is convenient to search.

Finally, _delta_log/_last_checkpoint points directly at the latest checkpoint.

For performance, it is necessary to compress the log periodically into checkpoints.
Checkpoints store all the non-redundant actions in the table’s log up to a certain log record ID, in Parquet format.
Some sets of actions are redundant and can be removed. These include:

• add actions followed by a remove action for the same data object. 
The adds can be removed because the data object is no longer part of the table.
The remove actions should be kept as a tombstone according to the table’s data retention configuration.
Specifically, clients use the timestamp in remove actions to decide when to delete an object from storage.

• Multiple adds for the same object can be replaced by the last one, because new ones can only add statistics.

• Multiple txn actions from the same appId can be replaced by the latest one, which contains its latest version field.

• The changeMetadata and protocol actions can also be coalesced to keep only the latest metadata.

The end result of the checkpointing process is therefore a Parquet file that contains an add record for each object still in the table,
remove records for objects that were deleted but need to be retained until the retention period has expired, and a small number of other records such as txn, protocol and changeMetadata.
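The redundancy rules above can be sketched as a small fold over a log prefix; this is an illustrative simplification (the real checkpoint is written as Parquet and also honors the retention window).

```python
# Sketch of the checkpointing fold, applying the redundancy rules above:
# last add per path wins, removed paths keep only a tombstone, latest txn
# per appId, latest metaData/protocol.

def checkpoint(log_records):
    adds, removes, txns = {}, {}, {}
    meta = proto = None
    for record in log_records:
        for action in record:
            if "add" in action:
                path = action["add"]["path"]
                removes.pop(path, None)   # simplification: re-add revives path
                adds[path] = action["add"]
            elif "remove" in action:
                path = action["remove"]["path"]
                adds.pop(path, None)
                removes[path] = action["remove"]
            elif "txn" in action:
                txns[action["txn"]["appId"]] = action["txn"]
            elif "metaData" in action:
                meta = action["metaData"]
            elif "protocol" in action:
                proto = action["protocol"]
    return adds, removes, txns, meta, proto

log = [
    [{"metaData": {"schemaString": "id int"}}, {"add": {"path": "p1"}}],
    [{"add": {"path": "p2"}}, {"txn": {"appId": "app", "version": 1}}],
    [{"remove": {"path": "p1", "deletionTimestamp": 0}},
     {"txn": {"appId": "app", "version": 2}}],
]
adds, removes, txns, meta, proto = checkpoint(log)
```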

This column-oriented file is an ideal format for querying metadata about the table, and for finding which objects may contain data relevant for a selective query based on their data statistics.
In our experience, finding the set of objects to read for a query is nearly always faster using a Delta Lake checkpoint than using LIST operations and reading Parquet file footers on an object store.
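For instance, the data-skipping step can be sketched as a filter over the checkpoint's add records; the stats layout here is hypothetical (real stats are JSON-encoded strings inside the add record).

```python
# Sketch of data skipping over the checkpoint's add records: read only the
# objects whose per-column [min, max] range can contain matching rows.

def objects_to_read(checkpoint_adds, column, lo, hi):
    selected = []
    for add in checkpoint_adds:
        stats = add.get("stats")
        if stats is None:                        # no stats: must read it
            selected.append(add["path"])
        elif stats["min"][column] <= hi and stats["max"][column] >= lo:
            selected.append(add["path"])
    return selected

adds = [
    {"path": "a.parquet", "stats": {"min": {"ts": 0},  "max": {"ts": 10}}},
    {"path": "b.parquet", "stats": {"min": {"ts": 50}, "max": {"ts": 90}}},
    {"path": "c.parquet", "stats": None},
]
hits = objects_to_read(adds, "ts", 60, 70)
```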

Any client may attempt to create a checkpoint up to a given log record ID, and should write it as a .parquet file for the corresponding ID if successful.
For example, 000003.parquet would represent a checkpoint of the records up to and including 000003.json. By default, our clients write checkpoints every 10 transactions. 

Lastly, clients accessing the Delta Lake table need to efficiently find the last checkpoint (and the tail of the log) without LISTing all the objects in the _delta_log directory.
Checkpoint writers write their new checkpoint ID in the _delta_log/_last_checkpoint file if it is newer than the current ID in that file.
Note that it is fine for the _last_checkpoint file to be out of date due to eventual consistency issues with the cloud object store, because clients will still search for new checkpoints after the ID in this file. 

 

Now the core part: the access protocols. How do reads and writes work?

Read protocol:

1-2. Find the last checkpoint, then LIST all newer log records.

3. Replay the checkpoint and log records to reconstruct the table's current metadata, including statistics.

4. Data skipping: compute the list of files that need to be read.

5. Read the files in parallel.

This effectively solves the consistency problem: one log record corresponds to one transaction, and because files are located through the log, partially visible writes cannot occur.

 

Write protocol:

1. Find the latest record ID, using steps 1-2 of the read protocol; if the latest log ID is r, this write will commit as r+1.

2. Read the data at version r, if required (needed, e.g., when the transaction is a read-modify-write such as an update).

3. Write the new data object files.

4. Write the new log record, r+1, atomically. This is optimistic concurrency control: concurrent writers may conflict, and a writer that loses the race retries its commit with the next record ID.

5. Optionally write a checkpoint.

Step 4, the atomic write of the log record, can be guaranteed in several ways depending on the object store.

Optimistic concurrency limits the transaction commit rate, but the paper reports that this is sufficient in practice.
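The commit step with retry can be sketched as follows; the in-memory `ObjectStore` is a hypothetical stand-in for a store with an atomic put-if-absent primitive (as the paper notes, the actual atomicity mechanism varies by cloud provider).

```python
import threading

# Sketch of the commit step under optimistic concurrency: try to create
# log record r+1 atomically; if another writer won the race, re-read the
# log and retry with the next record ID.

class ObjectStore:
    def __init__(self):
        self._objects, self._lock = {}, threading.Lock()

    def put_if_absent(self, key, value):
        with self._lock:
            if key in self._objects:
                return False          # another writer committed this ID
            self._objects[key] = value
            return True

    def latest_log_id(self):
        ids = [int(k.rsplit("/", 1)[-1].split(".")[0])
               for k in self._objects if k.endswith(".json")]
        return max(ids, default=-1)

def commit(store, actions):
    while True:                       # retry until we win a record ID
        r = store.latest_log_id()     # steps 1-2: find the latest record
        key = f"_delta_log/{r + 1:020d}.json"
        if store.put_if_absent(key, actions):
            return r + 1              # committed as table version r+1

store = ObjectStore()
v0 = commit(store, [{"add": {"path": "p1.parquet"}}])
v1 = commit(store, [{"add": {"path": "p2.parquet"}}])
```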

 

 

Write isolation levels:

Snapshot isolation is supported naturally: reading up to a fixed log record is, by construction, a snapshot.

Serializability requires read/write transactions; ordering can only be guaranteed within a transaction.
 

HIGHER-LEVEL FEATURES IN DELTA 

Time Travel and Rollbacks 

Supported naturally: the commit-log design has MVCC built in, so both time travel and undo are straightforward.

 

Efficient UPSERT, DELETE and MERGE 

Updates are applied consistently; an update is implemented as an add plus a remove.

 

Streaming Ingest and Consumption 

A Delta table can serve as a data queue, partially replacing Kafka:

small objects can be compacted to improve read performance, and compaction is itself transactional (note that the dataChange flag must be set appropriately);

application-level transaction IDs guarantee exactly-once;

new data objects are discovered efficiently by listing the log and filtering on the dataChange flag.

Write Compaction.

A simple data lake organized as a collection of objects makes it easy to insert data (just write a new object), but creates an unpleasant tradeoff between write latency and query performance.
If writers wish to add new records into a table quickly by writing small objects, readers will ultimately be slowed down due to smaller sequential reads and more metadata operations.
In contrast, Delta Lake allows users to run a background process that compacts small data objects transactionally, without affecting readers.
Setting dataChange flag to false on log records that compact files, described in Section 3.1.2, also allows streaming consumers to ignore these compaction operations altogether if they have already read the small objects. Thus, streaming applications can quickly transfer data to one another by writing small objects, while queries on old data stay fast. 

Exactly-Once Streaming Writes.

Writers can use the txn action type in log records, described in Section 3.1.2, to keep track of which data they wrote into a Delta Lake table and implement “exactly-once” writes.
In general, stream processing systems that aim to update data in an external store need some mechanism to make their writes idempotent in order to avoid duplicate writes after a failure.
This could be done by ensuring that each record has a unique key in the case of overwrites, or more generally,
by atomically updating a “last version written” record together with each write, which can then be used to only write newer changes.
Delta Lake facilitates this latter pattern by allowing applications to update an (appId, version) pair with each transaction.
We use this feature in our Structured Streaming [14] connector to support exactly-once writes for any kind of streaming computation (append, aggregation, upsert, etc). 
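A minimal sketch of this idempotent-commit pattern; the `table` dict is a stand-in for the log plus the latest txn versions reconstructed from it.

```python
# Sketch of exactly-once writes via (appId, version): a batch is applied
# only if its version is newer than the last one recorded for that appId,
# so replaying the batch after a crash is a no-op.

def write_batch(table, app_id, version, actions):
    if version <= table["txns"].get(app_id, -1):
        return False                          # already committed; skip
    table["log"].append(actions + [{"txn": {"appId": app_id,
                                            "version": version}}])
    table["txns"][app_id] = version           # atomic with the data adds
    return True

table = {"log": [], "txns": {}}
first = write_batch(table, "stream-1", 0, [{"add": {"path": "p0"}}])
replayed = write_batch(table, "stream-1", 0, [{"add": {"path": "p0"}}])
```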

Efficient Log Tailing.

The final tool needed to use Delta Lake tables as message queues is a mechanism for consumers to efficiently find new writes.
Fortunately, the storage format for the log, in a series of .json objects with lexicographically increasing IDs, makes this easy:
a consumer can simply run object store LIST operations starting at the last log record ID it has seen to discover new ones.
The dataChange flag in log records allows streaming consumers to skip log records that only compact or rearrange existing data, and just read new data objects.
It is also easy for a streaming application to stop and restart at the same log record in a Delta Lake table by remembering the last record ID it finished processing. 
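A minimal sketch of this tailing step, simulated against an in-memory key list rather than a real object store LIST call: because commit IDs are zero-padded, lexicographic order matches numeric order, so listing keys greater than the last-seen commit file yields exactly the new records.

```python
def tail_log(log_keys, last_seen_id):
    """Find commit files newer than last_seen_id. Commit names are
    zero-padded decimal IDs (e.g. 00000000000000000005.json), so a
    lexicographic LIST starting after the last-seen key returns
    exactly the new records, in order."""
    start_after = f"{last_seen_id:020d}.json"
    return [k for k in sorted(log_keys)
            if k.endswith(".json") and k > start_after]

keys = [f"{i:020d}.json" for i in range(7)]
new = tail_log(keys, 4)  # commits 5 and 6 are the new ones
```

Against a real store the same effect comes from a paginated LIST that starts at the last-seen key; the consumer then persists the highest ID it has processed so it can resume from that point after a restart.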

 

Data Layout Optimization 

This section introduces two data-layout optimizations:

OPTIMIZE, which compacts small data objects.

Z-Ordering, which fixes the query regression on non-partition keys caused by partitioning on a single key, i.e., it provides partition locality across multiple keys.

Data layout has a large effect on query performance in analytical systems, especially because many analytical queries are highly selective.
Because Delta Lake can update the data structures that represent a table transactionally, it can support a variety of layout optimizations without affecting concurrent operations.
For example, a background process could compact data objects, change the record order within these objects, or even update auxiliary data structures such as data statistics and indexes without impacting other clients. We take advantage of this property to implement a number of data layout optimization features: 

OPTIMIZE Command.
Users can manually run an OPTIMIZE command on a table that compacts small objects without affecting ongoing transactions, and computes any missing statistics.
By default, this operation aims to make each data object 1 GB in size, a value that we found suitable for many workloads, but users can customize this value. 
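The paper does not describe how OPTIMIZE selects which files to group together; the following is a hypothetical greedy bin-packing sketch of one way a planner could batch small files toward the 1 GB default, where each batch would then be rewritten as a single compacted object.

```python
def plan_compaction(file_sizes, target_bytes=1 << 30):
    """Greedily group files into batches of roughly target_bytes
    (1 GB by default, mirroring OPTIMIZE's default object size).
    file_sizes maps object name -> size in bytes."""
    batches, current, current_size = [], [], 0
    for name, size in sorted(file_sizes.items(), key=lambda kv: kv[1]):
        if current and current_size + size > target_bytes:
            batches.append(current)   # batch is full: start a new one
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        batches.append(current)
    return batches

# With a 100-byte target, the three smallest files fit in one batch.
plan = plan_compaction({"a": 10, "b": 20, "c": 60, "d": 70}, target_bytes=100)
```

The transactional commit described above is what lets a background process apply such a plan (remove the inputs, add the compacted outputs) without disturbing concurrent readers.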

Z-Ordering by Multiple Attributes.
Many datasets receive highly selective queries along multiple attributes.
For example, one network security dataset that we worked with stored information about data sent on the network as (sourceIp, destIp, time) tuples, with highly selective queries along each of these dimensions.
A simple directory partitioning scheme, as in Apache Hive [45], can help to partition the data by a few attributes once it is written, but the number of partitions becomes prohibitively large when using multiple attributes. Delta Lake supports reorganizing the records in a table in Z-order [35] along a given set of attributes to achieve high locality along multiple dimensions.
The Z-order curve is an easy-to-compute space-filling curve that creates locality in all of the specified dimensions.
It can lead to significantly better performance for query workloads that combine these dimensions in practice, as we show in Section 6.
Users can set a Z-order specification on a table and then run OPTIMIZE to move a desired subset of the data (e.g., just the newest records) into Z-ordered objects along the selected attributes.
Users can also change the order later.
Z-ordering works hand-in-hand with data statistics to let queries read less data.
In particular, Z-ordering will tend to make each data object contain a small range of the possible values in each of the chosen attributes, so that more data objects can be skipped when running a selective query. 
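As an illustration, here is a minimal Morton-code sketch: interleaving the bits of two integer keys is the standard way to compute a Z-order value (this is the general technique, not Delta Lake's actual implementation). Sorting records by this value clusters both dimensions at once, so the per-object min/max statistics become selective for each column.

```python
def interleave_bits(x, y, bits=16):
    """Z-order (Morton) value of two non-negative integers: take the
    bits of x and y alternately, so nearby (x, y) points get nearby
    codes along the space-filling curve."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)        # bit i of x -> even position
        z |= ((y >> i) & 1) << (2 * i + 1)    # bit i of y -> odd position
    return z

# Sorting by the Z-value keeps both coordinates roughly clustered,
# unlike sorting by x alone, which scatters y across all objects.
records = [(3, 1), (0, 0), (2, 2), (1, 3)]
zordered = sorted(records, key=lambda r: interleave_bits(*r))
```

Real columns (IPs, timestamps) would first be mapped to fixed-width integers (e.g., by range partitioning each column's values) before interleaving.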

 

DISCUSSION AND LIMITATIONS 

Delta Lake's appeal is that it can serve many kinds of workloads at once, avoids deploying yet another system, and stays open in a cloud-native way.

Our experience with Delta Lake shows that ACID transactions can be implemented over cloud object stores for many enterprise data processing workloads,
and that they can support large-scale streaming, batch and interactive workloads.
Delta Lake’s design is especially attractive because it does not require any other heavyweight system to mediate access to cloud storage,
making it trivial to deploy and directly accessible from a wide range of query engines that support Parquet.
Delta Lake’s support for ACID then enables other powerful performance and management features.

Three limitations:

Only single-table transactions are supported.

Latency.

No support for other (secondary) indexes.

In my view, the latter two are not major problems.

Nonetheless, Delta Lake’s design and the current implementation have some limits that are interesting avenues for future work.
First, Delta Lake currently only provides serializable transactions within a single table, because each table has its own transaction log.
Sharing the transaction log across multiple tables would remove this limitation, but might increase contention to append log records via optimistic concurrency.
For very high transaction volumes, a coordinator could also mediate write access to the log without being part of the read and write path for data objects.

Second, for streaming workloads, Delta Lake is limited by the latency of the underlying cloud object store.
For example, it is difficult to achieve millisecond-scale streaming latency using object store operations.
However, we found that for the large-scale enterprise workloads where users wish to run parallel jobs, latency on the order of a few seconds using Delta Lake tables was acceptable.

Third, Delta Lake does not currently support secondary indexes (other than the min-max statistics for each data object), but we have started prototyping a Bloom filter based index.
Delta’s ACID transactions allow us to update such indexes transactionally with changes to the base data. 

 

RELATED WORK 

Multiple research and industry projects have sought to adapt data management systems to a cloud environment.
For example, Brantner et al. explored building an OLTP database system over S3 [20]; bolt-on consistency [19] implements causal consistency on top of eventually consistent key-value stores;
AWS Aurora [49] is a commercial OLTP DBMS with separately scaling compute and storage layers;
and Google BigQuery [29], AWS Redshift Spectrum [39] and Snowflake [23] are OLAP DBMSes that can scale computing clusters separately from storage and can read data from cloud object stores.
Other work, such as the Relational Cloud project [22], considers how to automatically adapt DBMS engines to elastic, multi-tenant workloads. 

Delta Lake's biggest difference is that it has no compute layer, i.e., no frontend server: clients read and write the table directory on cloud storage directly, which greatly improves availability and scalability.

From a performance standpoint, though, isn't a frontend still needed? Otherwise every read and write would have to rebuild the metadata.

Delta Lake shares these works’ vision of leveraging widely available cloud infrastructure, but targets a different set of requirements.
Specifically, most previous DBMS-on-cloud-storage systems require the DBMS to mediate interactions between clients and storage (e.g., by having clients connect to an Aurora or Redshift frontend server).
This creates an additional operational burden (frontend nodes have to always be running), as well as possible scalability, availability or cost issues when streaming large amounts of data through the frontend nodes.
In contrast, we designed Delta Lake so that many, independently running clients could coordinate access to a table directly through cloud object store operations,
without a separately running service in most cases (except for a lightweight coordinator for log record IDs on S3, as described in §3.2.2).
This design makes Delta Lake operationally simple for users and ensures highly scalable reads and writes at the same cost as the underlying object store.
Moreover, the system is as highly available as the underlying cloud object store: no other components need to be hardened or restarted for disaster recovery.
Of course, this design is feasible here due to the nature of the workload that Delta Lake targets:
an OLAP workload with relatively few write transactions per second, but large transaction sizes, which works well with our optimistic concurrency approach.

Compared with Hudi and Iceberg, the differences are mainly that Delta Lake has more features and better ecosystem support; in other words, the three are broadly similar, since the core mechanism itself is not complex to implement.

The closest systems to Delta Lake’s design and goals are Apache Hudi [8] and Apache Iceberg [10], both of which define data formats and access protocols to implement transactional operations on cloud object stores. These systems were developed concurrently with Delta Lake and do not provide all its features.
For example, neither system provides data layout optimizations such as Delta Lake’s ZORDER BY (§4.4), a streaming input source that applications can use to efficiently scan new records added to a table (§4.3), or support for local caching as in the Databricks service (§4.5).
In addition, Apache Hudi only supports one writer at a time (but multiple readers) [9].
Both projects offer connectors to open source engines including Spark and Presto, but lack connectors to commercial data warehouses such as Redshift and Snowflake, which we implemented using manifest files (§4.8), and to commercial ETL tools. 

Apache Hive ACID [32] also implements transactions over object stores or distributed file systems, but it relies on the Hive metastore (running in an OLTP DBMS) to track the state of each table.
This can create a bottleneck in tables with millions of partitions, and increases users’ operational burden.
Hive ACID also lacks support for time travel (§4.1).
Low-latency stores over HDFS, such as HBase [7] and Kudu [6], can also combine small writes before writing to HDFS, but require running a separate distributed system.

Comparing with C-Store or other HTAP systems is not really apples to apples: one is a warehouse, the other a transactional database.

There is a long line of work to combine high-performance transactional and analytical processing, exemplified by C-Store [43] and HTAP systems.
These systems usually have a separate writable store optimized for OLTP and a long-term store optimized for analytics.
In our work, we sought instead to support a modest transaction rate without running a separate highly available write store by designing the concurrency protocol to go directly against object stores.