初創電商公司Drop的資料湖實踐
阿新 • • 發佈:2020-03-20
## 歡迎關注微信公眾號:ApacheHudi
## 1. 引入
Drop是一個智慧的獎勵平臺,旨在通過獎勵會員在他們喜愛的品牌購物時獲得的Drop積分來提升會員的生活,同時幫助他們發現與他們生活方式產生共鳴的新品牌。實現這一體驗的核心是Drop致力於在整個公司內推廣以資料為基礎的文化,Drop的資料用於多種形式,包括但不限於商業智慧、測量實驗和構建機器學習模型。
為了確保有效地利用資料,工程團隊一直在尋找可以改善基礎架構以適應當前和未來的需求的方法,與許多其他高成長型初創公司的經驗類似,我們對資料的需求規模超過了基礎架構的能力,因此需要將以商業智慧為中心的資料基礎架構演變為可以釋放大資料需求和能力的基礎架構。
## 2. 動機
Drop在成立之初就著力構建資料基礎架構,以便利用自動報告和儀表板來觀察關鍵業務的指標。我們的第一代資料基礎架構使用以[AWS Redshift](https://aws.amazon.com/redshift/)資料倉庫為中心的架構,使用[Apache Airflow](https://airflow.apache.org/)排程自定義批處理ETL作業,使用[Looker](https://looker.com/)構建自動化儀表板和報告。
![](https://img2020.cnblogs.com/blog/1875937/202003/1875937-20200319231950625-927112597.png)
該架構實現了我們的早期目標,但是利用資料的需求已經超出了商業智慧的範圍。為向會員提供更個性化的體驗,我們需要一個支援高階功能的資料平臺,例如更深入的分析、全面的實驗測量以及機器學習模型的開發,我們也意識到原來架構的技術限制可能會阻止我們解鎖那些所需的功能,一些最突出的限制包括:
* 隨著資料規模的擴大,關鍵的ETL作業的執行時間猛增,一些作業無法在24小時內執行。
* AWS Redshift將計算與儲存混合在一起,限制了提取資料到倉庫的擴充套件能力,在Redshift中提供的資料選擇非常有限,從而偏離了其作為“中心”的位置。
* 在Redshift叢集之上構建機器學習架構效果並不理想,因為Redshift僅能處理結構化資料,並且用於填充倉庫的批處理ETL作業延遲較高。
為解決這些不足並考慮到後續的擴充套件性,我們需要繼續構建資料基礎架構。[資料湖](https://en.wikipedia.org/wiki/Data_lake)是一個集中式儲存庫,能夠儲存任何規模的非結構化和結構化資料,構建資料湖將使我們能夠解決第一代資料基礎架構的侷限性,同時允許我們保留原始架構的關鍵元件,這些元件仍然滿足後續的長期資料基礎架構計劃。
## 3. 構建資料湖
當開始構建Drop的資料湖時,我們遵循以下指導原則:
* **技術堆疊保持簡單**:作為Drop Engineering的核心理念之一,我們旨在利用現有的和經過驗證的AWS技術來簡化我們的技術棧,已有AWS經驗能夠讓我們快速製作新功能原型以及利用AWS生態系統中其他服務的整合優勢,因此繼續使用AWS技術意義重大。我們在整個資料團隊中繼續保留Python和SQL,因此無需學習用於資料提取或處理的新語言,而是使用PySpark來構建所有Apache Spark作業。
* **避免重複造輪子**:在開源和技術社群中尋找成功的資料湖實施案例和經驗,並向[Uber](https://eng.uber.com/uber-big-data-platform/)和[Airbnb](https://medium.com/airbnb-engineering/data-infrastructure-at-airbnb-8adfb34f169c)尋求有關頂層設計的思路和靈感,有關更多特定技術的知識,我們依賴於已釋出的AWS re:Invent的[演講](https://www.youtube.com/channel/UCd6MoB9NC6uYN2grvUNT-Zg),通過參加2018年的AWS re:Invent並聽取Robinhood關於他們如何實現資料湖的[故事](https://youtu.be/rlobQYMb7zY),一切都得以實現,Robinhood的故事與我們技術棧非常相似,對要解決的問題以及解決問題的理念產生了深深的共鳴,所有這些故事和資源組成了我們資料湖的最初藍圖。
* **確保可擴充套件性**:隨著我們的組織從早期的初創企業過渡到高速增長的企業,資料的規模以及利用資料的複雜性迅速增長,這意味著需要在指數增長方案中有效執行的技術,同時最小化相關技術的複雜性開銷。
## 4. 總體架構
![](https://img2020.cnblogs.com/blog/1875937/202003/1875937-20200319232008159-1678022864.png)
## 5. 資料湖
### 5.1 排程層
排程層負責管理和執行資料湖中所有工作流程,排程程式會協調資料湖中所有資料的大部分移動,利用Airflow使我們能夠通過Airflow有向無環圖(DAG)構建所有工作流和基礎架構,這也簡化了我們的工程開發和部署流程,同時為我們的資料基礎架構提供了版本控制,Apache Airflow專案還包含大量支援AWS的整合庫和示例DAG,使我們能夠快速整合和評估新技術。
### 5.2 攝入層
當從多個不同的資料來源(例如伺服器日誌,增長營銷平臺和業務運營服務)拉取資料時,資料主要是從RDS Postgres資料庫,以批處理和流處理兩種形式提取到資料湖。我們選擇[AWS Database Migration Services(DMS)](https://aws.amazon.com/dms/)來攝取這些表單,因為它與RDS Postgres進行了原生整合,並且能夠以批和流形式將資料提取到S3中,DMS複製Postgres“預寫日誌(WAL)”的資料,以“變更資料捕獲(CDC)”任務進入資料湖,並在S3上生成一分鐘分割槽的[Apache Parquet](https://en.wikipedia.org/wiki/Apache_Parquet)檔案,此為流處理方式。批處理使用DMS的“全量遷移”任務以將Postgres庫表的快照匯入S3資料湖,批處理管道由排程層通過Airflow以自定義DAG的形式進行管理,這允許我們控制批快照到S3的排程,並在批執行結束時關閉未使用的DMS資源以降低服務成本。
![](https://img2020.cnblogs.com/blog/1875937/202003/1875937-20200319232026575-706840674.png)
### 5.3 處理層
處理層負責將資料從儲存層的“原始(Raw)”部分轉換為資料湖的“列式(Columnarized)”部分中的標準化列式和分割槽結構。我們使用[Lambda架構](https://en.wikipedia.org/wiki/Lambda_architecture)來協調給定資料來源的批和流資料,這種資料處理模型使我們能夠將高質量的批量快照與一分鐘延遲流檔案相結合來生成給定資料集的最新列式版本。我們將[AWS Glue](https://aws.amazon.com/glue/)及其Data Catalog(資料目錄)用作資料湖的中央metastore管理服務,metastore包含每個資料集的元資料,例如在S3中的位置、結構定義和整體大小,也可以使用[AWS Glue Crawlers](https://docs.aws.amazon.com/glue/latest/dg/add-crawler.html)捕獲和更新該元資料,整個流程如下:
* 排程層啟動處理特定資料集Lambda架構的DAG。
* DAG將PySpark應用程式載入到S3,啟動AWS EMR叢集,並在EMR中執行PySpark應用程式。
* 來自儲存層“原始”部分的批和流資料均作為EMR Spark應用程式的輸入,最終輸出是儲存層“列式”部分中使用Lambda架構進行協調的Parquet資料集。
* 完成EMR步驟後終止EMR叢集以便降低EMR成本。
* 執行AWS Glue Crawler程式以更新AWS Glue目錄中表的元資料,該目錄元資料充當整個資料湖的中央metastore。
![](https://img2020.cnblogs.com/blog/1875937/202003/1875937-20200319232041253-1410956671.png)
### 5.4 儲存層
[S3](https://aws.amazon.com/s3/)作為儲存平臺的原因有以下三個:易用性、高可靠性和相對低成本。資料儲存在儲存層中的“原始”和“列式”兩部分都基於S3,而“數倉(Warehouse)”中的資料位於Redshift叢集中。資料湖中的所有資料首先通過攝入層以各種格式進入“Raw”部分,為了使資料儲存到“列式”部分中,資料會通過處理層轉換為“列式”部分,並遵守分割槽標準。我們選擇Apache Parquet作為標準列式檔案格式,因為Apache Parquet廣泛用於資料處理服務中,並且具有效能和儲存優勢。儲存層的“數倉”部分由Redshift資料倉庫組成,其資料來自“原始”和“列式”部分。還可以通過資料“熱度”對儲存層的各部分進行分類,類比於資料的訪問頻率,“原始”資料資料最冷,“列式”資料比“原始”資料更熱,而“數倉”資料是最熱的資料,因為它經常會被自動化儀表板和報告訪問。S3的另一個優勢是我們能夠根據資料規模控制成本,對於較冷的資料,我們可以更變S3儲存型別或者將資料遷移到[AWS Glacier](https://aws.amazon.com/glacier/)以降低成本。
### 5.5 從資料湖訪問資料
技術團隊和非技術團隊都會使用資料湖,以便更好地為決策提供依據並增強整體產品體驗,非技術團隊通常通過商業智慧平臺Looker來訪問資料,以自動化儀表板和報告的形式監控各項指標。
技術團隊應用範圍更廣,從探索性分析中的即席查詢到開發機器學習模型和管道。可以通過[AWS Athena](https://aws.amazon.com/athena/)託管的[Presto](https://prestodb.io/)服務或通過Redshift資料倉庫直接查詢資料湖中的資料。AWS Athena還與AWS Glue的資料目錄整合,這使Athena可以完全訪問我們的中央metastore,這種原生整合使得Athena可以充當S3中儲存資料的主要查詢服務。我們還能夠利用[Redshift Spectrum](https://docs.aws.amazon.com/redshift/latest/dg/c-getting-started-using-spectrum.html)查詢S3資料,這在即席查詢場景下特別有用,我們也希望使用S3的資料來豐富Redshift叢集的資料。為了對機器學習模型進行更正式的探索性分析和開發,需要結合使用Spark和Glue目錄來查詢或直接與S3中的物件進行互動。
### 5.6 檢查與監控
確保資料湖的消費者完全信任資料的準確性至關重要,我們建立了一套全面的檢查和監控程式以提醒我們任何意外情況。我們通過Airflow DAG構建的絕大多數資料湖流程,並且能夠使用Datadog監視Airflow作業失敗,並通過[PagerDuty](https://www.pagerduty.com/platform/)將關鍵問題轉發給工程師。為了建立資料質量檢查,我們還構建了一系列Airflow DAG,以生成特定資料集的自定義資料驗證指標,我們在給定資料集上執行的一組標準的驗證檢查和指標包括但不限於空檢查、行計數檢查和資料延遲指標,這些指標都會被推送到Datadog,這樣便可以在其中監視異常並在必要時向參與者發出告警。
![](https://img2020.cnblogs.com/blog/1875937/202003/1875937-20200319232055004-1603964306.png)
## 6. 當前情況
Drop的資料湖現在每天從Postgres資料庫中提取十億條記錄,並處理TB級的作業,該資料湖已被眾多團隊採用,併為增強了資料分析能力以及開發和交付機器學習模型的能力。總體而言,該架構效果不錯,並且我們相信資料基礎架構的發展將使我們處於更好的位置,以適應當前和未來的需求。
## 7. 重點總結
我們的實施過程踩過不少坑,總結如下:
* **提取列式的資料的好處**:儘管儲存層的“原始”部分與檔案格式無關,但是以列式提取資料對下游更有利,當從Postgres表中攝取資料時,這些表包含冗長和複雜文字Blob的列(例如jsonb列),下游解析資料就成了噩夢,進一步依賴[serDe](https://cwiki.apache.org/confluence/display/Hive/SerDe)檔案來協助解析資料只會增加整體複雜性,由於Parquet的固有特性,將DMS配置為以Parquet格式輸出檔案可保持列的架構完整性,並且我們的下游處理層作業也可提升速度和資料質量效能,這種折衷是以增加DMS生成parquet檔案所需的記憶體資源為代價的。
* **技術棧使用Spark**:我們的工程團隊以前在內部Hadoop方面的經驗非常有限,因此採用Spark帶來了很多麻煩。儘管有大量可用的Spark資源,但我們最大的痛點還是資源管理和錯誤排除,我們選擇EMR而不是AWS Gule構建ETL作業,這會提交已利用資源的控制級別,提升了效能和節約成本節約,但代價是複雜性的提升。為每個Spark作業配置最佳EMR資源非常複雜,我們很快了解到這些優化工作需要浪費高昂的時間。當EMR作業失敗時,我們也很難解決根源問題,在很多情況下,EMR群集生成的錯誤日誌不夠細緻,並且有時也掩蓋了重要細節。通過改進日誌記錄過程並直接引用Spark執行程式日誌,我們能夠發現更詳細的資訊,從而更快發現根本原因。
* **關閉空閒資源**:我們可以將節省的大部分費用歸功於我們在處理臨時工作負載方面的經驗,通過將幾乎所有的資料湖操作都構造為Airflow DAG,我們可以自動化何時刪除未使用的資源,我們只有在需要通過Airflow進行批量提取作業時才啟動DMS複製例項的能力,這使我們節省了始終保持例項可用的等效成本的90%以上。類似地,當我們的Airflow排程處理層作業時,我們僅使用競價型例項啟動EMR叢集,並在完成時終止叢集,這與我們始終擁有可用節點相比,平均節省了70%以上。
## 8. 下一步計劃
我們也在尋找改善資料基礎架構的方法,並且已經開始制定下一步計劃,包括:
* 通過諸如Apache Hudi或Delta Lake之類的技術改善資料可用性以及儲存層中的版本控制管理。
* 通過Apache Kafka和Debezium進行事件驅動開發來調整我們的攝入層功能。
* 使用AWS Lake Formation等工具改善資料訪問治理,這樣可以對團隊訪問哪些資料進行嚴格