1. 程式人生 > 實用技巧 >Kylin Flink Cube 引擎的前世今生

Kylin Flink Cube 引擎的前世今生

Apache Kylin™ 是一個開源的、分散式的分析型資料倉庫,提供 Hadoop/Spark 之上的 SQL 查詢介面及多維分析(OLAP)能力以支援超大規模資料,它能在亞秒內查詢巨大的表。

Kylin 的核心思想是”預計算“,將資料按照指定的維度和指標,預先計算出所有可能的查詢結果,利用空間換時間來加速模式固定的 OLAP 查詢。

Kylin 的理論基礎是 Cube 理論,每一種維度組合稱之為 Cuboid,所有 Cuboid 的集合稱之為 Cube。如下圖,整個立方體稱為 1 個 Cube,立方體中每個網格點稱為 1 個 Cuboid,圖中 (A, B, C, D) 和 (A, D) 都是 Cuboid,其中 (A, B, C, D) 稱為 Base Cuboid。

基於預計算的思想,Cuboid 需要提前算好並存儲起來。由於 Kylin 的場景是面向海量規模的大資料分析,所以 Cube 的構建利用了大資料的計算框架,我們常將計算框架構建 Cube 的實現稱之為“Cube 引擎“。在過去的很長一段時間,構建 Cube 時所能選擇的引擎只能是 Spark 或者 Hadoop 的 MapReduce 框架。但隨著 Kylin 3.1 版本的釋出,我們將看到另一個 Cube 引擎正式加入到 Kylin 生態中:Kylin Flink Cube 引擎。在下文中,我們就來對 Flink 的 Cube 引擎做一個全面的介紹。

Flink Cube 引擎簡介

2018 年底,我萌生了給 Kylin 寫一個以 Flink 計算框架來作為 Cube 引擎的想法,當時我還在騰訊,主要從事 Flink 框架的研發。Flink 框架在當時已經不是大資料領域的一顆新星了,它基於 Google 的 DataFlow 模型以及 Streaming First 的設計理念要比 Spark 在流處理領域擁有先天的優勢,而且已經被國內外眾多公司所廣泛使用。在釋放計算能力方面,Spark 和 Flink 提供了相似的功能,都是大資料領域支援流和批的通用型計算引擎。既然 Spark 能作為 Kylin 的 Cube 引擎,那麼 Flink 理論上沒有理由不可以。

想要實現 Flink 的 Cube 引擎有兩個初衷:

  1. 擴大 Flink 的生態;
  2. 滿足騰訊內部統一流批計算引擎的需求(因為 Flink 是當時騰訊內部主推的流計算平臺)。

當我跟 Kylin PMC 史少鋒交流並提出這一想法後,他對此表示非常歡迎,這裡我們必須要稱讚一下 Kylin 社群對於接受新技術所持有的積極、開放的心態。

Flink Cube 引擎的開發就從 2019 年 1 月開始了,對我而言這是一個跨領域的過程,我需要從頭瞭解 Kylin 以及 OLAP 領域的一些核心思想和概念(畢竟之前一直在做計算框架)以及 Kylin 和 Spark Cube 引擎的一些關鍵設計。

前後加起來差不多利用了數月的業餘時間實現完成了整個實現。其中經歷了數次調優,這裡需要特別感謝 Kyligence 多位童鞋耐心、仔細地進行對比測試(尤其是 Kylin PMC 倪春恩和 Kyligence 的張亞倩童鞋),終於這個引擎的效能到了能夠跟 Spark 相提並論的地步。

隨後,我們在 6,7 月份開始在騰訊內部試點該引擎來構建 Cube,以支援 QQ 音樂、廣點通等業務的分析需求。經過內部的試執行,我們觀察到整體上它的效能表現要優於 Spark 的實現。然後,在 19 年 9 月的 Kylin 深圳 Meetup 上,我跟前同事程廣旭共同分享了一個 Talk 介紹了 Kylin 在騰訊的落地實踐以及 Flink Cube 引擎。[2]

跟 Kylin 的很多其他 Feature 一樣,Flink Engine 最初也是在一個獨立的分支上開發的,這個分支就是 engine-flink,2019 年底 Kylin 社群經過測試將該分支合併到了 master 分支。Flink Engine 最初使用的 Flink 版本是 1.7.2,後面升級到了 1.9.0。整個 Cube 引擎在 Jira 上有一個 Umbrella issue,編號是 KYLIN-3758[3],所有的子任務都在這個 issue 下面。

Flink Cube 引擎基於 Kylin 原先的外掛化的架構,繼承 IBatchCubingEngine 介面實現了 FlinkBatchCubingEngine2,是一個相對獨立的模組,跟 Kylin 其他部件沒有產生太多的耦合。它整體上延續了 Spark Cube 引擎的設計與實現,由於 Spark 跟 Flink 的 DataSet API 存在著一定程度的差異,所以在開發時需要進行一些適配工作。這裡我們先介紹一下 Spark Cube 引擎的核心:”By layer“ 演算法。Kylin 官方曾經出了一篇部落格介紹 Spark Cube 引擎以及該演算法的實現[1],以下是這篇部落格裡的一段文字摘錄:

The “by-layer” Cubing divides a big task into a couple steps, and each step bases on the previous step’s output, so it can reuse the previous calculation and also avoid calculating from very beginning when there is a failure in between. These makes it as a reliable algorithm. When moving to Spark, we decide to keep this algorithm, that’s why we call this feature as “By layer Spark Cubing”.

簡而言之,"By layer" 演算法的核心思想是逐層計算 Cube ,首先計算 Base Cuboid,然後計算維度數依次減少,逐層向下計算每層的 Cuboid。

在實現時,Cube 的構建流程,包含若干個步驟。選擇特定的構建引擎通常會使用相應的計算框架提供的 API 去實現這些步驟。由於這些步驟都是一個個獨立的 YARN application,所以,也並不是一個 segment 構建任務裡所有的子任務都一定要由同一個構建引擎的 API 來實現

接下來,介紹一下我們如何選擇 Flink Cube 引擎來構建。我們在 Kylin 的 Web UI 上提供了 Flink Cube Engine 的選項,當用戶編輯一個 Cube 資訊時,可以在第五步 (Advanced Setting) 中的 Cube Engine 下拉選項中選擇 “Flink”。

Cube 構建的若干步驟中,當屬 ”Cuboid build“ 步驟最為耗時也最為關鍵。下面我們就來介紹一下,Flink Cube 引擎在對 ”Cuboid build" 步驟調優時有哪些考慮。

Flink Cube 引擎調優

其實,在最初進行對比測試時,Flink 引擎要比 Spark 引擎慢不少。我們發現效能問題後首先對 Flink 框架的引數進行了調優。這裡除了記憶體外,有三個核心引數,分別是並行度、單個 TM Slot 的數目、TM Container 的數目。他們之間的關係是:TM Container 的數目 = 並行度 / 單個 TM Slot 的數目。我們基於控制變數法(固定住並行度以及 Job 總記憶體不變)嘗試調整出一個 Container 數與單 TM Slot 數效能最好的配比。結果得出的結論是,單個 TM 的 Slot 數目減少(當然單個 TM 的記憶體也會降低),拉起更多的 Container 數目的這種攤平的方式效能會更好。除此之外,在 Flink 批處理模組,它有幾個優化配置項,包括物件複用,記憶體預分配等,通過對比測試,所起到的效果並不明顯。當然,僅僅對 Flink 框架的引數進行調優,並沒有使得 Flink Cube 引擎的效能趕上 Spark Cube 引擎。接下來的一步優化也很關鍵:那就是合併/批量計算。通過分析 Flink Job 執行後的 ArchivedExecutionGraph,發現每一步都比較慢,且隨著 Layer 的變化沒有發生效能急劇下降的情況,基本維持了線性關係。於是,我認為問題應該還是在程式碼實現上,並不能完全照著 Spark 的方式來。所以,通過對整個 DAG 的重新分析,最終確認了效能瓶頸在於用於聚合 Cuboid 的 Reduce 運算元以及對 Cuboid 進行 Encode 的 Map 運算元上。

對於這兩個運算元,Flink 提供了相應的分組、分割槽的批量處理模式來提升整體處理的吞吐量,它們分別是 mapPartition/reduceGroup。它們會對上游的資料進行聚集,直到某分割槽的輸入全部被接收,然後才會呼叫具體的 UDF 對資料進行迭代處理。當然,這兩個運算元也有它危險的地方,那就是它們很佔用記憶體,資料量太大會存在記憶體耗盡從而導致 OOM 的風險。這一點,Flink 徹底的記憶體管理以及自定義型別系統的做法會有一些優勢,它能夠容納更多的資料在記憶體中,並且有效地減少 GC 的頻次,但仍然可能存在風險。所以,這一塊的改進建議是引入一個場景化的開關:如果記憶體資源充足那麼我們就可以儘量用這兩個運算元來降低構建時間,如果記憶體資源有限,那麼我們可以選擇更穩定的方式來構建。

Flink Cube & Spark Cube 引擎對比測試

  • 對比步驟:兩個計算引擎分別構建 Cuboid 資料;
  • YARN 叢集資源:4 個物理節點,每個物理節點 32 Core,125G 記憶體;
  • 資料來源:基於 SSB 資料集,事實表包含 6 千萬記錄。

結果如下:

結論:Flink Engine在對比測試中已超過 2 mins + 的明顯優勢勝出。

*注:整個測試過程由 Kylin Committer 倪春恩實施並提供測試結果。

構建過程中的相關 UI 的截圖如下:

兩個 Engine 相關的配置資訊如下:

Spark Cube Engine:

kylin.engine.spark-conf.spark.master=yarn
kylin.engine.spark-conf.spark.submit.deployMode=cluster
kylin.engine.spark-conf.spark.dynamicAllocation.enabled=true
kylin.engine.spark-conf.spark.dynamicAllocation.minExecutors=1
kylin.engine.spark-conf.spark.dynamicAllocation.maxExecutors=1000
kylin.engine.spark-conf.spark.dynamicAllocation.executorIdleTimeout=300
kylin.engine.spark-conf.spark.shuffle.service.enabled=true
kylin.engine.spark-conf.spark.hadoop.dfs.replication=2
kylin.engine.spark-conf.spark.driver.memory=4G
kylin.engine.spark-conf.spark.executor.memory=4G
kylin.engine.spark-conf.spark.executor.cores=1
kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=1024

Flink Cube Engine:

kylin.engine.flink-conf.jobmanager.heap.size=2G
kylin.engine.flink-conf.taskmanager.heap.size=4G
kylin.engine.flink-conf.taskmanager.numberOfTaskSlots=4
kylin.engine.flink-conf.taskmanager.memory.preallocate=false
kylin.engine.flink-conf.job.parallelism=80
kylin.engine.flink-conf.program.enableObjectReuse=false

Flink 引擎後續規劃

Flink Cube 引擎,隨 Kylin 3.1 版本一起釋出,這給了使用者足夠的信心來使用它。當然,由於維護精力受限,它還有一些不足和待改進的空間,我們很開心看到社群也有其他小夥伴將 Flink Cube 引擎在自己公司內使用並將相關的優化與改進反饋回 Kylin社群。例如,harveyyue 同學實現了 cubing step 中的 fact distinct 以及 convert to HFile 等子任務[2]。隨著 Flink Cube 引擎被正式釋出,我們有理由相信它能在 Kylin 生態中佔有一席之地。

參考文獻:

[1]: http://kylin.apache.org/blog/2017/02/23/by-layer-spark-cubing/

[2]: https://github.com/apache/kylin/commits?author=harveyyue

[3]: https://issues.apache.org/jira/browse/KYLIN-3758


作者簡介:

楊華,T3 出行大資料平臺負責人,前騰訊高階工程師。Apache Hudi Committer & PMC Member。Apache Kylin 的 Flink Cube Engine 作者。


*如果想第一時間獲得 Kylin 的資訊和活動資訊,請新增 K 小助 (微訊號:uncertainly5)並備註您的 “所在城市-公司-崗位-暱稱”。

瞭解更多大資料資訊,點選進入 Kyligence 官網