美團點評基於 Flink 的實時數倉建設實踐
引言
近些年,企業對資料服務實時化服務的需求日益增多。本文整理了常見實時資料元件的效能特點和適用場景,介紹了美團如何通過 Flink 引擎構建實時資料倉庫,從而提供高效、穩健的實時資料服務。此前我們美團技術部落格釋出過一篇文章《流計算框架 Flink 與 Storm 的效能對比》,對 Flink 和 Storm 倆個引擎的計算效能進行了比較。本文主要闡述使用 Flink 在實際資料生產上的經驗。
實時平臺初期架構
在實時資料系統建設初期,由於對實時資料的需求較少,形成不了完整的資料體系。我們採用的是“一路到底”的開發模式:通過在實時計算平臺上部署 Storm 作業處理實時資料佇列來提取資料指標,直接推送到實時應用服務中。
但是,隨著產品和業務人員對實時資料需求的不斷增多,新的挑戰也隨之發生。
- 資料指標越來越多,“煙囪式”的開發導致程式碼耦合問題嚴重。
- 需求越來越多,有的需要明細資料,有的需要 OLAP 分析。單一的開發模式難以應付多種需求。
- 缺少完善的監控系統,無法在對業務產生影響之前發現並修復問題。
實時資料倉庫的構建
為解決以上問題,我們根據生產離線資料的經驗,選擇使用分層設計方案來建設實時資料倉庫,其分層架構如下圖所示:
該方案由以下四層構成:
- ODS 層:Binlog 和流量日誌以及各業務實時佇列。
- 資料明細層:業務領域整合提取事實資料,離線全量和實時變化資料構建實時維度資料。
- 資料彙總層:使用寬表模型對明細資料補充維度資料,對共性指標進行彙總。
- App 層:為了具體需求而構建的應用層,通過 RPC 框架對外提供服務。
通過多層設計我們可以將處理資料的流程沉澱在各層完成。比如在資料明細層統一完成資料的過濾、清洗、規範、脫敏流程;在資料彙總層加工共性的多維指標彙總資料。提高了程式碼的複用率和整體生產效率。同時各層級處理的任務型別相似,可以採用統一的技術方案優化效能,使數倉技術架構更簡潔。
技術選型
1.儲存引擎的調研
實時數倉在設計中不同於離線數倉在各層級使用同種儲存方案,比如都儲存在 Hive 、DB 中的策略。首先對中間過程的表,採用將結構化的資料通過訊息佇列儲存和高速 KV 儲存混合的方案。實時計算引擎可以通過監聽訊息消費訊息佇列內的資料,進行實時計算。而在高速 KV 儲存上的資料則可以用於快速關聯計算,比如維度資料。 其次在應用層上,針對資料使用特點配置儲存方案直接寫入。避免了離線數倉應用層同步資料流程帶來的處理延遲。 為了解決不同型別的實時資料需求,合理的設計各層級儲存方案,我們調研了美團內部使用比較廣泛的幾種儲存方案。
方案 | 優勢 | 劣勢 |
---|---|---|
MySQL | 1. 具有完備的事務功能,可以對資料進行更新。2. 支援 SQL,開發成本低。 | 1. 橫向擴充套件成本大,儲存容易成為瓶頸; 2. 實時資料的更新和查詢頻率都很高,線上單個實時應用請求就有 1000+ QPS;使用 MySQL 成本太高。 |
Elasticsearch | 1. 吞吐量大,單個機器可以支援 2500+ QPS,並且叢集可以快速橫向擴充套件。2. Term 查詢時響應速度很快,單個機器在 2000+ QPS時,查詢延遲在 20 ms以內。 | 1. 沒有原生的 SQL 支援,查詢 DSL 有一定的學習門檻;2. 進行聚合運算時效能下降明顯。 |
Druid | 1. 支援超大資料量,通過 Kafka 獲取實時資料時,單個作業可支援 6W+ QPS;2. 可以在資料匯入時通過預計算對資料進行彙總,減少的資料儲存。提高了實際處理資料的效率;3. 有很多開源 OLAP 分析框架。實現如 Superset。 | 1. 預聚合導致無法支援明細的查詢;2. 無法支援 Join 操作;3. Append-only 不支援資料的修改。只能以 Segment 為單位進行替換。 |
Cellar | 1. 支援超大資料量,採用記憶體加分散式儲存的架構,儲存價效比很高;2. 吞吐效能好,經測試處理 3W+ QPS 讀寫請求時,平均延遲在 1ms左右;通過非同步讀寫線上最高支援 10W+ QPS。 | 1. 介面僅支援 KV,Map,List 以及原子加減等;2. 單個 Key 值不得超過 1KB ,而 Value 的值超過 100KB 時則效能下降明顯。 |
根據不同業務場景,實時數倉各個模型層次使用的儲存方案大致如下:
- 資料明細層 對於維度資料部分場景下關聯的頻率可達 10w+ TPS,我們選擇 Cellar(美團內部儲存系統) 作為儲存,封裝維度服務為實時數倉提供維度資料。
- 資料彙總層 對於通用的彙總指標,需要進行歷史資料關聯的資料,採用和維度資料一樣的方案通過 Cellar 作為儲存,用服務的方式進行關聯操作。
- 資料應用層 應用層設計相對複雜,再對比了幾種不同儲存方案後。我們制定了以資料讀寫頻率 1000 QPS 為分界的判斷依據。對於讀寫平均頻率高於 1000 QPS 但查詢不太複雜的實時應用,比如商戶實時的經營資料。採用 Cellar 為儲存,提供實時資料服務。對於一些查詢複雜的和需要明細列表的應用,使用 Elasticsearch 作為儲存則更為合適。而一些查詢頻率低,比如一些內部運營的資料。 Druid 通過實時處理訊息構建索引,並通過預聚合可以快速的提供實時資料 OLAP 分析功能。對於一些歷史版本的資料產品進行實時化改造時,也可以使用 MySQL 儲存便於產品迭代。
2.計算引擎的調研
在實時平臺建設初期我們使用 Storm 引擎來進行實時資料處理。Storm 引擎雖然在靈活性和效能上都表現不錯。但是由於 API 過於底層,在資料開發過程中需要對一些常用的資料操作進行功能實現。比如表關聯、聚合等,產生了很多額外的開發工作,不僅引入了很多外部依賴比如快取,而且實際使用時效能也不是很理想。同時 Storm 內的資料物件 Tuple 支援的功能也很簡單,通常需要將其轉換為 Java 物件來處理。對於這種基於程式碼定義的資料模型,通常我們只能通過文件來進行維護。不僅需要額外的維護工作,同時在增改欄位時也很麻煩。綜合來看使用 Storm 引擎構建實時數倉難度較大。我們需要一個新的實時處理方案,要能夠實現:
- 提供高階 API,支援常見的資料操作比如關聯聚合,最好是能支援 SQL。
- 具有狀態管理和自動支援久化方案,減少對儲存的依賴。
- 便於接入元資料服務,避免通過程式碼管理資料結構。
- 處理效能至少要和 Storm 一致。
我們對主要的實時計算引擎進行了技術調研。總結了各類引擎特性如下表所示:
專案/引擎 | Storm | Flink | spark-treaming |
---|---|---|---|
API | 靈活的底層 API 和具有事務保證的 Trident API | 流 API 和更加適合資料開發的 Table API 和 Flink SQL 支援 | 流 API 和 Structured-Streaming API 同時也可以使用更適合資料開發的 Spark SQL |
容錯機制 | ACK 機制 | State 分散式快照儲存點 | RDD 儲存點 |
狀態管理 | Trident State狀態管理 | Key State 和 Operator State兩種 State 可以使用,支援多種持久化方案 | 有 UpdateStateByKey 等 API 進行帶狀態的變更,支援多種持久化方案 |
處理模式 | 單條流式處理 | 單條流式處理 | Mic batch處理 |
延遲 | 毫秒級 | 毫秒級 | 秒級 |
語義保障 | At Least Once,Exactly Once | Exactly Once,At Least Once | At Least Once |
從調研結果來看,Flink 和 Spark Streaming 的 API 、容錯機制與狀態持久化機制都可以解決一部分我們目前使用 Storm 中遇到的問題。但 Flink 在資料延遲上和 Storm 更接近,對現有應用影響最小。而且在公司內部的測試中 Flink 的吞吐效能對比 Storm 有十倍左右提升。綜合考量我們選定 Flink 引擎作為實時數倉的開發引擎。
更加引起我們注意的是,Flink 的 Table 抽象和 SQL 支援。雖然使用 Strom 引擎也可以處理結構化資料。但畢竟依舊是基於訊息的處理 API ,在程式碼層層面上不能完全享受操作結構化資料的便利。而 Flink 不僅支援了大量常用的 SQL 語句,基本覆蓋了我們的開發場景。而且 Flink 的 Table 可以通過 TableSchema 進行管理,支援豐富的資料型別和資料結構以及資料來源。可以很容易的和現有的元資料管理系統或配置管理系統結合。通過下圖我們可以清晰的看出 Storm 和 Flink 在開發統過程中的區別。
在使用 Storm 開發時處理邏輯與實現需要固化在 Bolt 的程式碼。Flink 則可以通過 SQL 進行開發,程式碼可讀性更高,邏輯的實現由開源框架來保證可靠高效,對特定場景的優化只要修改 Flink SQL 優化器功能實現即可,而不影響邏輯程式碼。使我們可以把更多的精力放到到資料開發中,而不是邏輯的實現。當需要離線資料和實時資料口徑統一的場景時,我們只需對離線口徑的 SQL 指令碼稍加改造即可,極大地提高了開發效率。同時對比圖中 Flink 和 Storm 使用的資料模型,Storm 需要通過一個 Java 的 Class 去定義資料結構,Flink Table 則可以通過元資料來定義。可以很好的和資料開發中的元資料,資料治理等系統結合,提高開發效率。
Flink使用心得
在利用 Flink-Table 構建實時資料倉庫過程中。我們針對一些構建資料倉庫的常用操作,比如資料指標的維度擴充,資料按主題關聯,以及資料的聚合運算通過 Flink 來實現總結了一些使用心得。
1.維度擴充
資料指標的維度擴充,我們採用的是通過維度服務獲取維度資訊。雖然基於 Cellar 的維度服務通常的響應延遲可以在 1ms 以下。但是為了進一步優化 Flink 的吞吐,我們對維度資料的關聯全部採用了非同步介面訪問的方式,避免了使用 RPC 呼叫影響資料吞吐。 對於一些資料量很大的流,比如流量日誌資料量在 10W 條/秒這個量級。在關聯 UDF 的時候內建了快取機制,可以根據命中率和時間對快取進行淘汰,配合用關聯的 Key 值進行分割槽,顯著減少了對外部服務的請求次數,有效的減少了處理延遲和對外部系統的壓力。
2.資料關聯
資料主題合併,本質上就是多個數據源的關聯,簡單的來說就是 Join 操作。Flink 的 Table 是建立在無限流這個概念上的。在進行 Join 操作時並不能像離線資料一樣對兩個完整的表進行關聯。採用的是在視窗時間內對資料進行關聯的方案,相當於從兩個資料流中各自擷取一段時間的資料進行 Join 操作。有點類似於離線資料通過限制分割槽來進行關聯。同時需要注意 Flink 關聯表時必須有至少一個“等於”關聯條件,因為等號兩邊的值會用來分組。 由於 Flink 會快取視窗內的全部資料來進行關聯,快取的資料量和關聯的視窗大小成正比。因此 Flink 的關聯查詢,更適合處理一些可以通過業務規則限制關聯資料時間範圍的場景。比如關聯下單使用者購買之前 30 分鐘內的瀏覽日誌。過大的視窗不僅會消耗更多的記憶體,同時會產生更大的 Checkpoint ,導致吞吐下降或 Checkpoint 超時。在實際生產中可以使用 RocksDB 和啟用增量儲存點模式,減少 Checkpoint 過程對吞吐產生影響。對於一些需要關聯視窗期很長的場景,比如關聯的資料可能是幾天以前的資料。對於這些歷史資料,我們可以將其理解為是一種已經固定不變的"維度"。可以將需要被關聯的歷史資料採用和維度資料一致的處理方法:"快取 + 離線"資料方式儲存,用介面的方式進行關聯。另外需要注意 Flink 對多表關聯是直接順序連結的,因此需要注意先進行結果集小的關聯。
3.聚合運算
使用聚合運算時,Flink 對常見的聚合運算如求和、極值、均值等都有支援。美中不足的是對於 Distinct 的支援,Flink-1.6 之前的採用的方案是通過先對去重欄位進行分組再聚合實現。對於需要對多個欄位去重聚合的場景,只能分別計算再進行關聯處理效率很低。為此我們開發了自定義的 UDAF,實現了 MapView 精確去重、BloomFilter 非精確去重、 HyperLogLog 超低記憶體去重方案應對各種實時去重場景。但是在使用自定義的 UDAF 時,需要注意 RocksDBStateBackend 模式對於較大的 Key 進行更新操作時序列化和反序列化耗時很多。可以考慮使用 FsStateBackend 模式替代。另外要注意的一點 Flink 框架在計算比如 Rank 這樣的分析函式時,需要快取每個分組視窗下的全部資料才能進行排序,會消耗大量記憶體。建議在這種場景下優先轉換為 TopN 的邏輯,看是否可以解決需求。
下圖展示一個完整的使用 Flink 引擎生產一張實時資料表的過程:
實時數倉成果
通過使用實時數倉代替原有流程,我們將資料生產中的各個流程抽象到實時數倉的各層當中。實現了全部實時資料應用的資料來源統一,保證了應用資料指標、維度的口徑的一致。在幾次資料口徑發生修改的場景中,我們通過對倉庫明細和彙總進行改造,在完全不用修改應用程式碼的情況下就完成全部應用的口徑切換。在開發過程中通過嚴格的把控資料分層、主題域劃分、內容組織標準規範和命名規則。使資料開發的鏈路更為清晰,減少了程式碼的耦合。再配合上使用 Flink SQL 進行開發,程式碼加簡潔。單個作業的程式碼量從平均 300+ 行的 JAVA 程式碼 ,縮減到幾十行的 SQL 指令碼。專案的開發時長也大幅減短,一人日開發多個實時資料指標情況也不少見。
除此以外我們通過針對數倉各層級工作內容的不同特點,可以進行鍼對性的效能優化和引數配置。比如 ODS 層主要進行資料的解析、過濾等操作,不需要 RPC 呼叫和聚合運算。 我們針對資料解析過程進行優化,減少不必要的 JSON 欄位解析,並使用更高效的 JSON 包。在資源分配上,單個 CPU 只配置 1GB 的記憶體即可滿需求。而彙總層主要則主要進行聚合與關聯運算,可以通過優化聚合演算法、內外存共同運算來提高效能、減少成本。資源配置上也會分配更多的記憶體,避免記憶體溢位。通過這些優化手段,雖然相比原有流程實時數倉的生產鏈路更長,但資料延遲並沒有明顯增加。同時實時資料應用所使用的計算資源也有明顯減少。
展望
我們的目標是將實時倉庫建設成可以和離線倉庫資料準確性,一致性媲美的資料系統。為商家,業務人員以及美團使用者提供及時可靠的資料服務。同時作為到餐實時資料的統一出口,為集團其他業務部門助力。未來我們將更加關注在資料可靠性和實時資料指標管理。建立完善的資料監控,資料血緣檢測,交叉檢查機制。及時對異常資料或資料延遲進行監控和預警。同時優化開發流程,降低開發實時資料學習成本。讓更多有實時資料需求的人,可以自己動手解決問題。
參考文獻
關於作者
偉倫,美團到店餐飲技術部實時資料負責人,2017年加入美團,長期從事資料平臺、實時資料計算、資料架構方面的開發工作。在使用 Flink 進行實時資料生產和提高生產效率上,有一些心得和產出。同時也積極推廣 Flink 在實時資料處理中的實戰經驗。
招聘資訊
對資料工程和將資料通過服務業務釋放價值感興趣的同學,可以傳送簡歷到 [email protected]。我們在實時資料倉庫、實時資料治理、實時資料產品開發框架、面向銷售和商家側的資料型創新產品層面,都有很多未知但有意義的領域等你來開拓。