1. 程式人生 > 其它 >Spark Streaming -Apache Flink bilibili 實時平臺的架構與實踐

Spark Streaming -Apache Flink bilibili 實時平臺的架構與實踐

簡介:本文由 bilibili 大資料實時平臺負責人鄭志升分享,基於對 bilibili 實時計算的痛點分析,詳細介紹了 bilibili Saber 實時計算平臺架構與實踐。本次分享主要圍繞以下四個方面:實時計算的痛點、Saber 的平臺演進、結合 AI 的案例實踐、未來的發展與思考。

摘要:本文由 bilibili 大資料實時平臺負責人鄭志升分享,基於對 bilibili 實時計算的痛點分析,詳細介紹了 bilibili Saber 實時計算平臺架構與實踐。本次分享主要圍繞以下四個方面:
一、實時計算的痛點
二、Saber 的平臺演進
三、結合 AI 的案例實踐
四、未來的發展與思考
重要:點選「PPT」可下載 Flink Forward Asia 大會全部PPT。
一、實時計算的痛點
1.痛點
各個業務部門進行業務研發時都有實時計算的需求。早期,在沒有平臺體系做支撐時開發工作難度較大,由於不同業務部門的語言種類和體系不同,導致管理和維護非常困難。其次,bilibili 有很多關於使用者增長、渠道投放的分析等 BI 分析任務。而且還需要對實時數倉的實時資料進行清洗。此外,bilibili 作為一個內容導向的視訊網站,AI 推薦場景下的實時計算需求也比較強烈。
2.痛點共性

  • 開發門檻高:基於底層實時引擎做開發,需要關注的東西較多。包括環境配置、語言基礎,而編碼過程中還需要考慮資料的可靠性、程式碼的質量等。其次,市場實時引擎種類多樣,使用者選擇有一定困難。

  • 運維成本高:運維成本主要體現在兩方面。首先是作業穩定性差。早期團隊有 Spark 叢集、YARN 叢集,導致作業穩定性差,容錯等方面難以管理。其次,缺乏統一的監控告警體系,業務團隊需要重複工作,如計算延時、斷流、波動、故障切換等。

  • AI 實時工程難:bilibili 客戶端首頁推薦頁面依靠 AI 體系的支撐,早期在 AI 機器學習方面遇到非常多問題。機器學習是一套演算法與工程交叉的體系。工程注重的是效率與程式碼複用,而演算法更注重特徵提取以及模型產出。實際上 AI 團隊要承擔很多工程的工作,在一定程度上十分約束實驗的展開。另外,AI 團隊語言體系和框架體系差異較大,所以工程是基建體系,需要提高基建才能加快 AI 的流程,降低演算法人員的工程投入。


3.基於 Apache Flink 的流式計算平臺
為解決上述問題,bilibili 希望根據以下三點要求構建基於 Apache Flink 的流式計算平臺。

  • 第一點,需要提供 SQL 化程式設計。bilibili 對 SQL 進行了擴充套件,稱為 BSQL。BSQL 擴充套件了 Flink 底層 SQL 的上層,即 SQL 語法層。
  • 第二點,DAG 拖拽程式設計,一方面使用者可以通過畫板來構建自己的 Pipeline,另一方面使用者也可以使用原生 Jar 方式進行編碼。
  • 第三點,作業的一體化託管運維。


涵蓋場景:bilibili 流式計算平臺主要涵蓋四個方面的場景。

  • AI 工程方向,解決了廣告、搜尋、推薦的流式 Joiner 和維表 Joiner;
  • 實時計算的特徵支援,支援 Player 以及 CDN 的質量監控。包括直播、PCU、卡頓率、CDN 質量等;
  • 使用者增長,即如何藉助實時計算進行渠道分析、調整渠道投放效果;
  • 實時 ETL,包括 Boss 實時播報、實時大屏、看板等。


二、Saber 的平臺演進
1.平臺架構
實時平臺由實時傳輸和實時計算兩部分組成,平臺底層統一管理元資料、血緣、許可權以及作業運維等。實時傳輸主要負責將資料傳入到大資料體系中。實時計算基於 BSQL 提供各種應用場景支援。
如下圖所示,實時傳輸有 APP 日誌、資料庫 Binlog、服務端日誌或系統日誌。bilibili 內部的 Lancer 系統解決資料落地到 Kafka 或 HDFS。計算體系主要圍繞 Saber 構建一套 BSQL,底層基於 YARN 進行排程管理。
上層核心基於 Flink 構建執行池。再向上一層滿足多種維表場景,包括 MySQL、Redis、HBase。狀態(State)部分在 RocksDB 基礎上,還擴充套件了 MapDB、Redis。Flink 需要 IO 密集是很麻煩的問題,因為 Flink 的資源排程體系內有記憶體和 CPU,但 IO 單位未做統一管理。當某一個作業對 IO 有強烈的需求時,需要分配很多以 CPU 或記憶體為單位的資源,且未必能夠很好的滿足 IO 的擴充套件。所以本質上 bilibili 現階段是將 IO 密集的資源的 State 轉移到 Redis 上做緩解。資料經過 BSQL 計算完成之後傳輸到實時數倉,如 Kafka、HBase、ES 或 MySQL、TiDB。最終到 AI 或 BI、報表以及日誌中心。


2. 開發架構設計
(1)開發架構圖:如下圖左側所示。最上層是 Saber-Streamer,主要進行作業提交以及 API 管理。下一層是 BSQL 層,主要進行 SQL 的擴充套件和解析,包括自定義運算元和個性運算元。再下層是執行時態,下面是引擎層。執行時態主要管理引擎層作業的上下層。bilibili 早期使用的引擎是 Spark Streaming,後期擴充套件了 Flink,在開發架構中預留了一部分引擎層的擴充套件。最下層是狀態儲存層,右側為指標監控模組。
(2)平臺設計準則:Saber 平臺系統設計時團隊關注其邊界以及規範和準則,有以下四個關鍵點。第一是對 Streaming workflows 進行抽象。第二是資料規範性,保證 schema 完整。第三是通用的 BSQL 解析層。第四是工程效率。

  • Streaming workflows:下圖為流計算模型抽象。大資料計算引擎的本質是資料輸入經過一個 function 得到輸出,所以 function 本質是一個能夠做 DAG 轉換的 Transform。Saber 平臺期望的流計算抽象形態是提供相應的 Source,計算過程中是一個 Transform 的 DAG,最後有一個 Sink 的輸出。

在上述抽象過程中規範語義化標準。即最後輸入、輸出給定規範標準,底層通過 Json 表達方式提交作業。在沒有介面的情況下,也可以直接通過 Json 方式拉起作業。

  • 讓資料說話:資料抽象化。計算過程中的資料來源於資料整合的上報。資料整合的上報有一套統一的平臺入口。使用者首先需要在平臺上構建一個輸入的資料來源。使用者選擇了一個對應的資料來源,平臺可以將其分發到 Kafka、 HBase、 Hive 等,並且在分發過程中要求使用者定義 Schema。所以在資料整合過程中,可以輕鬆地管理輸入語言的 Schema。計算過程中,使用者選擇 Input Source,比如選擇一個 HBase 的表或 Kafka 的表,此時 Schema 已是強約束的。使用者通過平臺提供的 BSQL 或者 DAG 的方式進行結果表或者指標的輸出。

  • BSQL 通用設計:BSQL 是遵照 Streaming workflows 設計的思想,核心工作圍繞 Source、Transform 以及 Sink。Transform 主要依託 Flink SQL,所以 BSQL 更多是在 Source 和 Sink 上進行分裝,支援 DDL 的分裝。此處 DDL 參照阿里雲對外資料進行了擴充套件。另外,BSQL 針對計算過程進行了優化,如針對運算元計算的資料傾斜問題採取分桶 + hash 策略進行打掃。針對 distinct 類 count,非精準計算採用 Redis 的 HyperLogLog。

  • BSQL 解析模型:BSQL 解析模型拓撲展開如下圖。當用戶提交了一個 SQL,目標是將 SQL 轉化成樹。之後可以獲取 SqlNode 節點。SqlNode 節點中有很多元資料資訊。在 SqlNode 樹的情況下實現 Table 解析器,將不同的 SqlNode 節點轉化成 Flink 相應的 Streamer 進行對映。

  • BSQL 執行流程:使用者提交 SQL,BSQL 首先進行驗證並構建 SQL 樹。驗證與構建主要是提取表名、欄位資訊,從元資料庫中提取 schema 驗證 SQL 的規範性、完整性和合法性。驗證完成後,將輸入表和結果表註冊到 Flink 的執行時態,其中還包括 UDF 和 watermark 資訊的完善。另外,平臺對 SQL 有一些擴充套件。第三塊是擴充套件的核心工作,將 SQL 樹中擴充套件的子樹轉換為新的節點,然後將 SQL 的 DAG 提交到 Flink 上執行。

  • 效果展示-DAG:如下圖所示,DAG 產品展示,包括並行度的設計、日誌、監控指標告警輸出。

  • 效果展示-BSQL:使用者根據選擇的表的輸入源的 schema 編寫相應的 SQL。最後選擇相應 UDF 就可以提交到相應叢集。

  • 效果展示-作業除錯:如下圖所示為平臺支援的作業除錯。如果只有 SQL 開發卻沒有作業除錯環節,是令使用者痛苦的。故平臺支援通過檔案上傳的方式以及線上取樣的方式進行作業除錯 SQL。

  • 效果展示-作業運維:平臺提供給使用者一些監控指標、使用者可自定義擴充套件的指標以及 bilibili 實現的一些特殊 SQL 的自定義指標。下圖所示為部分佇列的執行情況。


三、結合 AI 的案例實踐
1.AI - 機器學習現狀
AI 體系中有 Offline 和 Online 過程。Online(線上訓練)根據流量做 A/B 實驗,根據不同實驗的效果做推薦。同時每個實驗需要有相應的模型 push 到線上。AI 的痛點集中在 Offline(離線訓練)。Offline 則通過流式方式進行訓練。下圖是 Offline 流式訓練早期情況。使用者需要構建流和流的實時 join,從而產出實時 label 流。而流和維表及特徵資訊的 join 來產出實時 instance 流,但早期相關的工程服務存在著單點問題,服務質量、穩定性帶來的維護成本也很高,致使 AI 在早期 Pipeline 的構建下投入非常大。


2.弊端與痛點

  • 資料時效性:資料時效性無法得到保證。很多資料是通過離線方式進行計算,但很多特徵的時效性要求非常高。
  • 工程質量:單點工程不利於服務擴充套件以及穩定性保障。
  • 工程效率:每一個實驗都有較高門檻,需要做 Label 生產,Features 計算以及 Instance 拼接。在不同業務線,不同場景的推薦背後,演算法同學做工程工作。他們掌握的語言不同,導致工程上語言非常亂。另外,流、批不一致,模型的訓練在實時環境與離線批次環境的工程差異很大,其背後的邏輯相似,導致人員投入翻倍增長。

3.模型訓練的工程化
構建一套基於 Saber-BSQL、Flink 引擎的資料計算 Pipeline,極大簡化 Instance 流的構建。其核心需要解決以下三個問題:Streaming Join Streaming(流式 SJoin),Streaming Join Table(維表 DJoin),Real-time Feature(實時特徵)。

  • SJoin-工程背景:流量規模大,如 bilibili 首頁推薦的流量,AI 的展現點選 Join,來自全站的點選量和展現。此外,買二手手機地圖不僅有雙流 Join,還有三流及以上的 Join,如廣告展現流、點選流、搜尋查詢流等。第三,不同 Join 對 ETL 的清洗不同。如果不能通過 SQL 的方式進行表達,則需要為使用者提供通用的擴充套件,解決不同業務對 Join 之前的定製化 ETL 清洗。第四,非典型 A Left Join B On Time-based Window 模型。主流 A 在視窗時間內 Join 成功後,需要等待視窗時間結束再吐出資料,延長了主流 A 在視窗的停留時間。此場景較為關鍵,bilibili 內部不僅廣告、AI、搜尋,包括直播都需要類似的場景。因為 AI 機器學習需要正負樣本均勻以保證訓練效果,所以第四點問題屬於強需求。
  • SJoin-工程規模:基於線上實時推薦 Joiner。原始 feed 流與 click 流,QPS 高峰分別在 15w 和 2w,Join 輸出 QPS 高峰達到 10w,位元組量高峰為 200 M/s。keyState 狀態查詢量維持在高峰值 60w,包括 read、write、exist 等狀態。一小時 window 下,Timer 的 key 量 15w3600 = 54 億條,RocksDBState 量達到 200M3600 = 700G。實際過程中,採用原生 Flink 在該規模下會遇到較多的效能問題,如在早期 Flink 1.3.* 版本,其穩定性會較差。
  • SJoin-技術痛點:下圖是 Flink 使用 WindowOperator 時的內部拓撲圖。使用者開啟視窗,每一條記錄都是一個 Window 視窗。第一個問題是視窗分配量巨大,QPS 與視窗分配量基本持恆。第二個問題是 Timer Service 每一個記錄都打開了一個視窗,在早期原生 Flink 中是一個記憶體佇列,記憶體佇列部分也存在許多問題。底層佇列早期是單執行緒機制,資料 Cache 在記憶體中,存在許多問題。

簡單總結其技術痛點,首先,Timer 效能較差,且記憶體消耗大。第二,Value RocksDB State 在 compact 時會導致流量抖動。類似 HBase,多 level 的 compact 會造成效能抖動和寫放大。第三,重啟流量過大時,由於 Timer 早期只有記憶體佇列,Window 和 Keystate 恢復週期不可控。從磁碟載入大量資料耗時長,服務 recovery 時間久。

  • SJoin-優化思路:首先是 Timer 優化升級。早期社群沒有更好的解決方案時,bilibili 嘗試自研 PersistentTimerManager,後期升級 Flink,採用基於 RocksDB 的 Timer。第二,啟用 Redis 作為 ValueState,提高 State 穩定性。第三,擴充套件 SQL 語法,以支援非典型 A Left Join B On Time-based Window 場景下的 SQL 語義。
  • SJoin 優化-自研 Timer:實現將記憶體資料達到 Max 之後溢寫到磁碟。底層用 MapDB 做磁碟溢寫。磁碟溢寫原理是 LSM 模型,同樣存在資料抖動問題。由於視窗是 1 小時,相當於資料以 1 小時為單位進行 State 管理。如下圖右側所示,當 0 點到 1 點的 1 小時,由於記錄在 1 小時後才會吐出,資料進來只有寫的動作。在 1 點到 2 點,資料會寫入到新的 State,0 點到 1 點的 State 已經到達視窗時間,進行資料吐出。自研 Timer 很好地解決了資料的讀寫問題和抖動問題。但是由於自研 Timer 缺乏 CheckPoint 機制,如果節點上的磁碟出現故障,會導致 State 資料丟失。

  • SJoin 優化-RocksDBTimer:升級 Flink 版本,引入基於 RocksDB 的 Timer。升級後架構如下圖所示。資料從 Kafka 獲取 Topic-Feed 和 Topic-Click,首先對其進行一層清洗,然後進入自定義的 Joiner Operator 運算元。運算元做兩件事,將主流資料吐到 Redis 中,由 Redis 做 State,同時將需要開視窗的 Key 儲存註冊到 Timer Service 中。接下來利用 Timer Service 原生的 CheckPoint 開啟增量 CheckPoint 過程。當 OnTimer 到達時間後,就可以吐出資料。非常此方案契合 SJoin 在高吞吐作業下的要求。

  • SJoin 優化-引入 KVStore:Flink 原生 State 無法滿足要求,在對 Value、IO 要求高時抖動嚴重,RocksDBState 實際使用中會出現抖動問題。對此,bilibili 嘗試過多種改進方案。開 1 小時視窗,資料量約 700G,雙流 1 小時視窗總流量達到 TB 級別。採用分散式 KVStore 儲存,後續進行壓縮後資料量約 700G。

  • SJoin 優化-擴充套件 SQL 語法:擴充套件 SQL 的功能訴求是展現流等待 1 小時視窗,當點選流到達時,不立即吐出 Join 完成的資料,而等待視窗結束後再吐出。故擴充套件了 SQL 語法,雖然目前未達到通用,但是能滿足諸多部門的 AI 需求。語法支援 Select * from A left(global)$time window and $time delay join B on A.xx=B.xx where A.xx=xx。給使用者帶來了很大收益。


進行 SQL 語義擴充套件主要有兩個關鍵點。SQL 語義的定義頂層通過 Calcite 擴充套件 JoinType。首先將 SQL 展開成 SQL 樹。SQL 樹的一個節點為 left(global)$time window and $time delay join。抽取出該子樹,自定義邏輯轉換規則。在此定義了 StreamingJoinRute,將該子樹轉換為新的節點。通過 Flink 提供的非同步 IO 能力,將非同步子樹轉換為 Streaming Table,並將其註冊到 Flink 環境中。通過以上過程支援 SQL 表達。

  • DJoin-工程背景:bilibili 對於維表資料要求不同。比如一些維表資料很大,以 T 為單位,此時如果用 Redis 儲存會造成浪費。而有一些維表資料很小,如實時特徵。同時,維表資料更新粒度不同,可以按天更新、按小時更新、按分鐘更新等。

另外,維表效能要求很高。因為 AI 場景會進行很多實驗,例如某一個特徵比較好,就會開很多模型、調整不同引數進行實驗。單作業下實驗組越多,QPS 越高,RT 要求越高。不同維表儲存介質有差異,對穩定性有顯著影響。調研中有兩種場景。當量比較小,可以使用 Redis 儲存,穩定性較好。當量很大,使用 Redis 成本高,但 HBase CP 架構無法保證穩定性。

  • DJoin-工程優化:需要針對維表 Join 的 SQL 進行語法支援。包括 Cache 優化,當用戶寫多條 SQL 的維表 Join 時,需要提取多條 SQL 維表的 Key,並通過請求合併查詢維表,以提高 IO,以及流量均衡優化等。第二,KV 儲存分場景支援,比如 JDBC、KV。KV 場景中,對百 G 級別使用 Redis 實時更新實時查詢。T 級別使用 HBase 多叢集,比如通過兩套 HBase,Failover+LoadBalance 模式保證 99 線 RT 小於 20ms,以提高穩定性。

  • DJoin-語法擴充套件:DJoin 語法擴充套件與 SJoin 語法擴充套件類似,對 SQL 樹子樹進行轉化,通過 AsyncIO 進行擴充套件,實現維表。

  • DJoin-HBase 高可用:維表資料達到T級別時使用 HBase 進行資料儲存。HBase 高可用性採用雙 HBase 叢集,Failover AB 模式。這時需要考慮兩個問題。第一是資料更新機制。資料更新可以是按小時或按天,採用 HFile BulkLoad 模式,序列+ Interval 間隔匯入,匯入後同步資料預熱,以此保證兩套HBase 叢集的穩定性。第二是資料查詢機制。引入 Hystrix 實現服務熔斷、降級回退策略。當 A 叢集可用性下降時,根據 AB 的 RT 質量,動態切換一定資料到B叢集,以保證資料流量均衡。

下圖為 HBase 雙叢集架構。右側是離線,以天為單位,通過排程框架拉起一個 DAG 進行計算。DAG 的輸出經過兩層序列的 HBase 的 Sink,序列可以保證資料先寫完 A 再寫 B。執行時態中通過 Flink、AsyncIO 方式,通過兩層 HystrixClient。第一層 HystrixClient 主要對第二層 HystrixClient HBase 的 RT 通訊質量進行收集,根據 RT 通訊質量將流量動態分發到兩套 HBase 叢集中。在 A 叢集穩定性很好時,流量都在 A 叢集跑。當 A 叢集出現抖動,會根據失敗率動態切換一定配比流量到 B 叢集。


4.模型訓練的實時 Pipeline
整個體系解決了 AI 模型訓練預生成資料給模型的 Pipeline。展現和點選通過 BSQL 方案實現 Joiner。實時特徵資料通過 BSQL 進行計算,離線資料通過離線排程解決。維表的 Join 會通過 BSQL 構成 Pipeline,從而給機器學習團隊 Instances 流,訓練模型,產出模型。


四、未來的發展與思考
1.Saber-基礎功能完善
越來越多人使用平臺時,基礎運維是最為關鍵的。Saber 平臺將會完善 SQL IDE 開發,如提供更豐富的版本管理、上下線、任務除錯、資源管理、基礎操作等。同時將豐富化作業運維。包括 SLA、上線審批、優先順序、各類系統監控指標、使用者自定義指標告警、作業 OP 操作等。
2.Saber-應用能力提升
Saber 應用能力將會向 AI 方向不斷演進。例如模型訓練的工程化方面,將引入實驗維度概念,通過實驗拉起 SQL Pipeline。同時將為做模型訓練的同學統一流、批 SQL 複用。並且進行模型實驗效果、評估、預警等。實時特徵的工程化方面,將會支援多特徵複合計算,涵蓋特徵計算、儲存、查詢等多個場景。