1. 程式人生 > 其它 >5個網際網路大廠實時數倉建設例項,例例皆經典

5個網際網路大廠實時數倉建設例項,例例皆經典

一、實時數倉建設背景

1. 實時需求日趨迫切

目前各大公司的產品需求和內部決策對於資料實時性的要求越來越迫切,需要實時數倉的能力來賦能。傳統離線數倉的資料時效性是 T+1,排程頻率以天為單位,無法支撐實時場景的資料需求。即使能將排程頻率設定成小時,也只能解決部分時效性要求不高的場景,對於實效性要求很高的場景還是無法優雅的支撐。因此實時使用資料的問題必須得到有效解決。

2. 實時技術日趨成熟

實時計算框架已經經歷了三代發展,分別是:Storm、SparkStreaming、Flink,計算框架越來越成熟。一方面,實時任務的開發已經能通過編寫 SQL 的方式來完成,在技術層面能很好地繼承離線數倉的架構設計思想;另一方面,線上資料開發平臺所提供的功能對實時任務開發、除錯、運維的支援也日漸趨於成熟,開發成本逐步降低,有助於去做這件事。

二、實時數倉建設目的

1. 解決傳統數倉的問題

從目前數倉建設的現狀來看,實時數倉是一個容易讓人產生混淆的概念,根據傳統經驗分析,數倉有一個重要的功能,即能夠記錄歷史。通常,數倉都是希望從業務上線的第一天開始有資料,然後一直記錄到現在。但實時流處理技術,又是強調當前處理狀態的一個技術,結合當前一線大廠的建設經驗和滴滴在該領域的建設現狀,我們嘗試把公司內實時數倉建設的目的定位為,以數倉建設理論和實時技術,解決由於當前離線數倉資料時效性低解決不了的問題。

現階段我們要建設實時數倉的主要原因是:

  • 公司業務對於資料的實時性越來越迫切,需要有實時資料來輔助完成決策;
  • 實時資料建設沒有規範,資料可用性較差,無法形成數倉體系,資源大量浪費;
  • 資料平臺工具對整體實時開發的支援也日漸趨於成熟,開發成本降低。

2. 實時數倉的應用場景

  • 實時 OLAP 分析;
  • 實時資料看板;
  • 實時業務監控;
  • 實時資料介面服務。

三、實時數倉建設方案

接下來我們分析下目前實時數倉建設比較好的幾個案例,希望這些案例能夠給大家帶來一些啟發。

1. 滴滴順風車實時數倉案例

滴滴資料團隊建設的實時數倉,基本滿足了順風車業務方在實時側的各類業務需求,初步建立起順風車實時數倉,完成了整體資料分層,包含明細資料和彙總資料,統一了 DWD 層,降低了大資料資源消耗,提高了資料複用性,可對外輸出豐富的資料服務。

數倉具體架構如下圖所示:

從資料架構圖來看,順風車實時數倉和對應的離線數倉有很多類似的地方。例如分層結構;比如 ODS 層,明細層,彙總層,乃至應用層,他們命名的模式可能都是一樣的。但仔細比較不難發現,兩者有很多區別:

  • 與離線數倉相比,實時數倉的層次更少一些:
  • 從目前建設離線數倉的經驗來看,數倉的資料明細層內容會非常豐富,處理明細資料外一般還會包含輕度彙總層的概念,另外離線數倉中應用層資料在數倉內部,但實時數倉中,app 應用層資料已經落入應用系統的儲存介質中,可以把該層與數倉的表分離;
  • 應用層少建設的好處:實時處理資料的時候,每建一個層次,資料必然會產生一定的延遲;
  • 彙總層少建的好處:在彙總統計的時候,往往為了容忍一部分資料的延遲,可能會人為的製造一些延遲來保證資料的準確。舉例,在統計跨天相關的訂單事件中的資料時,可能會等到 00:00:05 或者 00:00:10 再統計,確保 00:00 前的資料已經全部接受到位了,再進行統計。所以,彙總層的層次太多的話,就會更大的加重人為造成的資料延遲。
  • 與離線數倉相比,實時數倉的資料來源儲存不同:
  • 在建設離線數倉的時候,目前滴滴內部整個離線數倉都是建立在 Hive 表之上。但是,在建設實時數倉的時候,同一份表,會使用不同的方式進行儲存。比如常見的情況下,明細資料或者彙總資料都會存在 Kafka 裡面,但是像城市、渠道等維度資訊需要藉助 Hbase,mysql 或者其他 KV 儲存等資料庫來進行儲存。

接下來,根據順風車實時數倉架構圖,對每一層建設做具體展開:

1) ODS 貼源層建設

根據順風車具體場景,目前順風車資料來源主要包括訂單相關的 binlog 日誌,冒泡和安全相關的 public 日誌,流量相關的埋點日誌等。這些資料部分已採集寫入 kafka 或 ddmq 等資料通道中,部分資料需要藉助內部自研同步工具完成採集,最終基於順風車數倉 ods 層建設規範分主題統一寫入 kafka 儲存介質中。

命名規範:ODS 層實時資料來源主要包括兩種。

一種是在離線採集時已經自動生產的 DDMQ 或者是 Kafka topic,這型別的資料命名方式為採集系統自動生成規範為:cn-binlog-資料庫名-資料庫名 eg:cn-binlog-ihap_fangyuan-ihap_fangyuan

一種是需要自己進行採集同步到 kafka topic 中,生產的 topic 命名規範同離線類似:ODS 層採用:realtime_ods_binlog_{源系統庫/表名}/ods_log_{日誌名} eg: realtime_ods_binlog_ihap_fangyuan

2)DWD 明細層建設

根據順風車業務過程作為建模驅動,基於每個具體的業務過程特點,構建最細粒度的明細層事實表;結合順風車分析師在離線側的資料使用特點,將明細事實表的某些重要維度屬性欄位做適當冗餘,完成寬表化處理,之後基於當前順風車業務方對實時資料的需求重點,重點建設交易、財務、體驗、安全、流量等幾大模組;該層的資料來源於 ODS 層,通過大資料架構提供的 Stream SQL 完成 ETL 工作,對於 binlog 日誌的處理主要進行簡單的資料清洗、處理資料漂移和資料亂序,以及可能對多個 ODS 表進行 Stream Join,對於流量日誌主要是做通用的 ETL 處理和針對順風車場景的資料過濾,完成非結構化資料的結構化處理和資料的分流;該層的資料除了儲存在訊息佇列 Kafka 中,通常也會把資料實時寫入 Druid 資料庫中,供查詢明細資料和作為簡單彙總資料的加工資料來源。

命名規範:DWD 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 40 個字元,並且應遵循下述規則:realtime_dwd_{業務/pub}_{資料域縮寫}_[{業務過程縮寫}]_[{自定義表命名標籤縮寫}]

  • {業務/pub}:參考業務命名
  • {資料域縮寫}:參考資料域劃分部分
  • {自定義表命名標籤縮寫}:實體名稱可以根據資料倉庫轉換整合後做一定的業務抽象的名稱,該名稱應該準確表述實體所代表的業務含義
  • 樣例:realtime_dwd_trip_trd_order_base

3) DIM 層

  • 公共維度層,基於維度建模理念思想,建立整個業務過程的一致性維度,降低資料計算口徑和演算法不統一風險;
  • DIM 層資料來源於兩部分:一部分是 Flink 程式實時處理 ODS 層資料得到,另外一部分是通過離線任務出倉得到;
  • DIM 層維度資料主要使用 MySQL、Hbase、fusion(滴滴自研 KV 儲存) 三種儲存引擎,對於維表資料比較少的情況可以使用 MySQL,對於單條資料大小比較小,查詢 QPS 比較高的情況,可以使用 fusion 儲存,降低機器記憶體資源佔用,對於資料量比較大,對維表資料變化不是特別敏感的場景,可以使用 HBase 儲存。

命名規範:DIM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 30 個字元,並且應遵循下述規則:dim_{業務/pub}_{維度定義}[_{自定義命名標籤}]:

  • {業務/pub}:參考業務命名
  • {維度定義}:參考維度命名
  • {自定義表命名標籤縮寫}:實體名稱可以根據資料倉庫轉換整合後做一定的業務抽象的名稱,該名稱應該準確表述實體所代表的業務含義
  • 樣例:dim_trip_dri_base

4) DWM 彙總層建設

在建設順風車實時數倉的彙總層的時候,跟順風車離線數倉有很多一樣的地方,但其具體技術實現會存在很大不同。

第一:對於一些共性指標的加工,比如 pv,uv,訂單業務過程指標等,我們會在彙總層進行統一的運算,確保關於指標的口徑是統一在一個固定的模型中完成。對於一些個性指標,從指標複用性的角度出發,確定唯一的時間欄位,同時該欄位儘可能與其他指標在時間維度上完成拉齊,例如行中異常訂單數需要與交易域指標在事件時間上做到拉齊。

第二:在順風車彙總層建設中,需要進行多維的主題彙總,因為實時數倉本身是面向主題的,可能每個主題會關心的維度都不一樣,所以需要在不同的主題下,按照這個主題關心的維度對資料進行彙總,最後來算業務方需要的彙總指標。在具體操作中,對於 pv 類指標使用 Stream SQL 實現 1 分鐘彙總指標作為最小彙總單位指標,在此基礎上進行時間維度上的指標累加;對於 uv 類指標直接使用 druid 資料庫作為指標彙總容器,根據業務方對彙總指標的及時性和準確性的要求,實現相應的精確去重和非精確去重。

第三:彙總層建設過程中,還會涉及到衍生維度的加工。在順風車券相關的彙總指標加工中我們使用 Hbase 的版本機制來構建一個衍生維度的拉鍊表,通過事件流和 Hbase 維表關聯的方式得到實時資料當時的準確維度

命名規範:DWM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 40 個字元,並且應遵循下述規則:realtime_dwm_{業務/pub}_{資料域縮寫}_{資料主粒度縮寫}_[{自定義表命名標籤縮寫}]_{統計時間週期範圍縮寫}:

  • {業務/pub}:參考業務命名
  • {資料域縮寫}:參考資料域劃分部分
  • {資料主粒度縮寫}:指資料主要粒度或資料域的縮寫,也是聯合主鍵中的主要維度
  • {自定義表命名標籤縮寫}:實體名稱可以根據資料倉庫轉換整合後做一定的業務抽象的名稱,該名稱應該準確表述實體所代表的業務含義
  • {統計時間週期範圍縮寫}:1d:天增量;td:天累計(全量);1h:小時增量;th:小時累計(全量);1min:分鐘增量;tmin:分鐘累計(全量)
  • 樣例:realtime_dwm_trip_trd_pas_bus_accum_1min

(5)APP 應用層

該層主要的工作是把實時彙總資料寫入應用系統的資料庫中,包括用於大屏顯示和實時 OLAP 的 Druid 資料庫(該資料庫除了寫入應用資料,也可以寫入明細資料完成彙總指標的計算)中,用於實時資料介面服務的 Hbase 資料庫,用於實時資料產品的 mysql 或者 redis 資料庫中。

命名規範:基於實時數倉的特殊性不做硬性要求。

2. 快手實時數倉場景化案例

1) 目標及難點

① 目標

首先由於是做數倉,因此希望所有的實時指標都有離線指標去對應,要求實時指標和離線指標整體的資料差異在 1% 以內,這是最低標準。

其次是資料延遲,其 SLA 標準是活動期間所有核心報表場景的資料延遲不能超過 5 分鐘,這 5 分鐘包括作業掛掉之後和恢復時間,如果超過則意味著 SLA 不達標。

最後是穩定性,針對一些場景,比如作業重啟後,我們的曲線是正常的,不會因為作業重啟導致指標產出一些明顯的異常。

②難點

第一個難點是資料量大。每天整體的入口流量資料量級大概在萬億級。在活動如春晚的場景,QPS 峰值能達到億 / 秒。

第二個難點是元件依賴比較複雜。可能這條鏈路裡有的依賴於 Kafka,有的依賴 Flink,還有一些依賴 KV 儲存、RPC 介面、OLAP 引擎等,我們需要思考在這條鏈路裡如何分佈,才能讓這些元件都能正常工作。

第三個難點是鏈路複雜。目前我們有 200+ 核心業務作業,50+ 核心資料來源,整體作業超過 1000。

2) 實時數倉 - 分層模型

基於上面三個難點,來看一下數倉架構:

如上所示:

最下層有三個不同的資料來源,分別是客戶端日誌、服務端日誌以及 Binlog 日誌;在公共基礎層分為兩個不同的層次,一個是 DWD 層,做明細資料,另一個是 DWS 層,做公共聚合資料,DIM 是我們常說的維度。我們有一個基於離線數倉的主題預分層,這個主題預分層可能包括流量、使用者、裝置、視訊的生產消費、風控、社交等。DWD 層的核心工作是標準化的清洗;DWS 層是把維度的資料和 DWD 層進行關聯,關聯之後生成一些通用粒度的聚合層次。再往上是應用層,包括一些大盤的資料,多維分析的模型以及業務專題資料;最上面是場景。整體過程可以分為三步:

第一步是做業務資料化,相當於把業務的資料接進來;第二步是資料資產化,意思是對資料做很多的清洗,然後形成一些規則有序的資料;第三步是資料業務化,可以理解資料在實時資料層面可以反哺業務,為業務資料價值建設提供一些賦能。

3) 實時數倉 - 保障措施

基於上面的分層模型,來看一下整體的保障措施:

保障層面分為三個不同的部分,分別是質量保障,時效保障以及穩定保障。

我們先看藍色部分的質量保障。針對質量保障,可以看到在資料來源階段,做了如資料來源的亂序監控,這是我們基於自己的 SDK 的採集做的,以及資料來源和離線的一致性校準。研發階段的計算過程有三個階段,分別是研發階段、上線階段和服務階段。研發階段可能會提供一個標準化的模型,基於這個模型會有一些 Benchmark,並且做離線的比對驗證,保證質量是一致的;上線階段更多的是服務監控和指標監控;在服務階段,如果出現一些異常情況,先做 Flink 狀態拉起,如果出現了一些不符合預期的場景,我們會做離線的整體資料修復。

第二個是時效性保障。針對資料來源,我們把資料來源的延遲情況也納入監控。在研發階段其實還有兩個事情:首先是壓測,常規的任務會拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務延遲的情況;通過壓測之後,會有一些任務上線和重啟效能評估,相當於按照 CP 恢復之後,重啟的效能是什麼樣子。

最後一個是穩定保障。這在大型活動中會做得比較多,比如切換演練和分級保障。我們會基於之前的壓測結果做限流,目的是保障作業在超過極限的情況下,仍然是穩定的,不會出現很多的不穩定或者 CP 失敗的情況。之後我們會有兩種不同的標準,一種是冷備雙機房,另外一種是熱備雙機房。冷備雙機房是:當一個單機房掛掉,我們會從另一個機房去拉起;熱備雙機房:相當於同樣一份邏輯在兩個機房各部署一次。以上就是我們整體的保障措施。

4)快手場景問題及解決方案

① PV/UV 標準化

  •  場景

第一個問題是 PV/UV 標準化,這裡有三個截圖:

第一張圖是春晚活動的預熱場景,相當於是一種玩法,第二和第三張圖是春晚當天的發紅包活動和直播間截圖。

在活動進行過程中,我們發現 60~70% 的需求是計算頁面裡的資訊,如:

這個頁面來了多少人,或者有多少人點選進入這個頁面;

活動一共來了多少人;

頁面裡的某一個掛件,獲得了多少點選、產生了多少曝光。

  • 方案

抽象一下這個場景就是下面這種 SQL:

簡單來說,就是從一張表做篩選條件,然後按照維度層面做聚合,接著產生一些 Count 或者 Sum 操作。

基於這種場景,我們最開始的解決方案如上圖右邊所示。

我們用到了 Flink SQL 的 Early Fire 機制,從 Source 資料來源取資料,之後做了 DID 的分桶。比如最開始紫色的部分按這個做分桶,先做分桶的原因是防止某一個 DID 存在熱點的問題。分桶之後會有一個叫做 Local Window Agg 的東西,相當於資料分完桶之後把相同型別的資料相加。Local Window Agg 之後再按照維度進行 Global Window Agg 的合桶,合桶的概念相當於按照維度計算出最終的結果。Early Fire 機制相當於在 Local Window Agg 開一個天級的視窗,然後每分鐘去對外輸出一次。

這個過程中我們遇到了一些問題,如上圖左下角所示。

在程式碼正常執行的情況下是沒有問題的,但如果整體資料存在延遲或者追溯歷史資料的情況,比如一分鐘 Early Fire 一次,因為追溯歷史的時候資料量會比較大,所以可能導致 14:00 追溯歷史,直接讀到了 14:02 的資料,而 14:01 的那個點就被丟掉了,丟掉了以後會發生什麼?

在這種場景下,圖中上方的曲線為 Early Fire 回溯歷史資料的結果。橫座標是分鐘,縱座標是截止到當前時刻的頁面 UV,我們發現有些點是橫著的,意味著沒有資料結果,然後一個陡增,然後又橫著的,接著又一個陡增,而這個曲線的預期結果其實是圖中下方那種平滑的曲線。

為了解決這個問題,我們用到了 Cumulate Window 的解決方案,這個解決方案在 Flink 1.13 版本里也有涉及,其原理是一樣的。

資料開一個大的天級視窗,大視窗下又開了一個小的分鐘級視窗,資料按資料本身的 Row Time 落到分鐘級視窗。

Watermark 推進過了視窗的 event_time,它會進行一次下發的觸發,通過這種方式可以解決回溯的問題,資料本身落在真實的視窗, Watermark 推進,在視窗結束後觸發。此外,這種方式在一定程度上能夠解決亂序的問題。比如它的亂序資料本身是一個不丟棄的狀態,會記錄到最新的累計資料。最後是語義一致性,它會基於事件時間,在亂序不嚴重的情況下,和離線計算出來的結果一致性是相當高的。以上是 PV/UV 一個標準化的解決方案。

② DAU 計算

  • 背景介紹

下面介紹一下 DAU 計算:

我們對於整個大盤的活躍裝置、新增裝置和迴流裝置有比較多的監控。

活躍裝置指的是當天來過的裝置;新增裝置指的是當天來過且歷史沒有來過的裝置;迴流裝置指的是當天來過且 N 天內沒有來過的裝置。但是我們計算過程之中可能需要 5~8 個這樣不同的 Topic 去計算這幾個指標。

我們看一下離線過程中,邏輯應該怎麼算。

首先我們先算活躍裝置,把這些合併到一起,然後做一個維度下的天級別去重,接著再去關聯維度表,這個維度表包括裝置的首末次時間,就是截止到昨天裝置首次訪問和末次訪問的時間。

得到這個資訊之後,我們就可以進行邏輯計算,然後我們會發現新增和迴流的裝置其實是活躍裝置裡打的一個子標籤。新增裝置就是做了一個邏輯處理,迴流裝置是做了 30 天的邏輯處理,基於這樣的解決方案,我們能否簡單地寫一個 SQL 去解決這個問題?

其實我們最開始是這麼做的,但遇到了一些問題:

第一個問題是:資料來源是 6~8 個,而且我們大盤的口徑經常會做微調,如果是單作業的話,每次微調的過程之中都要改,單作業的穩定性會非常差;第二個問題是:資料量是萬億級,這會導致兩個情況,首先是這個量級的單作業穩定性非常差,其次是實時關聯維表的時候用的 KV 儲存,任何一個這樣的 RPC 服務介面,都不可能在萬億級資料量的場景下保證服務穩定性;第三個問題是:我們對於時延要求比較高,要求時延小於一分鐘。整個鏈路要避免批處理,如果出現了一些任務效能的單點問題,我們還要保證高效能和可擴容。

  • 技術方案

針對以上問題,介紹一下我們是怎麼做的:

如上圖的例子,第一步是對 A B C 這三個資料來源,先按照維度和 DID 做分鐘級別去重,分別去重之後得到三個分鐘級別去重的資料來源,接著把它們 Union 到一起,然後再進行同樣的邏輯操作。

這相當於我們資料來源的入口從萬億變到了百億的級別,分鐘級別去重之後再進行一個天級別的去重,產生的資料來源就可以從百億變成了幾十億的級別。

在幾十億級別資料量的情況下,我們再去關聯資料服務化,這就是一種比較可行的狀態,相當於去關聯使用者畫像的 RPC 介面,得到 RPC 介面之後,最終寫入到了目標 Topic。這個目標 Topic 會匯入到 OLAP 引擎,供給多個不同的服務,包括移動版服務,大屏服務,指標看板服務等。

這個方案有三個方面的優勢,分別是穩定性、時效性和準確性。

首先是穩定性。鬆耦合可以簡單理解為當資料來源 A 的邏輯和資料來源 B 的邏輯需要修改時,可以單獨修改。第二是任務可擴容,因為我們把所有邏輯拆分得非常細粒度,當一些地方出現瞭如流量問題,不會影響後面的部分,所以它擴容比較簡單,除此之外還有服務化後置和狀態可控。其次是時效性,我們做到毫秒延遲,並且維度豐富,整體上有 20+ 的維度做多維聚合。最後是準確性,我們支援資料驗證、實時監控、模型出口統一等。此時我們遇到了另外一個問題 - 亂序。對於上方三個不同的作業,每一個作業重啟至少會有兩分鐘左右的延遲,延遲會導致下游的資料來源 Union 到一起就會有亂序。

  • 延遲計算方案

遇到上面這種有亂序的情況下,我們要怎麼處理?

我們總共有三種處理方案:

第一種解決方案是用 “did + 維度 + 分鐘” 進行去重,Value 設為 “是否來過”。比如同一個 did,04:01 來了一條,它會進行結果輸出。同樣的,04:02 和 04:04 也會進行結果輸出。但如果 04:01 再來,它就會丟棄,但如果 04:00 來,依舊會進行結果輸出。

這個解決方案存在一些問題,因為我們按分鐘存,存 20 分鐘的狀態大小是存 10 分鐘的兩倍,到後面這個狀態大小有點不太可控,因此我們又換了解決方案 2。

第二種解決方案,我們的做法會涉及到一個假設前提,就是假設不存在資料來源亂序的情況。在這種情況下,key 存的是 “did + 維度”,Value 為 “時間戳”,它的更新方式如上圖所示。04:01 來了一條資料,進行結果輸出。04:02 來了一條資料,如果是同一個 did,那麼它會更新時間戳,然後仍然做結果輸出。04:04 也是同樣的邏輯,然後將時間戳更新到 04:04,如果後面來了一條 04:01 的資料,它發現時間戳已經更新到 04:04,它會丟棄這條資料。這樣的做法大幅度減少了本身所需要的一些狀態,但是對亂序是零容忍,不允許發生任何亂序的情況,由於我們不好解決這個問題,因此我們又想出瞭解決方案 3。

方案 3 是在方案 2 時間戳的基礎之上,加了一個類似於環形緩衝區,在緩衝區之內允許亂序。

比如 04:01 來了一條資料,進行結果輸出;04:02 來了一條資料,它會把時間戳更新到 04:02,並且會記錄同一個裝置在 04:01 也來過。如果 04:04 再來了一條資料,就按照相應的時間差做一個位移,最後通過這樣的邏輯去保障它能夠容忍一定的亂序。

綜合來看這三個方案:

方案 1 在容忍 16 分鐘亂序的情況下,單作業的狀態大小在 480G 左右。這種情況雖然保證了準確性,但是作業的恢復和穩定性是完全不可控的狀態,因此我們還是放棄了這個方案;

方案 2 是 30G 左右的狀態大小,對於亂序 0 容忍,但是資料不準確,由於我們對準確性的要求非常高,因此也放棄了這個方案;

方案 3 的狀態跟方案 1 相比,它的狀態雖然變化了但是增加的不多,而且整體能達到跟方案 1 同樣的效果。方案 3 容忍亂序的時間是 16 分鐘,我們正常更新一個作業的話,10 分鐘完全足夠重啟,因此最終選擇了方案 3。

③ 運營場景

  •  背景介紹

運營場景可分為四個部分:

第一個是資料大屏支援,包括單直播間的分析資料和大盤的分析資料,需要做到分鐘級延遲,更新要求比較高;

第二個是直播看板支援,直播看板的資料會有特定維度的分析,特定人群支援,對維度豐富性要求比較高;

第三個是資料策略榜單,這個榜單主要是預測熱門作品、爆款,要求的是小時級別的資料,更新要求比較低;

第四個是 C 端實時指標展示,查詢量比較大,但是查詢模式比較固定。

下面進行分析這 4 種不同的狀態產生的一些不同的場景。

前 3 種基本沒有什麼差別,只是在查詢模式上,有的是特定業務場景,有的是通用業務場景。

針對第 3 種和第 4 種,它對於更新的要求比較低,對於吞吐的要求比較高,過程之中的曲線也不要求有一致性。第 4 種查詢模式更多的是單實體的一些查詢,比如去查詢內容,會有哪些指標,而且對 QPS 要求比較高。

  •  技術方案

針對上方 4 種不同的場景,我們是如何去做的?

首先看一下基礎明細層 (圖中左方),資料來源有兩條鏈路,其中一條鏈路是消費的流,比如直播的消費資訊,還有觀看 / 點贊 / 評論。經過一輪基礎清洗,然後做維度管理。上游的這些維度資訊來源於 Kafka,Kafka 寫入了一些內容的維度,放到了 KV 儲存裡邊,包括一些使用者的維度。

這些維度關聯了之後,最終寫入 Kafka 的 DWD 事實層,這裡為了做效能的提升,我們做了二級快取的操作。

如圖中上方,我們讀取 DWD 層的資料然後做基礎彙總,核心是視窗維度聚合生成 4 種不同粒度的資料,分別是大盤多維彙總 topic、直播間多維彙總 topic、作者多維彙總 topic、使用者多維彙總 topic,這些都是通用維度的資料。

如圖中下方,基於這些通用維度資料,我們再去加工個性化維度的資料,也就是 ADS 層。拿到了這些資料之後會有維度擴充套件,包括內容擴充套件和運營維度的拓展,然後再去做聚合,比如會有電商實時 topic,機構服務實時 topic 和大 V 直播實時 topic。

分成這樣的兩個鏈路會有一個好處:一個地方處理的是通用維度,另一個地方處理的是個性化的維度。通用維度保障的要求會比較高一些,個性化維度則會做很多個性化的邏輯。如果這兩個耦合在一起的話,會發現任務經常出問題,並且分不清楚哪個任務的職責是什麼,構建不出這樣的一個穩定層。

如圖中右方,最終我們用到了三種不同的引擎。簡單來說就是 Redis 查詢用到了 C 端的場景,OLAP 查詢用到了大屏、業務看板的場景。

3. 騰訊看點實時數倉案例

騰訊看點業務為什麼要構建實時數倉,因為原始的上報資料量非常大,一天上報峰值就有上萬億條。而且上報格式混亂。缺乏內容維度資訊、使用者畫像資訊,下游沒辦法直接使用。而我們提供的實時數倉,是根據騰訊看點資訊流的業務場景,進行了內容維度的關聯,使用者畫像的關聯,各種粒度的聚合,下游可以非常方便的使用實時資料。

1) 方案選型

那就看下我們多維實時資料分析系統的方案選型,選型我們對比了行業內的領先方案,選擇了最符合我們業務場景的方案。

第一塊是實時數倉的選型,我們選擇的是業界比較成熟的 Lambda 架構,他的優點是靈活性高、容錯性高、成熟度高和遷移成本低;缺點是實時、離線資料用兩套程式碼,可能會存在一個口徑修改了,另一個沒改的問題,我們每天都有做資料對賬的工作,如果有異常會進行告警。

第二塊是實時計算引擎選型,因為 Flink 設計之初就是為了流處理,SparkStreaming 嚴格來說還是微批處理,Strom 用的已經不多了。再看 Flink 具有 Exactly-once 的準確性、輕量級 Checkpoint 容錯機制、低延時高吞吐和易用性高的特點,我們選擇了 Flink 作為實時計算引擎。

第三塊是實時儲存引擎,我們的要求就是需要有維度索引、支援高併發、預聚合、高效能實時多維 OLAP 查詢。可以看到,Hbase、Tdsql 和 ES 都不能滿足要求,Druid 有一個缺陷,它是按照時序劃分 Segment,無法將同一個內容,存放在同一個 Segment 上,計算全域性 TopN 只能是近似值,所以我們選擇了最近兩年大火的 MPP 資料庫引擎 ClickHouse。

2) 設計目標與設計難點

我們多維實時資料分析系統分為三大模組:

  • 實時計算引擎
  • 實時儲存引擎
  • App 層

難點主要在前兩個模組:實時計算引擎和實時儲存引擎。

  • 千萬級/s 的海量資料如何實時接入,並且進行極低延遲維表關聯。
  • 實時儲存引擎如何支援高併發寫入、高可用分散式和高效能索引查詢,是比較難的。

這幾個模組的具體實現,看一下我們系統的架構設計。

3) 架構設計

前端採用的是開源元件 Ant Design,利用了 Nginx 伺服器,部署靜態頁面,並反向代理了瀏覽器的請求到後臺伺服器上。

後臺服務是基於騰訊自研的 RPC 後臺服務框架寫的,並且會進行一些二級快取。

實時數倉部分,分為了接入層、實時計算層和實時數倉儲存層。

  • 接入層主要是從千萬級/s 的原始訊息佇列中,拆分出不同行為資料的微佇列,拿看點的視訊來說,拆分過後,資料就只有百萬級/s 了;
  • 實時計算層主要負責,多行行為流水資料進行行轉列,實時關聯使用者畫像資料和內容維度資料;
  • 實時數倉儲存層主要是設計出符合看點業務的,下游好用的實時訊息佇列。我們暫時提供了兩個訊息佇列,作為實時數倉的兩層。一層 DWM 層是內容 ID-使用者 ID 粒度聚合的,就是一條資料包含內容 ID-使用者 ID 還有 B 側內容資料、C 側使用者資料和使用者畫像資料;另一層是 DWS 層,是內容 ID 粒度聚合的,一條資料包含內容 ID,B 側資料和 C 側資料。可以看到內容 ID-使用者 ID 粒度的訊息佇列流量進一步減小到十萬級/s,內容 ID 粒度的更是萬級/s,並且格式更加清晰,維度資訊更加豐富。

實時儲存部分分為實時寫入層、OLAP 儲存層和後臺介面層。

  • 實時寫入層主要是負責 Hash 路由將資料寫入;
  • OLAP 儲存層利用 MPP 儲存引擎,設計符合業務的索引和物化檢視,高效儲存海量資料;
  • 後臺介面層提供高效的多維實時查詢介面。

4) 實時計算

這個系統最複雜的兩塊,實時計算和實時儲存。

先介紹實時計算部分:分為實時關聯和實時數倉。

① 實時高效能維表關聯

實時維表關聯這一塊難度在於 百萬級/s 的實時資料流,如果直接去關聯 HBase,1 分鐘的資料,關聯完 HBase 耗時是小時級的,會導致資料延遲嚴重。

我們提出了幾個解決方案:

  • 第一,在 Flink 實時計算環節,先按照 1 分鐘進行了視窗聚合,將視窗內多行行為資料轉一行多列的資料格式,經過這一步操作,原本小時級的關聯耗時下降到了十幾分鍾,但是還是不夠的。
  • 第二,在訪問 HBase 內容之前設定一層 Redis 快取,因為 1000 條資料訪問 HBase 是秒級的,而訪問 Redis 是毫秒級的,訪問 Redis 的速度基本是訪問 HBase 的 1000 倍。為了防止過期的資料浪費快取,快取過期時間設定成 24 小時,同時通過監聽寫 HBase Proxy 來保證快取的一致性。這樣將訪問時間從十幾分鍾變成了秒級。
  • 第三,上報過程中會上報不少非常規內容 ID,這些內容 ID 在內容 HBase 中是不儲存的,會造成快取穿透的問題。所以在實時計算的時候,我們直接過濾掉這些內容 ID,防止快取穿透,又減少一些時間。
  • 第四,因為設定了定時快取,會引入一個快取雪崩的問題。為了防止雪崩,我們在實時計算中,進行了削峰填谷的操作,錯開設定快取的時間。

可以看到,優化前後,資料量從百億級減少到了十億級,耗時從小時級減少到了數十秒,減少 99%。

②下游提供服務

實時數倉的難度在於:它處於比較新的領域,並且各個公司各個業務差距比較大,怎麼能設計出方便,好用,符合看點業務場景的實時數倉是有難度的。

先看一下實時數倉做了什麼,實時數倉對外就是幾個訊息佇列,不同的訊息佇列裡面存放的就是不同聚合粒度的實時資料,包括內容 ID、使用者 ID、C 側行為資料、B 側內容維度資料和使用者畫像資料等。

我們是怎麼搭建實時數倉的,就是上面介紹的實時計算引擎的輸出,放到訊息佇列中儲存,可以提供給下游多使用者複用。

我們可以看下,在我們建設實時資料倉庫前後,開發一個實時應用的區別。沒有數倉的時候,我們需要消費千萬級/s 的原始佇列,進行復雜的資料清洗,然後再進行使用者畫像關聯、內容維度關聯,才能拿到符合要求格式的實時資料,開發和擴充套件的成本都會比較高,如果想開發一個新的應用,又要走一遍這個流程。有了數倉之後,如果想開發內容 ID 粒度的實時應用,就直接申請 TPS 萬級/s 的 DWS 層的訊息佇列。開發成本變低很多,資源消耗小很多,可擴充套件性也強很多。

看個實際例子,開發我們系統的實時資料大屏,原本需要進行如上所有操作,才能拿到資料。現在只需要消費 DWS 層訊息佇列,寫一條 Flink SQL 即可,僅消耗 2 個 CPU 核心,1G 記憶體。

可以看到,以 50 個消費者為例,建立實時數倉前後,下游開發一個實時應用,可以減少 98%的資源消耗。包括計算資源,儲存資源,人力成本和開發人員學習接入成本等等。並且消費者越多,節省越多。就拿 Redis 儲存這一部分來說,一個月就能省下上百萬人民幣。

5) 實時儲存

介紹完實時計算,再來介紹實時儲存。

這塊分為四個部分來介紹:

  • 分散式-高可用
  • 海量資料-寫入
  • 高效能-查詢
  • 擴容

① 分散式-高可用

我們這裡聽取的是 Clickhouse 官方的建議,藉助 ZK 實現高可用的方案。資料寫入一個分片,僅寫入一個副本,然後再寫 ZK,通過 ZK 告訴同一個分片的其他副本,其他副本再過來拉取資料,保證資料一致性。

這裡沒有選用訊息佇列進行資料同步,是因為 ZK 更加輕量級。而且寫的時候,任意寫一個副本,其它副本都能夠通過 ZK 獲得一致的資料。而且就算其它節點第一次來獲取資料失敗了,後面只要發現它跟 ZK 上記錄的資料不一致,就會再次嘗試獲取資料,保證一致性。

②海量資料-寫入

資料寫入遇到的第一個問題是,海量資料直接寫入 Clickhouse 的話,會導致 ZK 的 QPS 太高,解決方案是改用 Batch 方式寫入。Batch 設定多大呢,Batch 太小的話緩解不了 ZK 的壓力,Batch 也不能太大,不然上游記憶體壓力太大,通過實驗,最終我們選用了大小几十萬的 Batch。

第二個問題是,隨著資料量的增長,單 QQ 看點的視訊內容每天可能寫入百億級的資料,預設方案是寫一張分散式表,這就會造成單臺機器出現磁碟的瓶頸,尤其是 Clickhouse 底層運用的是 Mergetree,原理類似於 HBase、RocketsDB 的底層 LSM-Tree。在合併的過程中會存在寫放大的問題,加重磁碟壓力。峰值每分鐘幾千萬條資料,寫完耗時幾十秒,如果正在做 Merge,就會阻塞寫入請求,查詢也會非常慢。我們做的兩個優化方案:一是對磁碟做 Raid,提升磁碟的 IO;二是在寫入之前進行分表,直接分開寫入到不同的分片上,磁碟壓力直接變為 1/N。

第三個問題是,雖然我們寫入按照分片進行了劃分,但是這裡引入了一個分散式系統常見的問題,就是區域性的 Top 並非全域性 Top 的問題。比如同一個內容 ID 的資料落在了不同的分片上,計算全域性 Top100 閱讀的內容 ID,有一個內容 ID 在分片 1 上是 Top100,但是在其它分片上不是 Top100,導致彙總的時候,會丟失一部分資料,影響最終結果。我們做的優化是在寫入之前加上一層路由,將同一個內容 ID 的記錄,全部路由到同一個分片上,解決了該問題。

介紹完寫入,下一步介紹 Clickhouse 的高效能儲存和查詢。

③ 高效能-儲存-查詢

Clickhouse 高效能查詢的一個關鍵點是稀疏索引。稀疏索引這個設計就很有講究,設計得好可以加速查詢,設計不好反而會影響查詢效率。我根據我們的業務場景,因為我們的查詢大部分都是時間和內容 ID 相關的,比如說,某個內容,過去 N 分鐘在各個人群表現如何?我按照日期,分鐘粒度時間和內容 ID 建立了稀疏索引。針對某個內容的查詢,建立稀疏索引之後,可以減少 99%的檔案掃描。

還有一個問題就是,我們現在資料量太大,維度太多。拿 QQ 看點的視訊內容來說,一天流水有上百億條,有些維度有幾百個類別。如果一次性把所有維度進行預聚合,資料量會指數膨脹,查詢反而變慢,並且會佔用大量記憶體空間。我們的優化,針對不同的維度,建立對應的預聚合物化檢視,用空間換時間,這樣可以縮短查詢的時間。

分散式表查詢還會有一個問題,查詢單個內容 ID 的資訊,分散式表會將查詢下發到所有的分片上,然後再返回查詢結果進行彙總。實際上,因為做過路由,一個內容 ID 只存在於一個分片上,剩下的分片都在空跑。針對這類查詢,我們的優化是後臺按照同樣的規則先進行路由,直接查詢目標分片,這樣減少了 N-1/N 的負載,可以大量縮短查詢時間。而且由於我們是提供的 OLAP 查詢,資料滿足最終一致性即可,通過主從副本讀寫分離,可以進一步提升效能。

我們在後臺還做了一個 1 分鐘的資料快取,針對相同條件查詢,後臺就直接返回了。

④ 擴容

這裡再介紹一下我們的擴容的方案,調研了業內的一些常見方案。

比如 HBase,原始資料都存放在 HDFS 上,擴容只是 Region Server 擴容,不涉及原始資料的遷移。但是 Clickhouse 的每個分片資料都是在本地,是一個比較底層儲存引擎,不能像 HBase 那樣方便擴容。

Redis 是雜湊槽這種類似一致性雜湊的方式,是比較經典分散式快取的方案。Redis slot 在 Rehash 的過程中雖然存在短暫的 ask 讀不可用,但是總體來說遷移是比較方便的,從原 h[0]遷移到 h[1],最後再刪除 h[0]。但是 Clickhouse 大部分都是 OLAP 批量查詢,不是點查,而且由於列式儲存,不支援刪除的特性,一致性雜湊的方案不是很適合。

目前擴容的方案是,另外消費一份資料,寫入新 Clickhouse 叢集,兩個叢集一起跑一段時間,因為實時資料就儲存 3 天,等 3 天之後,後臺服務直接訪問新叢集。

4. 有贊實時數倉案例

1) 分層設計

傳統離線數倉的分層設計大家都很熟悉,為了規範的組織和管理資料,層級劃分會比較多,在一些複雜邏輯處理場景還會引入臨時層落地中間結果以方便下游加工處理。實時數倉考慮到時效性問題,分層設計需要儘量精簡,降低中間流程出錯的可能性,不過總體而言,實時數倉還是會參考離線數倉的分層思想來設計。

實時數倉分層架構如下圖所示 :

① ODS(實時資料接入層)

ODS 層,即實時資料接入層,通過資料採集工具收集各個業務系統的實時資料,對非結構化的資料進行結構化處理,儲存原始資料,幾乎不過濾資料;該層資料的主要來源有三個部分:第一部分是業務方建立的 NSQ 訊息,第二部分是業務資料庫的 Binlog 日誌,第三部分是埋點日誌和應用程式日誌,以上三部分的實時資料最終統一寫入 Kafka 儲存介質中。

ODS 層表命名規範:部門名稱.應用名稱.數倉層級主題域字首資料庫名/訊息名

例如:接入業務庫的 Binlog

實時數倉表命名:deptname.appname.ods_subjectname_tablename

例如:接入業務方的 NSQ 訊息

實時數倉表命名:deptname.appname.ods_subjectname_msgname

② DWS(實時明細中間層)

DWS 層,即實時明細中間層,該層以業務過程作為建模驅動,基於每個具體的業務過程事件來構建最細粒度的明細層事實表;比如交易過程,有下單事件、支付事件、發貨事件等,我們會基於這些獨立的事件來進行明細層的構建。在這層,事實明細資料同樣是按照離線數倉的主題域來進行劃分,也會採用維度建模的方式組織資料,對於一些重要的維度欄位,會做適當冗餘。基於有贊實時需求的場景,重點建設交易、營銷、客戶、店鋪、商品等主題域的資料。該層的資料來源於 ODS 層,通過 FlinkSQL 進行 ETL 處理,主要工作有規範命名、資料清洗、維度補全、多流關聯,最終統一寫入 Kafka 儲存介質中。

DWS 層表命名規範:部門名稱.應用名稱.數倉層級_主題域字首_數倉表命名

例如:實時事件 A 的中間層

實時數倉表命名:deptname.appname.dws_subjectname_tablename_eventnameA

例如:實時事件 B 的中間層

實時數倉表命名:deptname.appname.dws_subjectname_tablename_eventnameB

③ DIM(實時維表層)

DIM 層,即實時維表層,用來存放維度資料,主要用於實時明細中間層寬化處理時補全維度使用,目前該層的資料主要儲存於 HBase 中,後續會基於 QPS 和資料量大小提供更多合適型別的儲存介質。

DIM 層表命名規範:應用名稱_數倉層級_主題域字首_數倉表命名

例如:HBase 儲存,實時維度表

實時數倉表命名:appname_dim_tablename

④ DWA(實時彙總層)

DWA 層,即實時彙總層,該層通過 DWS 層資料進行多維彙總,提供給下游業務方使用,在實際應用過程中,不同業務方使用維度彙總的方式不太一樣,根據不同的需求採用不同的技術方案去實現。第一種方式,採用 FlinkSQL 進行實時彙總,將結果指標存入 HBase、MySQL 等資料庫,該種方式是我們早期採用的方案,優點是實現業務邏輯比較靈活,缺點是聚合粒度固化,不易擴充套件;第二種方式,採用實時 OLAP 工具進行彙總,該種方式是我們目前常用的方案,優點是聚合粒度易擴充套件,缺點是業務邏輯需要在中間層預處理。

DWA 層表命名規範:應用名稱_數倉層級_主題域字首_聚合粒度_資料範圍

例如:HBase 儲存,某域當日某粒度實時彙總表

實時數倉表命名:appname_dwa_subjectname_aggname_daily

⑤ APP(實時應用層)

APP 層,即實時應用層,該層資料已經寫入應用系統的儲存中,例如寫入 Druid 作為 BI 看板的實時資料集;寫入 HBase、MySQL 用於提供統一資料服務介面;寫入 ClickHouse 用於提供實時 OLAP 服務。因為該層非常貼近業務,在命名規範上實時數倉不做統一要求。

2) 實時 ETL

實時數倉 ETL 處理過程所涉及的元件比較多,接下來盤點構建實時數倉所需要的元件以及每個元件的應用場景。如下圖所示:

具體實時 ETL 處理流程如下圖所示:

① 維度補全

建立呼叫 Duboo 介面的 UDF 函式在實時流裡補全維度是最便捷的使用方式,但如果請求量過大,對 Duboo 介面壓力會過大。在實際應用場景補全維度首選還是關聯維度表,但關聯也存在一定概率的丟失問題,為了彌補這種丟失,可以採用 Duboo 介面呼叫兜底的方式來補全。虛擬碼如下:

create function call_dubbo as 'XXXXXXX';

create function get_json_object as 'XXXXXXX';

case

when cast( b.column as bigint) is not null

then cast( b.column as bigint)

else cast(coalesce(cast(get_json_object(call_dubbo('clusterUrl'

,'serviceName'

,'methodName'

,cast(concat('[',cast(a.column as varchar),']') as varchar)

,'key'

,'rootId')

as bigint)

,a.column)

as bigint) end

② 冪等處理

實時任務在執行過程中難免會遇到執行異常的情況,當任務異常重啟的時候會導致部分訊息重新發送和消費,從而引發下游實時統計資料不準確,為了有效避免這種情況,可以選擇對實時訊息流做冪等處理,當消費完一條訊息,將這條訊息的 Key 存入 KV,如果任務異常重啟導致訊息重新發送的時候,先從 KV 判斷該訊息是否已被消費,如果已消費就不再往下發送。虛擬碼如下:

create function idempotenc as 'XXXXXXX';

insert into table

select

order_no

from

select

a.orderNo as order_no

, idempotenc('XXXXXXX', coalesce( order_no, '') ) as rid

from

table1

) t

where

t.rid = 0;

③ 資料驗證

由於實時數倉的資料是無邊界的流,相比於離線數倉固定不變的資料更難驗收。基於不同的場景,我們提供了 2 種驗證方式,分別是:抽樣驗證與全量驗證。如圖 3.3 所示

  • 抽樣驗證方案

該方案主要應用在資料準確性驗證上,實時彙總結果是基於儲存在 Kafka 的實時明細中間層計算而來,但 Kafka 本身不支援按照特定條件檢索,不支援寫查詢語句,再加上訊息的無邊界性,統計結果是在不斷變化的,很難尋找參照物進行比對。鑑於此,我們採用了持久化訊息的方法,將訊息落盤到 TiDB 儲存,基於 TiDB 的能力對落盤的訊息進行檢索、查詢、彙總。編寫固定時間邊界的測試用例與相同時間邊界的業務庫資料或者離線數倉資料進行比對。通過以上方式,抽樣核心店鋪的資料進行指標準確性驗證,確保測試用例全部通過。

  • 全量驗證方案

該方案主要應用在資料完整性和一致性驗證上,在實時維度表驗證的場景使用最多。大體思路:將儲存實時維度表的線上 HBase 叢集中的資料同步到離線 HBase 叢集中,再將離線 HBase 叢集中的資料匯入到 Hive 中,在限定實時維度表的時間邊界後,通過資料平臺提供的資料校驗功能,比對實時維度表與離線維度表是否存在差異,最終確保兩張表的資料完全一致。

④ 資料恢復

實時任務一旦上線就要求持續不斷的提供準確、穩定的服務。區別於離線任務按天排程,如果離線任務出現 Bug,會有充足的時間去修復。如果實時任務出現 Bug,必須按照提前制定好的流程,嚴格按照步驟執行,否則極易出現問題。造成 Bug 的情況有非常多,比如程式碼 Bug、異常資料 Bug、實時叢集 Bug,如下圖展示了修復實時任務 Bug 並恢復資料的流程。

5. 騰訊全場景實時數倉建設案例

在數倉體系中會有各種各樣的大資料元件,譬如 Hive/HBase/HDFS/S3,計算引擎如 MapReduce、Spark、Flink,根據不同的需求,使用者會構建大資料儲存和處理平臺,資料在平臺經過處理和分析,結果資料會儲存到 MySQL、Elasticsearch 等支援快速查詢的關係型、非關係型資料庫中,接下來應用層就可以基於這些資料進行 BI 報表開發、使用者畫像,或基於 Presto 這種 OLAP 工具進行互動式查詢等。

1) Lambda 架構的痛點

在整個過程中我們常常會用一些離線的排程系統,定期的(T+1 或者每隔幾小時)去執行一些 Spark 分析任務,做一些資料的輸入、輸出或是 ETL 工作。離線資料處理的整個過程中必然存在資料延遲的現象,不管是資料接入還是中間的分析,資料的延遲都是比較大的,可能是小時級也有可能是天級別的。另外一些場景中我們也常常會為了一些實時性的需求去構建一個實時處理過程,比如藉助 Flink+Kafka 去構建實時的流處理系統。

整體上,數倉架構中有非常多的元件,大大增加了整個架構的複雜性和運維的成本。

如下圖,這是很多公司之前或者現在正在採用的 Lambda 架構,Lambda 架構將數倉分為離線層和實時層,相應的就有批處理和流處理兩個相互獨立的資料處理流程,同一份資料會被處理兩次以上,同一套業務邏輯程式碼需要適配性的開發兩次。Lambda 架構大家應該已經非常熟悉了,下面我就著重介紹一下我們採用 Lambda 架構在數倉建設過程中遇到的一些痛點問題。

例如在實時計算一些使用者相關指標的實時場景下,我們想看到當前 pv、uv 時,我們會將這些資料放到實時層去做一些計算,這些指標的值就會實時呈現出來,但同時想了解使用者的一個增長趨勢,需要把過去一天的資料計算出來。這樣就需要通過批處理的排程任務來實現,比如凌晨兩三點的時候在排程系統上起一個 Spark 排程任務把當天所有的資料重新跑一遍。

很顯然在這個過程中,由於兩個過程執行的時間是不一樣的,跑的資料卻相同,因此可能造成資料的不一致。因為某一條或幾條資料的更新,需要重新跑一遍整個離線分析的鏈路,資料更新成本很大,同時需要維護離線和實時分析兩套計算平臺,整個上下兩層的開發流程和運維成本其實都是非常高的。

為了解決 Lambda 架構帶來的各種問題,就誕生了 Kappa 架構,這個架構大家應該也非常的熟悉。

2) Kappa 架構的痛點

我們來講一下 Kappa 架構,如下圖,它中間其實用的是訊息佇列,通過用 Flink 將整個鏈路串聯起來。Kappa 架構解決了 Lambda 架構中離線處理層和實時處理層之間由於引擎不一樣,導致的運維成本和開發成本高昂的問題,但 Kappa 架構也有其痛點。

首先,在構建實時業務場景時,會用到 Kappa 去構建一個近實時的場景,但如果想對數倉中間層例如 ODS 層做一些簡單的 OLAP 分析或者進一步的資料處理時,如將資料寫到 DWD 層的 Kafka,則需要另外接入 Flink。同時,當需要從 DWD 層的 Kafka 把資料再匯入到 Clickhouse,Elasticsearch,MySQL 或者是 Hive 裡面做進一步的分析時,顯然就增加了整個架構的複雜性。

其次,Kappa 架構是強烈依賴訊息佇列的,我們知道訊息佇列本身在整個鏈路上資料計算的準確性是嚴格依賴它上游資料的順序,訊息佇列接的越多,發生亂序的可能性就越大。ODS 層資料一般是絕對準確的,把 ODS 層的資料傳送到下一個 kafka 的時候就有可能發生亂序,DWD 層再發到 DWS 的時候可能又亂序了,這樣資料不一致性就會變得很嚴重。

第三,Kafka 由於它是一個順序儲存的系統,順序儲存系統是沒有辦法直接在其上面利用 OLAP 分析的一些優化策略,例如謂詞下推這類的優化策略,在順序儲存的 Kafka 上來實現是比較困難的事情。

那麼有沒有這樣一個架構,既能夠滿足實時性的需求,又能夠滿足離線計算的要求,而且還能夠減輕運維開發的成本,解決通過訊息佇列構建 Kappa 架構過程中遇到的一些痛點?答案是肯定的,後面的篇幅會詳細論述。

3) 痛點總結

4) Flink+Iceberg 構建實時數倉

① 近實時的資料接入

前面介紹了 Iceberg 既支援讀寫分離,又支援併發讀、增量讀、小檔案合併,還可以支援秒級到分鐘級的延遲,基於這些優勢我們嘗試採用 Iceberg 這些功能來構建基於 Flink 的實時全鏈路批流一體化的實時數倉架構。

如下圖所示,Iceberg 每次的 commit 操作,都是對資料的可見性的改變,比如說讓資料從不可見變成可見,在這個過程中,就可以實現近實時的資料記錄。

② 實時數倉 - 資料湖分析系統

此前需要先進行資料接入,比如用 Spark 的離線排程任務去跑一些資料,拉取,抽取最後再寫入到 Hive 表裡面,這個過程的延時比較大。有了 Iceberg 的表結構,可以中間使用 Flink,或者 spark streaming,完成近實時的資料接入。

基於以上功能,我們再來回顧一下前面討論的 Kappa 架構,Kappa 架構的痛點上面已經描述過,Iceberg 既然能夠作為一個優秀的表格式,既支援 Streaming reader,又可以支援 Streaming sink,是否可以考慮將 Kafka 替換成 Iceberg?

Iceberg 底層依賴的儲存是像 HDFS 或 S3 這樣的廉價儲存,而且 Iceberg 是支援 parquet、orc、Avro 這樣的列式儲存。有列式儲存的支援,就可以對 OLAP 分析進行基本的優化,在中間層直接進行計算。例如謂詞下推最基本的 OLAP 優化策略,基於 Iceberg snapshot 的 Streaming reader 功能,可以把離線任務天級別到小時級別的延遲大大的降低,改造成一個近實時的資料湖分析系統。

在中間處理層,可以用 presto 進行一些簡單的查詢,因為 Iceberg 支援 Streaming read,所以在系統的中間層也可以直接接入 Flink,直接在中間層用 Flink 做一些批處理或者流式計算的任務,把中間結果做進一步計算後輸出到下游。

總的來說,Iceberg 替換 Kafka 的優勢主要包括:

優勢:

  • 實現儲存層的流批統一;
  • 中間層支援 OLAP 分析;
  • 完美支援高效回溯;
  • 儲存成本降低。

劣勢:

  • 資料延遲從實時變成近實時;
  • 對接其他資料系統需要額外開發工作。

秒級分析 - 資料湖加速:

由於 Iceberg 本身是將資料檔案全部儲存在 HDFS 上的,HDFS 讀寫這塊對於秒級分析的場景,還是不能夠完全滿足我們的需求,所以接下去我們會在 Iceberg 底層支援 Alluxio 這樣一個快取,藉助於快取的能力可以實現資料湖的加速。這塊的架構也在我們未來的一個規劃和建設中。

作者丨五分鐘學大資料

一、實時數倉建設背景

1. 實時需求日趨迫切

目前各大公司的產品需求和內部決策對於資料實時性的要求越來越迫切,需要實時數倉的能力來賦能。傳統離線數倉的資料時效性是 T+1,排程頻率以天為單位,無法支撐實時場景的資料需求。即使能將排程頻率設定成小時,也只能解決部分時效性要求不高的場景,對於實效性要求很高的場景還是無法優雅的支撐。因此實時使用資料的問題必須得到有效解決。

2. 實時技術日趨成熟

實時計算框架已經經歷了三代發展,分別是:Storm、SparkStreaming、Flink,計算框架越來越成熟。一方面,實時任務的開發已經能通過編寫 SQL 的方式來完成,在技術層面能很好地繼承離線數倉的架構設計思想;另一方面,線上資料開發平臺所提供的功能對實時任務開發、除錯、運維的支援也日漸趨於成熟,開發成本逐步降低,有助於去做這件事。

二、實時數倉建設目的

1. 解決傳統數倉的問題

從目前數倉建設的現狀來看,實時數倉是一個容易讓人產生混淆的概念,根據傳統經驗分析,數倉有一個重要的功能,即能夠記錄歷史。通常,數倉都是希望從業務上線的第一天開始有資料,然後一直記錄到現在。但實時流處理技術,又是強調當前處理狀態的一個技術,結合當前一線大廠的建設經驗和滴滴在該領域的建設現狀,我們嘗試把公司內實時數倉建設的目的定位為,以數倉建設理論和實時技術,解決由於當前離線數倉資料時效性低解決不了的問題。

現階段我們要建設實時數倉的主要原因是:

  • 公司業務對於資料的實時性越來越迫切,需要有實時資料來輔助完成決策;
  • 實時資料建設沒有規範,資料可用性較差,無法形成數倉體系,資源大量浪費;
  • 資料平臺工具對整體實時開發的支援也日漸趨於成熟,開發成本降低。

2. 實時數倉的應用場景

  • 實時 OLAP 分析;
  • 實時資料看板;
  • 實時業務監控;
  • 實時資料介面服務。

三、實時數倉建設方案

接下來我們分析下目前實時數倉建設比較好的幾個案例,希望這些案例能夠給大家帶來一些啟發。

1. 滴滴順風車實時數倉案例

滴滴資料團隊建設的實時數倉,基本滿足了順風車業務方在實時側的各類業務需求,初步建立起順風車實時數倉,完成了整體資料分層,包含明細資料和彙總資料,統一了 DWD 層,降低了大資料資源消耗,提高了資料複用性,可對外輸出豐富的資料服務。

數倉具體架構如下圖所示:

從資料架構圖來看,順風車實時數倉和對應的離線數倉有很多類似的地方。例如分層結構;比如 ODS 層,明細層,彙總層,乃至應用層,他們命名的模式可能都是一樣的。但仔細比較不難發現,兩者有很多區別:

  • 與離線數倉相比,實時數倉的層次更少一些:
  • 從目前建設離線數倉的經驗來看,數倉的資料明細層內容會非常豐富,處理明細資料外一般還會包含輕度彙總層的概念,另外離線數倉中應用層資料在數倉內部,但實時數倉中,app 應用層資料已經落入應用系統的儲存介質中,可以把該層與數倉的表分離;
  • 應用層少建設的好處:實時處理資料的時候,每建一個層次,資料必然會產生一定的延遲;
  • 彙總層少建的好處:在彙總統計的時候,往往為了容忍一部分資料的延遲,可能會人為的製造一些延遲來保證資料的準確。舉例,在統計跨天相關的訂單事件中的資料時,可能會等到 00:00:05 或者 00:00:10 再統計,確保 00:00 前的資料已經全部接受到位了,再進行統計。所以,彙總層的層次太多的話,就會更大的加重人為造成的資料延遲。
  • 與離線數倉相比,實時數倉的資料來源儲存不同:
  • 在建設離線數倉的時候,目前滴滴內部整個離線數倉都是建立在 Hive 表之上。但是,在建設實時數倉的時候,同一份表,會使用不同的方式進行儲存。比如常見的情況下,明細資料或者彙總資料都會存在 Kafka 裡面,但是像城市、渠道等維度資訊需要藉助 Hbase,mysql 或者其他 KV 儲存等資料庫來進行儲存。

接下來,根據順風車實時數倉架構圖,對每一層建設做具體展開:

1) ODS 貼源層建設

根據順風車具體場景,目前順風車資料來源主要包括訂單相關的 binlog 日誌,冒泡和安全相關的 public 日誌,流量相關的埋點日誌等。這些資料部分已採集寫入 kafka 或 ddmq 等資料通道中,部分資料需要藉助內部自研同步工具完成採集,最終基於順風車數倉 ods 層建設規範分主題統一寫入 kafka 儲存介質中。

命名規範:ODS 層實時資料來源主要包括兩種。

一種是在離線採集時已經自動生產的 DDMQ 或者是 Kafka topic,這型別的資料命名方式為採集系統自動生成規範為:cn-binlog-資料庫名-資料庫名 eg:cn-binlog-ihap_fangyuan-ihap_fangyuan

一種是需要自己進行採集同步到 kafka topic 中,生產的 topic 命名規範同離線類似:ODS 層採用:realtime_ods_binlog_{源系統庫/表名}/ods_log_{日誌名} eg: realtime_ods_binlog_ihap_fangyuan

2)DWD 明細層建設

根據順風車業務過程作為建模驅動,基於每個具體的業務過程特點,構建最細粒度的明細層事實表;結合順風車分析師在離線側的資料使用特點,將明細事實表的某些重要維度屬性欄位做適當冗餘,完成寬表化處理,之後基於當前順風車業務方對實時資料的需求重點,重點建設交易、財務、體驗、安全、流量等幾大模組;該層的資料來源於 ODS 層,通過大資料架構提供的 Stream SQL 完成 ETL 工作,對於 binlog 日誌的處理主要進行簡單的資料清洗、處理資料漂移和資料亂序,以及可能對多個 ODS 表進行 Stream Join,對於流量日誌主要是做通用的 ETL 處理和針對順風車場景的資料過濾,完成非結構化資料的結構化處理和資料的分流;該層的資料除了儲存在訊息佇列 Kafka 中,通常也會把資料實時寫入 Druid 資料庫中,供查詢明細資料和作為簡單彙總資料的加工資料來源。

命名規範:DWD 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 40 個字元,並且應遵循下述規則:realtime_dwd_{業務/pub}_{資料域縮寫}_[{業務過程縮寫}]_[{自定義表命名標籤縮寫}]

  • {業務/pub}:參考業務命名
  • {資料域縮寫}:參考資料域劃分部分
  • {自定義表命名標籤縮寫}:實體名稱可以根據資料倉庫轉換整合後做一定的業務抽象的名稱,該名稱應該準確表述實體所代表的業務含義
  • 樣例:realtime_dwd_trip_trd_order_base

3) DIM 層

  • 公共維度層,基於維度建模理念思想,建立整個業務過程的一致性維度,降低資料計算口徑和演算法不統一風險;
  • DIM 層資料來源於兩部分:一部分是 Flink 程式實時處理 ODS 層資料得到,另外一部分是通過離線任務出倉得到;
  • DIM 層維度資料主要使用 MySQL、Hbase、fusion(滴滴自研 KV 儲存) 三種儲存引擎,對於維表資料比較少的情況可以使用 MySQL,對於單條資料大小比較小,查詢 QPS 比較高的情況,可以使用 fusion 儲存,降低機器記憶體資源佔用,對於資料量比較大,對維表資料變化不是特別敏感的場景,可以使用 HBase 儲存。

命名規範:DIM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 30 個字元,並且應遵循下述規則:dim_{業務/pub}_{維度定義}[_{自定義命名標籤}]:

  • {業務/pub}:參考業務命名
  • {維度定義}:參考維度命名
  • {自定義表命名標籤縮寫}:實體名稱可以根據資料倉庫轉換整合後做一定的業務抽象的名稱,該名稱應該準確表述實體所代表的業務含義
  • 樣例:dim_trip_dri_base

4) DWM 彙總層建設

在建設順風車實時數倉的彙總層的時候,跟順風車離線數倉有很多一樣的地方,但其具體技術實現會存在很大不同。

第一:對於一些共性指標的加工,比如 pv,uv,訂單業務過程指標等,我們會在彙總層進行統一的運算,確保關於指標的口徑是統一在一個固定的模型中完成。對於一些個性指標,從指標複用性的角度出發,確定唯一的時間欄位,同時該欄位儘可能與其他指標在時間維度上完成拉齊,例如行中異常訂單數需要與交易域指標在事件時間上做到拉齊。

第二:在順風車彙總層建設中,需要進行多維的主題彙總,因為實時數倉本身是面向主題的,可能每個主題會關心的維度都不一樣,所以需要在不同的主題下,按照這個主題關心的維度對資料進行彙總,最後來算業務方需要的彙總指標。在具體操作中,對於 pv 類指標使用 Stream SQL 實現 1 分鐘彙總指標作為最小彙總單位指標,在此基礎上進行時間維度上的指標累加;對於 uv 類指標直接使用 druid 資料庫作為指標彙總容器,根據業務方對彙總指標的及時性和準確性的要求,實現相應的精確去重和非精確去重。

第三:彙總層建設過程中,還會涉及到衍生維度的加工。在順風車券相關的彙總指標加工中我們使用 Hbase 的版本機制來構建一個衍生維度的拉鍊表,通過事件流和 Hbase 維表關聯的方式得到實時資料當時的準確維度

命名規範:DWM 層的表命名使用英文小寫字母,單詞之間用下劃線分開,總長度不能超過 40 個字元,並且應遵循下述規則:realtime_dwm_{業務/pub}_{資料域縮寫}_{資料主粒度縮寫}_[{自定義表命名標籤縮寫}]_{統計時間週期範圍縮寫}:

  • {業務/pub}:參考業務命名
  • {資料域縮寫}:參考資料域劃分部分
  • {資料主粒度縮寫}:指資料主要粒度或資料域的縮寫,也是聯合主鍵中的主要維度
  • {自定義表命名標籤縮寫}:實體名稱可以根據資料倉庫轉換整合後做一定的業務抽象的名稱,該名稱應該準確表述實體所代表的業務含義
  • {統計時間週期範圍縮寫}:1d:天增量;td:天累計(全量);1h:小時增量;th:小時累計(全量);1min:分鐘增量;tmin:分鐘累計(全量)
  • 樣例:realtime_dwm_trip_trd_pas_bus_accum_1min

(5)APP 應用層

該層主要的工作是把實時彙總資料寫入應用系統的資料庫中,包括用於大屏顯示和實時 OLAP 的 Druid 資料庫(該資料庫除了寫入應用資料,也可以寫入明細資料完成彙總指標的計算)中,用於實時資料介面服務的 Hbase 資料庫,用於實時資料產品的 mysql 或者 redis 資料庫中。

命名規範:基於實時數倉的特殊性不做硬性要求。

2. 快手實時數倉場景化案例

1) 目標及難點

① 目標

首先由於是做數倉,因此希望所有的實時指標都有離線指標去對應,要求實時指標和離線指標整體的資料差異在 1% 以內,這是最低標準。

其次是資料延遲,其 SLA 標準是活動期間所有核心報表場景的資料延遲不能超過 5 分鐘,這 5 分鐘包括作業掛掉之後和恢復時間,如果超過則意味著 SLA 不達標。

最後是穩定性,針對一些場景,比如作業重啟後,我們的曲線是正常的,不會因為作業重啟導致指標產出一些明顯的異常。

②難點

第一個難點是資料量大。每天整體的入口流量資料量級大概在萬億級。在活動如春晚的場景,QPS 峰值能達到億 / 秒。

第二個難點是元件依賴比較複雜。可能這條鏈路裡有的依賴於 Kafka,有的依賴 Flink,還有一些依賴 KV 儲存、RPC 介面、OLAP 引擎等,我們需要思考在這條鏈路裡如何分佈,才能讓這些元件都能正常工作。

第三個難點是鏈路複雜。目前我們有 200+ 核心業務作業,50+ 核心資料來源,整體作業超過 1000。

2) 實時數倉 - 分層模型

基於上面三個難點,來看一下數倉架構:

如上所示:

最下層有三個不同的資料來源,分別是客戶端日誌、服務端日誌以及 Binlog 日誌;在公共基礎層分為兩個不同的層次,一個是 DWD 層,做明細資料,另一個是 DWS 層,做公共聚合資料,DIM 是我們常說的維度。我們有一個基於離線數倉的主題預分層,這個主題預分層可能包括流量、使用者、裝置、視訊的生產消費、風控、社交等。DWD 層的核心工作是標準化的清洗;DWS 層是把維度的資料和 DWD 層進行關聯,關聯之後生成一些通用粒度的聚合層次。再往上是應用層,包括一些大盤的資料,多維分析的模型以及業務專題資料;最上面是場景。整體過程可以分為三步:

第一步是做業務資料化,相當於把業務的資料接進來;第二步是資料資產化,意思是對資料做很多的清洗,然後形成一些規則有序的資料;第三步是資料業務化,可以理解資料在實時資料層面可以反哺業務,為業務資料價值建設提供一些賦能。

3) 實時數倉 - 保障措施

基於上面的分層模型,來看一下整體的保障措施:

保障層面分為三個不同的部分,分別是質量保障,時效保障以及穩定保障。

我們先看藍色部分的質量保障。針對質量保障,可以看到在資料來源階段,做了如資料來源的亂序監控,這是我們基於自己的 SDK 的採集做的,以及資料來源和離線的一致性校準。研發階段的計算過程有三個階段,分別是研發階段、上線階段和服務階段。研發階段可能會提供一個標準化的模型,基於這個模型會有一些 Benchmark,並且做離線的比對驗證,保證質量是一致的;上線階段更多的是服務監控和指標監控;在服務階段,如果出現一些異常情況,先做 Flink 狀態拉起,如果出現了一些不符合預期的場景,我們會做離線的整體資料修復。

第二個是時效性保障。針對資料來源,我們把資料來源的延遲情況也納入監控。在研發階段其實還有兩個事情:首先是壓測,常規的任務會拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務延遲的情況;通過壓測之後,會有一些任務上線和重啟效能評估,相當於按照 CP 恢復之後,重啟的效能是什麼樣子。

最後一個是穩定保障。這在大型活動中會做得比較多,比如切換演練和分級保障。我們會基於之前的壓測結果做限流,目的是保障作業在超過極限的情況下,仍然是穩定的,不會出現很多的不穩定或者 CP 失敗的情況。之後我們會有兩種不同的標準,一種是冷備雙機房,另外一種是熱備雙機房。冷備雙機房是:當一個單機房掛掉,我們會從另一個機房去拉起;熱備雙機房:相當於同樣一份邏輯在兩個機房各部署一次。以上就是我們整體的保障措施。

4)快手場景問題及解決方案

① PV/UV 標準化

  •  場景

第一個問題是 PV/UV 標準化,這裡有三個截圖:

第一張圖是春晚活動的預熱場景,相當於是一種玩法,第二和第三張圖是春晚當天的發紅包活動和直播間截圖。

在活動進行過程中,我們發現 60~70% 的需求是計算頁面裡的資訊,如:

這個頁面來了多少人,或者有多少人點選進入這個頁面;

活動一共來了多少人;

頁面裡的某一個掛件,獲得了多少點選、產生了多少曝光。

  • 方案

抽象一下這個場景就是下面這種 SQL:

簡單來說,就是從一張表做篩選條件,然後按照維度層面做聚合,接著產生一些 Count 或者 Sum 操作。

基於這種場景,我們最開始的解決方案如上圖右邊所示。

我們用到了 Flink SQL 的 Early Fire 機制,從 Source 資料來源取資料,之後做了 DID 的分桶。比如最開始紫色的部分按這個做分桶,先做分桶的原因是防止某一個 DID 存在熱點的問題。分桶之後會有一個叫做 Local Window Agg 的東西,相當於資料分完桶之後把相同型別的資料相加。Local Window Agg 之後再按照維度進行 Global Window Agg 的合桶,合桶的概念相當於按照維度計算出最終的結果。Early Fire 機制相當於在 Local Window Agg 開一個天級的視窗,然後每分鐘去對外輸出一次。

這個過程中我們遇到了一些問題,如上圖左下角所示。

在程式碼正常執行的情況下是沒有問題的,但如果整體資料存在延遲或者追溯歷史資料的情況,比如一分鐘 Early Fire 一次,因為追溯歷史的時候資料量會比較大,所以可能導致 14:00 追溯歷史,直接讀到了 14:02 的資料,而 14:01 的那個點就被丟掉了,丟掉了以後會發生什麼?

在這種場景下,圖中上方的曲線為 Early Fire 回溯歷史資料的結果。橫座標是分鐘,縱座標是截止到當前時刻的頁面 UV,我們發現有些點是橫著的,意味著沒有資料結果,然後一個陡增,然後又橫著的,接著又一個陡增,而這個曲線的預期結果其實是圖中下方那種平滑的曲線。

為了解決這個問題,我們用到了 Cumulate Window 的解決方案,這個解決方案在 Flink 1.13 版本里也有涉及,其原理是一樣的。

資料開一個大的天級視窗,大視窗下又開了一個小的分鐘級視窗,資料按資料本身的 Row Time 落到分鐘級視窗。

Watermark 推進過了視窗的 event_time,它會進行一次下發的觸發,通過這種方式可以解決回溯的問題,資料本身落在真實的視窗, Watermark 推進,在視窗結束後觸發。此外,這種方式在一定程度上能夠解決亂序的問題。比如它的亂序資料本身是一個不丟棄的狀態,會記錄到最新的累計資料。最後是語義一致性,它會基於事件時間,在亂序不嚴重的情況下,和離線計算出來的結果一致性是相當高的。以上是 PV/UV 一個標準化的解決方案。

② DAU 計算

  • 背景介紹

下面介紹一下 DAU 計算:

我們對於整個大盤的活躍裝置、新增裝置和迴流裝置有比較多的監控。

活躍裝置指的是當天來過的裝置;新增裝置指的是當天來過且歷史沒有來過的裝置;迴流裝置指的是當天來過且 N 天內沒有來過的裝置。但是我們計算過程之中可能需要 5~8 個這樣不同的 Topic 去計算這幾個指標。

我們看一下離線過程中,邏輯應該怎麼算。

首先我們先算活躍裝置,把這些合併到一起,然後做一個維度下的天級別去重,接著再去關聯維度表,這個維度表包括裝置的首末次時間,就是截止到昨天裝置首次訪問和末次訪問的時間。

得到這個資訊之後,我們就可以進行邏輯計算,然後我們會發現新增和迴流的裝置其實是活躍裝置裡打的一個子標籤。新增裝置就是做了一個邏輯處理,迴流裝置是做了 30 天的邏輯處理,基於這樣的解決方案,我們能否簡單地寫一個 SQL 去解決這個問題?

其實我們最開始是這麼做的,但遇到了一些問題:

第一個問題是:資料來源是 6~8 個,而且我們大盤的口徑經常會做微調,如果是單作業的話,每次微調的過程之中都要改,單作業的穩定性會非常差;第二個問題是:資料量是萬億級,這會導致兩個情況,首先是這個量級的單作業穩定性非常差,其次是實時關聯維表的時候用的 KV 儲存,任何一個這樣的 RPC 服務介面,都不可能在萬億級資料量的場景下保證服務穩定性;第三個問題是:我們對於時延要求比較高,要求時延小於一分鐘。整個鏈路要避免批處理,如果出現了一些任務效能的單點問題,我們還要保證高效能和可擴容。

  • 技術方案

針對以上問題,介紹一下我們是怎麼做的:

如上圖的例子,第一步是對 A B C 這三個資料來源,先按照維度和 DID 做分鐘級別去重,分別去重之後得到三個分鐘級別去重的資料來源,接著把它們 Union 到一起,然後再進行同樣的邏輯操作。

這相當於我們資料來源的入口從萬億變到了百億的級別,分鐘級別去重之後再進行一個天級別的去重,產生的資料來源就可以從百億變成了幾十億的級別。

在幾十億級別資料量的情況下,我們再去關聯資料服務化,這就是一種比較可行的狀態,相當於去關聯使用者畫像的 RPC 介面,得到 RPC 介面之後,最終寫入到了目標 Topic。這個目標 Topic 會匯入到 OLAP 引擎,供給多個不同的服務,包括移動版服務,大屏服務,指標看板服務等。

這個方案有三個方面的優勢,分別是穩定性、時效性和準確性。

首先是穩定性。鬆耦合可以簡單理解為當資料來源 A 的邏輯和資料來源 B 的邏輯需要修改時,可以單獨修改。第二是任務可擴容,因為我們把所有邏輯拆分得非常細粒度,當一些地方出現瞭如流量問題,不會影響後面的部分,所以它擴容比較簡單,除此之外還有服務化後置和狀態可控。其次是時效性,我們做到毫秒延遲,並且維度豐富,整體上有 20+ 的維度做多維聚合。最後是準確性,我們支援資料驗證、實時監控、模型出口統一等。此時我們遇到了另外一個問題 - 亂序。對於上方三個不同的作業,每一個作業重啟至少會有兩分鐘左右的延遲,延遲會導致下游的資料來源 Union 到一起就會有亂序。

  • 延遲計算方案

遇到上面這種有亂序的情況下,我們要怎麼處理?

我們總共有三種處理方案:

第一種解決方案是用 “did + 維度 + 分鐘” 進行去重,Value 設為 “是否來過”。比如同一個 did,04:01 來了一條,它會進行結果輸出。同樣的,04:02 和 04:04 也會進行結果輸出。但如果 04:01 再來,它就會丟棄,但如果 04:00 來,依舊會進行結果輸出。

這個解決方案存在一些問題,因為我們按分鐘存,存 20 分鐘的狀態大小是存 10 分鐘的兩倍,到後面這個狀態大小有點不太可控,因此我們又換了解決方案 2。

第二種解決方案,我們的做法會涉及到一個假設前提,就是假設不存在資料來源亂序的情況。在這種情況下,key 存的是 “did + 維度”,Value 為 “時間戳”,它的更新方式如上圖所示。04:01 來了一條資料,進行結果輸出。04:02 來了一條資料,如果是同一個 did,那麼它會更新時間戳,然後仍然做結果輸出。04:04 也是同樣的邏輯,然後將時間戳更新到 04:04,如果後面來了一條 04:01 的資料,它發現時間戳已經更新到 04:04,它會丟棄這條資料。這樣的做法大幅度減少了本身所需要的一些狀態,但是對亂序是零容忍,不允許發生任何亂序的情況,由於我們不好解決這個問題,因此我們又想出瞭解決方案 3。

方案 3 是在方案 2 時間戳的基礎之上,加了一個類似於環形緩衝區,在緩衝區之內允許亂序。

比如 04:01 來了一條資料,進行結果輸出;04:02 來了一條資料,它會把時間戳更新到 04:02,並且會記錄同一個裝置在 04:01 也來過。如果 04:04 再來了一條資料,就按照相應的時間差做一個位移,最後通過這樣的邏輯去保障它能夠容忍一定的亂序。

綜合來看這三個方案:

方案 1 在容忍 16 分鐘亂序的情況下,單作業的狀態大小在 480G 左右。這種情況雖然保證了準確性,但是作業的恢復和穩定性是完全不可控的狀態,因此我們還是放棄了這個方案;

方案 2 是 30G 左右的狀態大小,對於亂序 0 容忍,但是資料不準確,由於我們對準確性的要求非常高,因此也放棄了這個方案;

方案 3 的狀態跟方案 1 相比,它的狀態雖然變化了但是增加的不多,而且整體能達到跟方案 1 同樣的效果。方案 3 容忍亂序的時間是 16 分鐘,我們正常更新一個作業的話,10 分鐘完全足夠重啟,因此最終選擇了方案 3。

③ 運營場景

  •  背景介紹

運營場景可分為四個部分:

第一個是資料大屏支援,包括單直播間的分析資料和大盤的分析資料,需要做到分鐘級延遲,更新要求比較高;

第二個是直播看板支援,直播看板的資料會有特定維度的分析,特定人群支援,對維度豐富性要求比較高;

第三個是資料策略榜單,這個榜單主要是預測熱門作品、爆款,要求的是小時級別的資料,更新要求比較低;

第四個是 C 端實時指標展示,查詢量比較大,但是查詢模式比較固定。

下面進行分析這 4 種不同的狀態產生的一些不同的場景。

前 3 種基本沒有什麼差別,只是在查詢模式上,有的是特定業務場景,有的是通用業務場景。

針對第 3 種和第 4 種,它對於更新的要求比較低,對於吞吐的要求比較高,過程之中的曲線也不要求有一致性。第 4 種查詢模式更多的是單實體的一些查詢,比如去查詢內容,會有哪些指標,而且對 QPS 要求比較高。

  •  技術方案

針對上方 4 種不同的場景,我們是如何去做的?

首先看一下基礎明細層 (圖中左方),資料來源有兩條鏈路,其中一條鏈路是消費的流,比如直播的消費資訊,還有觀看 / 點贊 / 評論。經過一輪基礎清洗,然後做維度管理。上游的這些維度資訊來源於 Kafka,Kafka 寫入了一些內容的維度,放到了 KV 儲存裡邊,包括一些使用者的維度。

這些維度關聯了之後,最終寫入 Kafka 的 DWD 事實層,這裡為了做效能的提升,我們做了二級快取的操作。

如圖中上方,我們讀取 DWD 層的資料然後做基礎彙總,核心是視窗維度聚合生成 4 種不同粒度的資料,分別是大盤多維彙總 topic、直播間多維彙總 topic、作者多維彙總 topic、使用者多維彙總 topic,這些都是通用維度的資料。

如圖中下方,基於這些通用維度資料,我們再去加工個性化維度的資料,也就是 ADS 層。拿到了這些資料之後會有維度擴充套件,包括內容擴充套件和運營維度的拓展,然後再去做聚合,比如會有電商實時 topic,機構服務實時 topic 和大 V 直播實時 topic。

分成這樣的兩個鏈路會有一個好處:一個地方處理的是通用維度,另一個地方處理的是個性化的維度。通用維度保障的要求會比較高一些,個性化維度則會做很多個性化的邏輯。如果這兩個耦合在一起的話,會發現任務經常出問題,並且分不清楚哪個任務的職責是什麼,構建不出這樣的一個穩定層。

如圖中右方,最終我們用到了三種不同的引擎。簡單來說就是 Redis 查詢用到了 C 端的場景,OLAP 查詢用到了大屏、業務看板的場景。

3. 騰訊看點實時數倉案例

騰訊看點業務為什麼要構建實時數倉,因為原始的上報資料量非常大,一天上報峰值就有上萬億條。而且上報格式混亂。缺乏內容維度資訊、使用者畫像資訊,下游沒辦法直接使用。而我們提供的實時數倉,是根據騰訊看點資訊流的業務場景,進行了內容維度的關聯,使用者畫像的關聯,各種粒度的聚合,下游可以非常方便的使用實時資料。

1) 方案選型

那就看下我們多維實時資料分析系統的方案選型,選型我們對比了行業內的領先方案,選擇了最符合我們業務場景的方案。

第一塊是實時數倉的選型,我們選擇的是業界比較成熟的 Lambda 架構,他的優點是靈活性高、容錯性高、成熟度高和遷移成本低;缺點是實時、離線資料用兩套程式碼,可能會存在一個口徑修改了,另一個沒改的問題,我們每天都有做資料對賬的工作,如果有異常會進行告警。

第二塊是實時計算引擎選型,因為 Flink 設計之初就是為了流處理,SparkStreaming 嚴格來說還是微批處理,Strom 用的已經不多了。再看 Flink 具有 Exactly-once 的準確性、輕量級 Checkpoint 容錯機制、低延時高吞吐和易用性高的特點,我們選擇了 Flink 作為實時計算引擎。

第三塊是實時儲存引擎,我們的要求就是需要有維度索引、支援高併發、預聚合、高效能實時多維 OLAP 查詢。可以看到,Hbase、Tdsql 和 ES 都不能滿足要求,Druid 有一個缺陷,它是按照時序劃分 Segment,無法將同一個內容,存放在同一個 Segment 上,計算全域性 TopN 只能是近似值,所以我們選擇了最近兩年大火的 MPP 資料庫引擎 ClickHouse。

2) 設計目標與設計難點

我們多維實時資料分析系統分為三大模組:

  • 實時計算引擎
  • 實時儲存引擎
  • App 層

難點主要在前兩個模組:實時計算引擎和實時儲存引擎。

  • 千萬級/s 的海量資料如何實時接入,並且進行極低延遲維表關聯。
  • 實時儲存引擎如何支援高併發寫入、高可用分散式和高效能索引查詢,是比較難的。

這幾個模組的具體實現,看一下我們系統的架構設計。

3) 架構設計

前端採用的是開源元件 Ant Design,利用了 Nginx 伺服器,部署靜態頁面,並反向代理了瀏覽器的請求到後臺伺服器上。

後臺服務是基於騰訊自研的 RPC 後臺服務框架寫的,並且會進行一些二級快取。

實時數倉部分,分為了接入層、實時計算層和實時數倉儲存層。

  • 接入層主要是從千萬級/s 的原始訊息佇列中,拆分出不同行為資料的微佇列,拿看點的視訊來說,拆分過後,資料就只有百萬級/s 了;
  • 實時計算層主要負責,多行行為流水資料進行行轉列,實時關聯使用者畫像資料和內容維度資料;
  • 實時數倉儲存層主要是設計出符合看點業務的,下游好用的實時訊息佇列。我們暫時提供了兩個訊息佇列,作為實時數倉的兩層。一層 DWM 層是內容 ID-使用者 ID 粒度聚合的,就是一條資料包含內容 ID-使用者 ID 還有 B 側內容資料、C 側使用者資料和使用者畫像資料;另一層是 DWS 層,是內容 ID 粒度聚合的,一條資料包含內容 ID,B 側資料和 C 側資料。可以看到內容 ID-使用者 ID 粒度的訊息佇列流量進一步減小到十萬級/s,內容 ID 粒度的更是萬級/s,並且格式更加清晰,維度資訊更加豐富。

實時儲存部分分為實時寫入層、OLAP 儲存層和後臺介面層。

  • 實時寫入層主要是負責 Hash 路由將資料寫入;
  • OLAP 儲存層利用 MPP 儲存引擎,設計符合業務的索引和物化檢視,高效儲存海量資料;
  • 後臺介面層提供高效的多維實時查詢介面。

4) 實時計算

這個系統最複雜的兩塊,實時計算和實時儲存。

先介紹實時計算部分:分為實時關聯和實時數倉。

① 實時高效能維表關聯

實時維表關聯這一塊難度在於 百萬級/s 的實時資料流,如果直接去關聯 HBase,1 分鐘的資料,關聯完 HBase 耗時是小時級的,會導致資料延遲嚴重。

我們提出了幾個解決方案:

  • 第一,在 Flink 實時計算環節,先按照 1 分鐘進行了視窗聚合,將視窗內多行行為資料轉一行多列的資料格式,經過這一步操作,原本小時級的關聯耗時下降到了十幾分鍾,但是還是不夠的。
  • 第二,在訪問 HBase 內容之前設定一層 Redis 快取,因為 1000 條資料訪問 HBase 是秒級的,而訪問 Redis 是毫秒級的,訪問 Redis 的速度基本是訪問 HBase 的 1000 倍。為了防止過期的資料浪費快取,快取過期時間設定成 24 小時,同時通過監聽寫 HBase Proxy 來保證快取的一致性。這樣將訪問時間從十幾分鍾變成了秒級。
  • 第三,上報過程中會上報不少非常規內容 ID,這些內容 ID 在內容 HBase 中是不儲存的,會造成快取穿透的問題。所以在實時計算的時候,我們直接過濾掉這些內容 ID,防止快取穿透,又減少一些時間。
  • 第四,因為設定了定時快取,會引入一個快取雪崩的問題。為了防止雪崩,我們在實時計算中,進行了削峰填谷的操作,錯開設定快取的時間。

可以看到,優化前後,資料量從百億級減少到了十億級,耗時從小時級減少到了數十秒,減少 99%。

②下游提供服務

實時數倉的難度在於:它處於比較新的領域,並且各個公司各個業務差距比較大,怎麼能設計出方便,好用,符合看點業務場景的實時數倉是有難度的。

先看一下實時數倉做了什麼,實時數倉對外就是幾個訊息佇列,不同的訊息佇列裡面存放的就是不同聚合粒度的實時資料,包括內容 ID、使用者 ID、C 側行為資料、B 側內容維度資料和使用者畫像資料等。

我們是怎麼搭建實時數倉的,就是上面介紹的實時計算引擎的輸出,放到訊息佇列中儲存,可以提供給下游多使用者複用。

我們可以看下,在我們建設實時資料倉庫前後,開發一個實時應用的區別。沒有數倉的時候,我們需要消費千萬級/s 的原始佇列,進行復雜的資料清洗,然後再進行使用者畫像關聯、內容維度關聯,才能拿到符合要求格式的實時資料,開發和擴充套件的成本都會比較高,如果想開發一個新的應用,又要走一遍這個流程。有了數倉之後,如果想開發內容 ID 粒度的實時應用,就直接申請 TPS 萬級/s 的 DWS 層的訊息佇列。開發成本變低很多,資源消耗小很多,可擴充套件性也強很多。

看個實際例子,開發我們系統的實時資料大屏,原本需要進行如上所有操作,才能拿到資料。現在只需要消費 DWS 層訊息佇列,寫一條 Flink SQL 即可,僅消耗 2 個 CPU 核心,1G 記憶體。

可以看到,以 50 個消費者為例,建立實時數倉前後,下游開發一個實時應用,可以減少 98%的資源消耗。包括計算資源,儲存資源,人力成本和開發人員學習接入成本等等。並且消費者越多,節省越多。就拿 Redis 儲存這一部分來說,一個月就能省下上百萬人民幣。

5) 實時儲存

介紹完實時計算,再來介紹實時儲存。

這塊分為四個部分來介紹:

  • 分散式-高可用
  • 海量資料-寫入
  • 高效能-查詢
  • 擴容

① 分散式-高可用

我們這裡聽取的是 Clickhouse 官方的建議,藉助 ZK 實現高可用的方案。資料寫入一個分片,僅寫入一個副本,然後再寫 ZK,通過 ZK 告訴同一個分片的其他副本,其他副本再過來拉取資料,保證資料一致性。

這裡沒有選用訊息佇列進行資料同步,是因為 ZK 更加輕量級。而且寫的時候,任意寫一個副本,其它副本都能夠通過 ZK 獲得一致的資料。而且就算其它節點第一次來獲取資料失敗了,後面只要發現它跟 ZK 上記錄的資料不一致,就會再次嘗試獲取資料,保證一致性。

②海量資料-寫入

資料寫入遇到的第一個問題是,海量資料直接寫入 Clickhouse 的話,會導致 ZK 的 QPS 太高,解決方案是改用 Batch 方式寫入。Batch 設定多大呢,Batch 太小的話緩解不了 ZK 的壓力,Batch 也不能太大,不然上游記憶體壓力太大,通過實驗,最終我們選用了大小几十萬的 Batch。

第二個問題是,隨著資料量的增長,單 QQ 看點的視訊內容每天可能寫入百億級的資料,預設方案是寫一張分散式表,這就會造成單臺機器出現磁碟的瓶頸,尤其是 Clickhouse 底層運用的是 Mergetree,原理類似於 HBase、RocketsDB 的底層 LSM-Tree。在合併的過程中會存在寫放大的問題,加重磁碟壓力。峰值每分鐘幾千萬條資料,寫完耗時幾十秒,如果正在做 Merge,就會阻塞寫入請求,查詢也會非常慢。我們做的兩個優化方案:一是對磁碟做 Raid,提升磁碟的 IO;二是在寫入之前進行分表,直接分開寫入到不同的分片上,磁碟壓力直接變為 1/N。

第三個問題是,雖然我們寫入按照分片進行了劃分,但是這裡引入了一個分散式系統常見的問題,就是區域性的 Top 並非全域性 Top 的問題。比如同一個內容 ID 的資料落在了不同的分片上,計算全域性 Top100 閱讀的內容 ID,有一個內容 ID 在分片 1 上是 Top100,但是在其它分片上不是 Top100,導致彙總的時候,會丟失一部分資料,影響最終結果。我們做的優化是在寫入之前加上一層路由,將同一個內容 ID 的記錄,全部路由到同一個分片上,解決了該問題。

介紹完寫入,下一步介紹 Clickhouse 的高效能儲存和查詢。

③ 高效能-儲存-查詢

Clickhouse 高效能查詢的一個關鍵點是稀疏索引。稀疏索引這個設計就很有講究,設計得好可以加速查詢,設計不好反而會影響查詢效率。我根據我們的業務場景,因為我們的查詢大部分都是時間和內容 ID 相關的,比如說,某個內容,過去 N 分鐘在各個人群表現如何?我按照日期,分鐘粒度時間和內容 ID 建立了稀疏索引。針對某個內容的查詢,建立稀疏索引之後,可以減少 99%的檔案掃描。

還有一個問題就是,我們現在資料量太大,維度太多。拿 QQ 看點的視訊內容來說,一天流水有上百億條,有些維度有幾百個類別。如果一次性把所有維度進行預聚合,資料量會指數膨脹,查詢反而變慢,並且會佔用大量記憶體空間。我們的優化,針對不同的維度,建立對應的預聚合物化檢視,用空間換時間,這樣可以縮短查詢的時間。

分散式表查詢還會有一個問題,查詢單個內容 ID 的資訊,分散式表會將查詢下發到所有的分片上,然後再返回查詢結果進行彙總。實際上,因為做過路由,一個內容 ID 只存在於一個分片上,剩下的分片都在空跑。針對這類查詢,我們的優化是後臺按照同樣的規則先進行路由,直接查詢目標分片,這樣減少了 N-1/N 的負載,可以大量縮短查詢時間。而且由於我們是提供的 OLAP 查詢,資料滿足最終一致性即可,通過主從副本讀寫分離,可以進一步提升效能。

我們在後臺還做了一個 1 分鐘的資料快取,針對相同條件查詢,後臺就直接返回了。

④ 擴容

這裡再介紹一下我們的擴容的方案,調研了業內的一些常見方案。

比如 HBase,原始資料都存放在 HDFS 上,擴容只是 Region Server 擴容,不涉及原始資料的遷移。但是 Clickhouse 的每個分片資料都是在本地,是一個比較底層儲存引擎,不能像 HBase 那樣方便擴容。

Redis 是雜湊槽這種類似一致性雜湊的方式,是比較經典分散式快取的方案。Redis slot 在 Rehash 的過程中雖然存在短暫的 ask 讀不可用,但是總體來說遷移是比較方便的,從原 h[0]遷移到 h[1],最後再刪除 h[0]。但是 Clickhouse 大部分都是 OLAP 批量查詢,不是點查,而且由於列式儲存,不支援刪除的特性,一致性雜湊的方案不是很適合。

目前擴容的方案是,另外消費一份資料,寫入新 Clickhouse 叢集,兩個叢集一起跑一段時間,因為實時資料就儲存 3 天,等 3 天之後,後臺服務直接訪問新叢集。

4. 有贊實時數倉案例

1) 分層設計

傳統離線數倉的分層設計大家都很熟悉,為了規範的組織和管理資料,層級劃分會比較多,在一些複雜邏輯處理場景還會引入臨時層落地中間結果以方便下游加工處理。實時數倉考慮到時效性問題,分層設計需要儘量精簡,降低中間流程出錯的可能性,不過總體而言,實時數倉還是會參考離線數倉的分層思想來設計。

實時數倉分層架構如下圖所示 :

① ODS(實時資料接入層)

ODS 層,即實時資料接入層,通過資料採集工具收集各個業務系統的實時資料,對非結構化的資料進行結構化處理,儲存原始資料,幾乎不過濾資料;該層資料的主要來源有三個部分:第一部分是業務方建立的 NSQ 訊息,第二部分是業務資料庫的 Binlog 日誌,第三部分是埋點日誌和應用程式日誌,以上三部分的實時資料最終統一寫入 Kafka 儲存介質中。

ODS 層表命名規範:部門名稱.應用名稱.數倉層級主題域字首資料庫名/訊息名

例如:接入業務庫的 Binlog

實時數倉表命名:deptname.appname.ods_subjectname_tablename

例如:接入業務方的 NSQ 訊息

實時數倉表命名:deptname.appname.ods_subjectname_msgname

② DWS(實時明細中間層)

DWS 層,即實時明細中間層,該層以業務過程作為建模驅動,基於每個具體的業務過程事件來構建最細粒度的明細層事實表;比如交易過程,有下單事件、支付事件、發貨事件等,我們會基於這些獨立的事件來進行明細層的構建。在這層,事實明細資料同樣是按照離線數倉的主題域來進行劃分,也會採用維度建模的方式組織資料,對於一些重要的維度欄位,會做適當冗餘。基於有贊實時需求的場景,重點建設交易、營銷、客戶、店鋪、商品等主題域的資料。該層的資料來源於 ODS 層,通過 FlinkSQL 進行 ETL 處理,主要工作有規範命名、資料清洗、維度補全、多流關聯,最終統一寫入 Kafka 儲存介質中。

DWS 層表命名規範:部門名稱.應用名稱.數倉層級_主題域字首_數倉表命名

例如:實時事件 A 的中間層

實時數倉表命名:deptname.appname.dws_subjectname_tablename_eventnameA

例如:實時事件 B 的中間層

實時數倉表命名:deptname.appname.dws_subjectname_tablename_eventnameB

③ DIM(實時維表層)

DIM 層,即實時維表層,用來存放維度資料,主要用於實時明細中間層寬化處理時補全維度使用,目前該層的資料主要儲存於 HBase 中,後續會基於 QPS 和資料量大小提供更多合適型別的儲存介質。

DIM 層表命名規範:應用名稱_數倉層級_主題域字首_數倉表命名

例如:HBase 儲存,實時維度表

實時數倉表命名:appname_dim_tablename

④ DWA(實時彙總層)

DWA 層,即實時彙總層,該層通過 DWS 層資料進行多維彙總,提供給下游業務方使用,在實際應用過程中,不同業務方使用維度彙總的方式不太一樣,根據不同的需求採用不同的技術方案去實現。第一種方式,採用 FlinkSQL 進行實時彙總,將結果指標存入 HBase、MySQL 等資料庫,該種方式是我們早期採用的方案,優點是實現業務邏輯比較靈活,缺點是聚合粒度固化,不易擴充套件;第二種方式,採用實時 OLAP 工具進行彙總,該種方式是我們目前常用的方案,優點是聚合粒度易擴充套件,缺點是業務邏輯需要在中間層預處理。

DWA 層表命名規範:應用名稱_數倉層級_主題域字首_聚合粒度_資料範圍

例如:HBase 儲存,某域當日某粒度實時彙總表

實時數倉表命名:appname_dwa_subjectname_aggname_daily

⑤ APP(實時應用層)

APP 層,即實時應用層,該層資料已經寫入應用系統的儲存中,例如寫入 Druid 作為 BI 看板的實時資料集;寫入 HBase、MySQL 用於提供統一資料服務介面;寫入 ClickHouse 用於提供實時 OLAP 服務。因為該層非常貼近業務,在命名規範上實時數倉不做統一要求。

2) 實時 ETL

實時數倉 ETL 處理過程所涉及的元件比較多,接下來盤點構建實時數倉所需要的元件以及每個元件的應用場景。如下圖所示:

具體實時 ETL 處理流程如下圖所示:

① 維度補全

建立呼叫 Duboo 介面的 UDF 函式在實時流裡補全維度是最便捷的使用方式,但如果請求量過大,對 Duboo 介面壓力會過大。在實際應用場景補全維度首選還是關聯維度表,但關聯也存在一定概率的丟失問題,為了彌補這種丟失,可以採用 Duboo 介面呼叫兜底的方式來補全。虛擬碼如下:

create function call_dubbo as 'XXXXXXX';

create function get_json_object as 'XXXXXXX';

case

when cast( b.column as bigint) is not null

then cast( b.column as bigint)

else cast(coalesce(cast(get_json_object(call_dubbo('clusterUrl'

,'serviceName'

,'methodName'

,cast(concat('[',cast(a.column as varchar),']') as varchar)

,'key'

,'rootId')

as bigint)

,a.column)

as bigint) end

② 冪等處理

實時任務在執行過程中難免會遇到執行異常的情況,當任務異常重啟的時候會導致部分訊息重新發送和消費,從而引發下游實時統計資料不準確,為了有效避免這種情況,可以選擇對實時訊息流做冪等處理,當消費完一條訊息,將這條訊息的 Key 存入 KV,如果任務異常重啟導致訊息重新發送的時候,先從 KV 判斷該訊息是否已被消費,如果已消費就不再往下發送。虛擬碼如下:

create function idempotenc as 'XXXXXXX';

insert into table

select

order_no

from

select

a.orderNo as order_no

, idempotenc('XXXXXXX', coalesce( order_no, '') ) as rid

from

table1

) t

where

t.rid = 0;

③ 資料驗證

由於實時數倉的資料是無邊界的流,相比於離線數倉固定不變的資料更難驗收。基於不同的場景,我們提供了 2 種驗證方式,分別是:抽樣驗證與全量驗證。如圖 3.3 所示

  • 抽樣驗證方案

該方案主要應用在資料準確性驗證上,實時彙總結果是基於儲存在 Kafka 的實時明細中間層計算而來,但 Kafka 本身不支援按照特定條件檢索,不支援寫查詢語句,再加上訊息的無邊界性,統計結果是在不斷變化的,很難尋找參照物進行比對。鑑於此,我們採用了持久化訊息的方法,將訊息落盤到 TiDB 儲存,基於 TiDB 的能力對落盤的訊息進行檢索、查詢、彙總。編寫固定時間邊界的測試用例與相同時間邊界的業務庫資料或者離線數倉資料進行比對。通過以上方式,抽樣核心店鋪的資料進行指標準確性驗證,確保測試用例全部通過。

  • 全量驗證方案

該方案主要應用在資料完整性和一致性驗證上,在實時維度表驗證的場景使用最多。大體思路:將儲存實時維度表的線上 HBase 叢集中的資料同步到離線 HBase 叢集中,再將離線 HBase 叢集中的資料匯入到 Hive 中,在限定實時維度表的時間邊界後,通過資料平臺提供的資料校驗功能,比對實時維度表與離線維度表是否存在差異,最終確保兩張表的資料完全一致。

④ 資料恢復

實時任務一旦上線就要求持續不斷的提供準確、穩定的服務。區別於離線任務按天排程,如果離線任務出現 Bug,會有充足的時間去修復。如果實時任務出現 Bug,必須按照提前制定好的流程,嚴格按照步驟執行,否則極易出現問題。造成 Bug 的情況有非常多,比如程式碼 Bug、異常資料 Bug、實時叢集 Bug,如下圖展示了修復實時任務 Bug 並恢復資料的流程。

5. 騰訊全場景實時數倉建設案例

在數倉體系中會有各種各樣的大資料元件,譬如 Hive/HBase/HDFS/S3,計算引擎如 MapReduce、Spark、Flink,根據不同的需求,使用者會構建大資料儲存和處理平臺,資料在平臺經過處理和分析,結果資料會儲存到 MySQL、Elasticsearch 等支援快速查詢的關係型、非關係型資料庫中,接下來應用層就可以基於這些資料進行 BI 報表開發、使用者畫像,或基於 Presto 這種 OLAP 工具進行互動式查詢等。

1) Lambda 架構的痛點

在整個過程中我們常常會用一些離線的排程系統,定期的(T+1 或者每隔幾小時)去執行一些 Spark 分析任務,做一些資料的輸入、輸出或是 ETL 工作。離線資料處理的整個過程中必然存在資料延遲的現象,不管是資料接入還是中間的分析,資料的延遲都是比較大的,可能是小時級也有可能是天級別的。另外一些場景中我們也常常會為了一些實時性的需求去構建一個實時處理過程,比如藉助 Flink+Kafka 去構建實時的流處理系統。

整體上,數倉架構中有非常多的元件,大大增加了整個架構的複雜性和運維的成本。

如下圖,這是很多公司之前或者現在正在採用的 Lambda 架構,Lambda 架構將數倉分為離線層和實時層,相應的就有批處理和流處理兩個相互獨立的資料處理流程,同一份資料會被處理兩次以上,同一套業務邏輯程式碼需要適配性的開發兩次。Lambda 架構大家應該已經非常熟悉了,下面我就著重介紹一下我們採用 Lambda 架構在數倉建設過程中遇到的一些痛點問題。

例如在實時計算一些使用者相關指標的實時場景下,我們想看到當前 pv、uv 時,我們會將這些資料放到實時層去做一些計算,這些指標的值就會實時呈現出來,但同時想了解使用者的一個增長趨勢,需要把過去一天的資料計算出來。這樣就需要通過批處理的排程任務來實現,比如凌晨兩三點的時候在排程系統上起一個 Spark 排程任務把當天所有的資料重新跑一遍。

很顯然在這個過程中,由於兩個過程執行的時間是不一樣的,跑的資料卻相同,因此可能造成資料的不一致。因為某一條或幾條資料的更新,需要重新跑一遍整個離線分析的鏈路,資料更新成本很大,同時需要維護離線和實時分析兩套計算平臺,整個上下兩層的開發流程和運維成本其實都是非常高的。

為了解決 Lambda 架構帶來的各種問題,就誕生了 Kappa 架構,這個架構大家應該也非常的熟悉。

2) Kappa 架構的痛點

我們來講一下 Kappa 架構,如下圖,它中間其實用的是訊息佇列,通過用 Flink 將整個鏈路串聯起來。Kappa 架構解決了 Lambda 架構中離線處理層和實時處理層之間由於引擎不一樣,導致的運維成本和開發成本高昂的問題,但 Kappa 架構也有其痛點。

首先,在構建實時業務場景時,會用到 Kappa 去構建一個近實時的場景,但如果想對數倉中間層例如 ODS 層做一些簡單的 OLAP 分析或者進一步的資料處理時,如將資料寫到 DWD 層的 Kafka,則需要另外接入 Flink。同時,當需要從 DWD 層的 Kafka 把資料再匯入到 Clickhouse,Elasticsearch,MySQL 或者是 Hive 裡面做進一步的分析時,顯然就增加了整個架構的複雜性。

其次,Kappa 架構是強烈依賴訊息佇列的,我們知道訊息佇列本身在整個鏈路上資料計算的準確性是嚴格依賴它上游資料的順序,訊息佇列接的越多,發生亂序的可能性就越大。ODS 層資料一般是絕對準確的,把 ODS 層的資料傳送到下一個 kafka 的時候就有可能發生亂序,DWD 層再發到 DWS 的時候可能又亂序了,這樣資料不一致性就會變得很嚴重。

第三,Kafka 由於它是一個順序儲存的系統,順序儲存系統是沒有辦法直接在其上面利用 OLAP 分析的一些優化策略,例如謂詞下推這類的優化策略,在順序儲存的 Kafka 上來實現是比較困難的事情。

那麼有沒有這樣一個架構,既能夠滿足實時性的需求,又能夠滿足離線計算的要求,而且還能夠減輕運維開發的成本,解決通過訊息佇列構建 Kappa 架構過程中遇到的一些痛點?答案是肯定的,後面的篇幅會詳細論述。

3) 痛點總結

4) Flink+Iceberg 構建實時數倉

① 近實時的資料接入

前面介紹了 Iceberg 既支援讀寫分離,又支援併發讀、增量讀、小檔案合併,還可以支援秒級到分鐘級的延遲,基於這些優勢我們嘗試採用 Iceberg 這些功能來構建基於 Flink 的實時全鏈路批流一體化的實時數倉架構。

如下圖所示,Iceberg 每次的 commit 操作,都是對資料的可見性的改變,比如說讓資料從不可見變成可見,在這個過程中,就可以實現近實時的資料記錄。

② 實時數倉 - 資料湖分析系統

此前需要先進行資料接入,比如用 Spark 的離線排程任務去跑一些資料,拉取,抽取最後再寫入到 Hive 表裡面,這個過程的延時比較大。有了 Iceberg 的表結構,可以中間使用 Flink,或者 spark streaming,完成近實時的資料接入。

基於以上功能,我們再來回顧一下前面討論的 Kappa 架構,Kappa 架構的痛點上面已經描述過,Iceberg 既然能夠作為一個優秀的表格式,既支援 Streaming reader,又可以支援 Streaming sink,是否可以考慮將 Kafka 替換成 Iceberg?

Iceberg 底層依賴的儲存是像 HDFS 或 S3 這樣的廉價儲存,而且 Iceberg 是支援 parquet、orc、Avro 這樣的列式儲存。有列式儲存的支援,就可以對 OLAP 分析進行基本的優化,在中間層直接進行計算。例如謂詞下推最基本的 OLAP 優化策略,基於 Iceberg snapshot 的 Streaming reader 功能,可以把離線任務天級別到小時級別的延遲大大的降低,改造成一個近實時的資料湖分析系統。

在中間處理層,可以用 presto 進行一些簡單的查詢,因為 Iceberg 支援 Streaming read,所以在系統的中間層也可以直接接入 Flink,直接在中間層用 Flink 做一些批處理或者流式計算的任務,把中間結果做進一步計算後輸出到下游。

總的來說,Iceberg 替換 Kafka 的優勢主要包括:

優勢:

  • 實現儲存層的流批統一;
  • 中間層支援 OLAP 分析;
  • 完美支援高效回溯;
  • 儲存成本降低。

劣勢:

  • 資料延遲從實時變成近實時;
  • 對接其他資料系統需要額外開發工作。

秒級分析 - 資料湖加速:

由於 Iceberg 本身是將資料檔案全部儲存在 HDFS 上的,HDFS 讀寫這塊對於秒級分析的場景,還是不能夠完全滿足我們的需求,所以接下去我們會在 Iceberg 底層支援 Alluxio 這樣一個快取,藉助於快取的能力可以實現資料湖的加速。這塊的架構也在我們未來的一個規劃和建設中。

作者丨五分鐘學大資料