1. 程式人生 > >美團點評基於 Flink 的實時數倉建設實踐

美團點評基於 Flink 的實時數倉建設實踐

引言

近些年,企業對資料服務實時化服務的需求日益增多。本文整理了常見實時資料元件的效能特點和適用場景,介紹了美團如何通過 Flink 引擎構建實時資料倉庫,從而提供高效、穩健的實時資料服務。此前我們美團技術部落格釋出過一篇文章《流計算框架 Flink 與 Storm 的效能對比》,對 Flink 和 Storm 倆個引擎的計算效能進行了比較。本文主要闡述使用 Flink 在實際資料生產上的經驗。

實時平臺初期架構

在實時資料系統建設初期,由於對實時資料的需求較少,形成不了完整的資料體系。我們採用的是“一路到底”的開發模式:通過在實時計算平臺上部署 Storm 作業處理實時資料佇列來提取資料指標,直接推送到實時應用服務中。

圖1 初期實時資料架構

但是,隨著產品和業務人員對實時資料需求的不斷增多,新的挑戰也隨之發生。

  1. 資料指標越來越多,“煙囪式”的開發導致程式碼耦合問題嚴重。
  2. 需求越來越多,有的需要明細資料,有的需要 OLAP 分析。單一的開發模式難以應付多種需求。
  3. 缺少完善的監控系統,無法在對業務產生影響之前發現並修復問題。

實時資料倉庫的構建

為解決以上問題,我們根據生產離線資料的經驗,選擇使用分層設計方案來建設實時資料倉庫,其分層架構如下圖所示:

圖2 實時數倉資料分層架構

該方案由以下四層構成:

  1. ODS 層:Binlog 和流量日誌以及各業務實時佇列。
  2. 資料明細層:業務領域整合提取事實資料,離線全量和實時變化資料構建實時維度資料。
  3. 資料彙總層:使用寬表模型對明細資料補充維度資料,對共性指標進行彙總。
  4. App 層:為了具體需求而構建的應用層,通過 RPC 框架對外提供服務。

通過多層設計我們可以將處理資料的流程沉澱在各層完成。比如在資料明細層統一完成資料的過濾、清洗、規範、脫敏流程;在資料彙總層加工共性的多維指標彙總資料。提高了程式碼的複用率和整體生產效率。同時各層級處理的任務型別相似,可以採用統一的技術方案優化效能,使數倉技術架構更簡潔。

技術選型

1.儲存引擎的調研

實時數倉在設計中不同於離線數倉在各層級使用同種儲存方案,比如都儲存在 Hive 、DB 中的策略。首先對中間過程的表,採用將結構化的資料通過訊息佇列儲存和高速 KV 儲存混合的方案。實時計算引擎可以通過監聽訊息消費訊息佇列內的資料,進行實時計算。而在高速 KV 儲存上的資料則可以用於快速關聯計算,比如維度資料。 其次在應用層上,針對資料使用特點配置儲存方案直接寫入。避免了離線數倉應用層同步資料流程帶來的處理延遲。 為了解決不同型別的實時資料需求,合理的設計各層級儲存方案,我們調研了美團內部使用比較廣泛的幾種儲存方案。

表1 儲存方案列表
方案 優勢 劣勢
MySQL 1. 具有完備的事務功能,可以對資料進行更新。2. 支援 SQL,開發成本低。 1. 橫向擴充套件成本大,儲存容易成為瓶頸; 2. 實時資料的更新和查詢頻率都很高,線上單個實時應用請求就有 1000+ QPS;使用 MySQL 成本太高。
Elasticsearch 1. 吞吐量大,單個機器可以支援 2500+ QPS,並且叢集可以快速橫向擴充套件。2. Term 查詢時響應速度很快,單個機器在 2000+ QPS時,查詢延遲在 20 ms以內。 1. 沒有原生的 SQL 支援,查詢 DSL 有一定的學習門檻;2. 進行聚合運算時效能下降明顯。
Druid 1. 支援超大資料量,通過 Kafka 獲取實時資料時,單個作業可支援 6W+ QPS;2. 可以在資料匯入時通過預計算對資料進行彙總,減少的資料儲存。提高了實際處理資料的效率;3. 有很多開源 OLAP 分析框架。實現如 Superset。 1. 預聚合導致無法支援明細的查詢;2. 無法支援 Join 操作;3. Append-only 不支援資料的修改。只能以 Segment 為單位進行替換。
Cellar 1. 支援超大資料量,採用記憶體加分散式儲存的架構,儲存價效比很高;2. 吞吐效能好,經測試處理 3W+ QPS 讀寫請求時,平均延遲在 1ms左右;通過非同步讀寫線上最高支援 10W+ QPS。 1. 介面僅支援 KV,Map,List 以及原子加減等;2. 單個 Key 值不得超過 1KB ,而 Value 的值超過 100KB 時則效能下降明顯。

根據不同業務場景,實時數倉各個模型層次使用的儲存方案大致如下:

圖3 實時數倉儲存分層架構
  1. 資料明細層 對於維度資料部分場景下關聯的頻率可達 10w+ TPS,我們選擇 Cellar(美團內部儲存系統) 作為儲存,封裝維度服務為實時數倉提供維度資料。
  2. 資料彙總層 對於通用的彙總指標,需要進行歷史資料關聯的資料,採用和維度資料一樣的方案通過 Cellar 作為儲存,用服務的方式進行關聯操作。
  3. 資料應用層 應用層設計相對複雜,再對比了幾種不同儲存方案後。我們制定了以資料讀寫頻率 1000 QPS 為分界的判斷依據。對於讀寫平均頻率高於 1000 QPS 但查詢不太複雜的實時應用,比如商戶實時的經營資料。採用 Cellar 為儲存,提供實時資料服務。對於一些查詢複雜的和需要明細列表的應用,使用 Elasticsearch 作為儲存則更為合適。而一些查詢頻率低,比如一些內部運營的資料。 Druid 通過實時處理訊息構建索引,並通過預聚合可以快速的提供實時資料 OLAP 分析功能。對於一些歷史版本的資料產品進行實時化改造時,也可以使用 MySQL 儲存便於產品迭代。

2.計算引擎的調研

在實時平臺建設初期我們使用 Storm 引擎來進行實時資料處理。Storm 引擎雖然在靈活性和效能上都表現不錯。但是由於 API 過於底層,在資料開發過程中需要對一些常用的資料操作進行功能實現。比如表關聯、聚合等,產生了很多額外的開發工作,不僅引入了很多外部依賴比如快取,而且實際使用時效能也不是很理想。同時 Storm 內的資料物件 Tuple 支援的功能也很簡單,通常需要將其轉換為 Java 物件來處理。對於這種基於程式碼定義的資料模型,通常我們只能通過文件來進行維護。不僅需要額外的維護工作,同時在增改欄位時也很麻煩。綜合來看使用 Storm 引擎構建實時數倉難度較大。我們需要一個新的實時處理方案,要能夠實現:

  1. 提供高階 API,支援常見的資料操作比如關聯聚合,最好是能支援 SQL。
  2. 具有狀態管理和自動支援久化方案,減少對儲存的依賴。
  3. 便於接入元資料服務,避免通過程式碼管理資料結構。
  4. 處理效能至少要和 Storm 一致。

我們對主要的實時計算引擎進行了技術調研。總結了各類引擎特性如下表所示:

表2 實時計算方案列表
專案/引擎 Storm Flink spark-treaming
API 靈活的底層 API 和具有事務保證的 Trident API 流 API 和更加適合資料開發的 Table API 和 Flink SQL 支援 流 API 和 Structured-Streaming API 同時也可以使用更適合資料開發的 Spark SQL
容錯機制 ACK 機制 State 分散式快照儲存點 RDD 儲存點
狀態管理 Trident State狀態管理 Key State 和 Operator State兩種 State 可以使用,支援多種持久化方案 有 UpdateStateByKey 等 API 進行帶狀態的變更,支援多種持久化方案
處理模式 單條流式處理 單條流式處理 Mic batch處理
延遲 毫秒級 毫秒級 秒級
語義保障 At Least Once,Exactly Once Exactly Once,At Least Once At Least Once

從調研結果來看,Flink 和 Spark Streaming 的 API 、容錯機制與狀態持久化機制都可以解決一部分我們目前使用 Storm 中遇到的問題。但 Flink 在資料延遲上和 Storm 更接近,對現有應用影響最小。而且在公司內部的測試中 Flink 的吞吐效能對比 Storm 有十倍左右提升。綜合考量我們選定 Flink 引擎作為實時數倉的開發引擎。

更加引起我們注意的是,Flink 的 Table 抽象和 SQL 支援。雖然使用 Strom 引擎也可以處理結構化資料。但畢竟依舊是基於訊息的處理 API ,在程式碼層層面上不能完全享受操作結構化資料的便利。而 Flink 不僅支援了大量常用的 SQL 語句,基本覆蓋了我們的開發場景。而且 Flink 的 Table 可以通過 TableSchema 進行管理,支援豐富的資料型別和資料結構以及資料來源。可以很容易的和現有的元資料管理系統或配置管理系統結合。通過下圖我們可以清晰的看出 Storm 和 Flink 在開發統過程中的區別。

圖4 Flink - Storm 對比圖

在使用 Storm 開發時處理邏輯與實現需要固化在 Bolt 的程式碼。Flink 則可以通過 SQL 進行開發,程式碼可讀性更高,邏輯的實現由開源框架來保證可靠高效,對特定場景的優化只要修改 Flink SQL 優化器功能實現即可,而不影響邏輯程式碼。使我們可以把更多的精力放到到資料開發中,而不是邏輯的實現。當需要離線資料和實時資料口徑統一的場景時,我們只需對離線口徑的 SQL 指令碼稍加改造即可,極大地提高了開發效率。同時對比圖中 Flink 和 Storm 使用的資料模型,Storm 需要通過一個 Java 的 Class 去定義資料結構,Flink Table 則可以通過元資料來定義。可以很好的和資料開發中的元資料,資料治理等系統結合,提高開發效率。

Flink使用心得

在利用 Flink-Table 構建實時資料倉庫過程中。我們針對一些構建資料倉庫的常用操作,比如資料指標的維度擴充,資料按主題關聯,以及資料的聚合運算通過 Flink 來實現總結了一些使用心得。

1.維度擴充

資料指標的維度擴充,我們採用的是通過維度服務獲取維度資訊。雖然基於 Cellar 的維度服務通常的響應延遲可以在 1ms 以下。但是為了進一步優化 Flink 的吞吐,我們對維度資料的關聯全部採用了非同步介面訪問的方式,避免了使用 RPC 呼叫影響資料吞吐。 對於一些資料量很大的流,比如流量日誌資料量在 10W 條/秒這個量級。在關聯 UDF 的時候內建了快取機制,可以根據命中率和時間對快取進行淘汰,配合用關聯的 Key 值進行分割槽,顯著減少了對外部服務的請求次數,有效的減少了處理延遲和對外部系統的壓力。

2.資料關聯

資料主題合併,本質上就是多個數據源的關聯,簡單的來說就是 Join 操作。Flink 的 Table 是建立在無限流這個概念上的。在進行 Join 操作時並不能像離線資料一樣對兩個完整的表進行關聯。採用的是在視窗時間內對資料進行關聯的方案,相當於從兩個資料流中各自擷取一段時間的資料進行 Join 操作。有點類似於離線資料通過限制分割槽來進行關聯。同時需要注意 Flink 關聯表時必須有至少一個“等於”關聯條件,因為等號兩邊的值會用來分組。 由於 Flink 會快取視窗內的全部資料來進行關聯,快取的資料量和關聯的視窗大小成正比。因此 Flink 的關聯查詢,更適合處理一些可以通過業務規則限制關聯資料時間範圍的場景。比如關聯下單使用者購買之前 30 分鐘內的瀏覽日誌。過大的視窗不僅會消耗更多的記憶體,同時會產生更大的 Checkpoint ,導致吞吐下降或 Checkpoint 超時。在實際生產中可以使用 RocksDB 和啟用增量儲存點模式,減少 Checkpoint 過程對吞吐產生影響。對於一些需要關聯視窗期很長的場景,比如關聯的資料可能是幾天以前的資料。對於這些歷史資料,我們可以將其理解為是一種已經固定不變的"維度"。可以將需要被關聯的歷史資料採用和維度資料一致的處理方法:"快取 + 離線"資料方式儲存,用介面的方式進行關聯。另外需要注意 Flink 對多表關聯是直接順序連結的,因此需要注意先進行結果集小的關聯。

3.聚合運算

使用聚合運算時,Flink 對常見的聚合運算如求和、極值、均值等都有支援。美中不足的是對於 Distinct 的支援,Flink-1.6 之前的採用的方案是通過先對去重欄位進行分組再聚合實現。對於需要對多個欄位去重聚合的場景,只能分別計算再進行關聯處理效率很低。為此我們開發了自定義的 UDAF,實現了 MapView 精確去重、BloomFilter 非精確去重、 HyperLogLog 超低記憶體去重方案應對各種實時去重場景。但是在使用自定義的 UDAF 時,需要注意 RocksDBStateBackend 模式對於較大的 Key 進行更新操作時序列化和反序列化耗時很多。可以考慮使用 FsStateBackend 模式替代。另外要注意的一點 Flink 框架在計算比如 Rank 這樣的分析函式時,需要快取每個分組視窗下的全部資料才能進行排序,會消耗大量記憶體。建議在這種場景下優先轉換為 TopN 的邏輯,看是否可以解決需求。

下圖展示一個完整的使用 Flink 引擎生產一張實時資料表的過程:

圖5 實時計算流程圖

實時數倉成果

通過使用實時數倉代替原有流程,我們將資料生產中的各個流程抽象到實時數倉的各層當中。實現了全部實時資料應用的資料來源統一,保證了應用資料指標、維度的口徑的一致。在幾次資料口徑發生修改的場景中,我們通過對倉庫明細和彙總進行改造,在完全不用修改應用程式碼的情況下就完成全部應用的口徑切換。在開發過程中通過嚴格的把控資料分層、主題域劃分、內容組織標準規範和命名規則。使資料開發的鏈路更為清晰,減少了程式碼的耦合。再配合上使用 Flink SQL 進行開發,程式碼加簡潔。單個作業的程式碼量從平均 300+ 行的 JAVA 程式碼 ,縮減到幾十行的 SQL 指令碼。專案的開發時長也大幅減短,一人日開發多個實時資料指標情況也不少見。

除此以外我們通過針對數倉各層級工作內容的不同特點,可以進行鍼對性的效能優化和引數配置。比如 ODS 層主要進行資料的解析、過濾等操作,不需要 RPC 呼叫和聚合運算。 我們針對資料解析過程進行優化,減少不必要的 JSON 欄位解析,並使用更高效的 JSON 包。在資源分配上,單個 CPU 只配置 1GB 的記憶體即可滿需求。而彙總層主要則主要進行聚合與關聯運算,可以通過優化聚合演算法、內外存共同運算來提高效能、減少成本。資源配置上也會分配更多的記憶體,避免記憶體溢位。通過這些優化手段,雖然相比原有流程實時數倉的生產鏈路更長,但資料延遲並沒有明顯增加。同時實時資料應用所使用的計算資源也有明顯減少。

展望

我們的目標是將實時倉庫建設成可以和離線倉庫資料準確性,一致性媲美的資料系統。為商家,業務人員以及美團使用者提供及時可靠的資料服務。同時作為到餐實時資料的統一出口,為集團其他業務部門助力。未來我們將更加關注在資料可靠性和實時資料指標管理。建立完善的資料監控,資料血緣檢測,交叉檢查機制。及時對異常資料或資料延遲進行監控和預警。同時優化開發流程,降低開發實時資料學習成本。讓更多有實時資料需求的人,可以自己動手解決問題。

參考文獻

流計算框架 Flink 與 Storm 的效能對比

關於作者

偉倫,美團到店餐飲技術部實時資料負責人,2017年加入美團,長期從事資料平臺、實時資料計算、資料架構方面的開發工作。在使用 Flink 進行實時資料生產和提高生產效率上,有一些心得和產出。同時也積極推廣 Flink 在實時資料處理中的實戰經驗。

招聘資訊

對資料工程和將資料通過服務業務釋放價值感興趣的同學,可以傳送簡歷到 [email protected]。我們在實時資料倉庫、實時資料治理、實時資料產品開發框架、面向銷售和商家側的資料型創新產品層面,都有很多未知但有意義的領域等你來開拓。