1. 程式人生 > 其它 >SmartNews:基於 Flink 加速 Hive 日表生產的實踐

SmartNews:基於 Flink 加速 Hive 日表生產的實踐

簡介:將 Flink 無縫地整合到以 Airflow 和 Hive 為主的批處理系統的技術挑戰和應對方案。

本文介紹了 SmartNews 利用 Flink 加速 Hive 日表的生產,將 Flink 無縫地整合到以 Airflow 和 Hive 為主的批處理系統的實踐。詳細介紹過程中遇到的技術挑戰和應對方案,以供社群分享。主要內容為:

  1. 專案背景
  2. 問題的定義
  3. 專案的目標
  4. 技術選型
  5. 技術挑戰
  6. 整體方案及挑戰應對
  7. 專案成果和展望
  8. 後記

一、專案背景

SmartNews 是一家機器學習驅動的網際網路公司。自 2012 年於日本東京成立,並在美國和中國設有辦公室。經過 8 年多的發展,SmartNews 已經成長為日本排名第一,美國成長最快的新聞類應用,覆蓋全球超過 150 多個國家市場。據 2019 年初統計,SmartNews 的 iOS 和 Android 版本全球累計下載量已經超過 5000 萬次。

SmartNews 在過去 9 年的時間,基於 Airflow, Hive, EMR 等技術棧構建了大量的資料集。隨著資料量的增長,這些離線表的處理時間在逐漸拉長。另外,隨著業務方迭代節奏的加快,對錶的實時性也提出了更高的要求。因此,SmartNews 內部發起了 Speedy Batch 的專案,以加快現有離線表生產效率。

本次分享便是 Speedy Batch 專案中的一個例子,加速使用者行為 (actions) 表的實踐。

APP 端上報的使用者行為日誌,每日通過 Hive 作業生成日表,這個表是許多其他表的源頭,至關重要。這個作業需要執行 3 個小時,進而拉高了許多下游表的延遲 (Latency),明顯影響資料科學家、產品經理等使用者的使用體驗。因此我們需要對這些作業進行提速,讓各個表能更早可用。

公司業務基本上都在公有云上,伺服器的原始日誌以檔案形式上傳至雲端儲存,按日分割槽;目前的作業用 Airflow 排程到 EMR 上執行,生成 Hive 日表,資料儲存在雲端儲存。

二、問題的定義

1. 輸入

新聞伺服器每隔 30 秒上傳一個原始日誌檔案,檔案上傳至相應日期和小時的雲端儲存目錄。

2. 輸出

原始日誌經過 ETL 處理之後,按日 (dt) 和行為 (action) 兩級分割槽輸出。action 種類約 300 個,不固定,常有增減。

3. 使用者

對這個表的使用是廣泛的,多途徑的。有從 Hive 裡查詢,也有從 Presto,Jupyter 和 Spark 裡查詢,我們甚至不能確定以上就是全部的訪問途徑。

三、專案的目標

  1. 將 actions 表的時延從 3 小時縮短至 30 分鐘;
  2. 對下游使用者保持透明。透明又分兩個方面:

    • 功能方面:使用者無需修改任何程式碼,做到完全無感
    • 效能方面:新專案產生的表,不應該導致下游讀取時的效能下降

四、技術選型

在本專案之前,同事已經對該作業做了多輪次改進,效果不是很顯著。

嘗試過的方案包括增加資源,投入更多的機器,但遇到了雲端儲存的 IOPS 限制:每個 prefix 最多支援 3000 個併發讀寫,這個問題在輸出階段尤為明顯,即多個 reducer 同時向同一個 action 子目錄輸出的時候,容易碰到這個限制。另外還嘗試了按小時預處理,然後到每日凌晨再合併成日表,但合併過程亦耗時較多,整體時延還是在 2.5 小時左右,效果不夠顯著。

鑑於伺服器端的日誌是近實時上傳至雲端儲存,團隊提出了流式處理的思路,摒棄了批作業等待一天、處理 3 小時的模式,而是把計算分散在一整天,進而降低當天結束後的處理用時。團隊對 Flink 有比較好的背景,加上 Flink 近期對 Hive 的改進較多,因此決定採用基於 Flink 的方案。

五、技術挑戰

挑戰是多方面的。

1. 輸出 RC 檔案格式

當前 Hive 表的檔案格式為 RCFile,為了保證對使用者的透明,我們只能在現有的 Hive 表上做 in-place 的 upgrade,也就是我們得重用當前表,那麼 Flink 輸出的檔案格式也得符合 RCFile 格式,因為一張 Hive 表只能有一個格式。

RCFile 屬於 bulk format (相對應的是 row format),在每次 checkpoint 時必須一次性輸出。如果我們選擇 5 分鐘一次 checkpoint,那麼每個 action 每 5 分鐘必須輸出一個檔案,這會大量增加結果檔案數,進而影響下游的讀取效能。特別是對於低頻 action,檔案數會上百倍的增加。我們瞭解了 Flink 的檔案合併功能,但那是在一個 checkpoint 內多個 sink 資料的合併,這並不能解決我們的問題,我們需要的是跨 checkpoint 的檔案合併。

團隊考慮過以 row format (e.g. CSV) 輸出,然後實現自定義的 Hive SerDe,使之相容 RCFile 和 CSV。但很快我們放棄了這個設想,因為那樣的話,需要為每個查詢場景實現這個 Hybrid 的 SerDe,例如需要為 Presto 實現,為 Spark 實現,等等。

  • 一方面我們沒法投入這麼多資源;
  • 另一方面那種方案也是使用者有感的,畢竟使用者還是需要安裝這個自定義的 SerDe。

    我們之前提出了生成一個新格式的表,但也因為對使用者不夠透明而被否決。

2. Partition 的可感知性和完整性

如何讓下游作業能感知到當天這個 partition 已經 ready?actions 表分兩級 partition, dt 和 action。action 屬於 Hive 的 dynamic partition,數量多且不固定。當前 Airflow 下游作業是等待 insert_actions 這個 Hive 任務完成後,再開始執行的。這個沒問題,因為 insert_actions 結束時,所有 action 的 partition 都已經 ready 了。但對於 Flink 作業來說,沒有結束的訊號,它只能往 Hive 裡面提交一個個的 partition,如 dt=2021-05-29/action=refresh。因為 action 數量多,提交 partition 的過程可能持續數分鐘,因此我們也不能讓 Airflow 作業去感知 dt 級別的 partition,那樣很可能在只有部分 action 的情況下觸發下游。

3. 流式讀取雲端儲存檔案

專案的輸入是不斷上傳的雲端儲存檔案,並非來自 MQ (message queue)。Flink 支援 FileStreamingSource,可以流式的讀入檔案,但那是基於定時 list 目錄以發現新的檔案。但這個方案不適合我們的場景,因為我們的目錄太大,雲端儲存 list 操作根本無法完成。

4. Exactly Once 保證

鑑於 actions 表的重要性,使用者無法接受任何的資料丟失或者重複,因此整個方案需要保證恰好一次的處理。

六、整體方案及挑戰應對

1. 輸出 RCFile 並且避免小檔案

我們最終選擇的方案是分兩步走,第一個 Flink 作業以 json (row format) 格式輸出,然後用另外一個 Flink 作業去做 Json 到 RC 格式的轉化。以此解決 Flink 不能愉快的輸出合適大小 RC 檔案的問題。

輸出 json 的中間結果,這樣我們可以通過 Rolling Policy 控制輸出檔案的大小,可以跨多個 checkpoint 攢成足夠大,或者時間足夠長,然後再輸出到雲端儲存。這裡 Flink 其實利用的是雲端儲存的 Multi Part Upload (MPU) 的功能,即每次 checkpoint Flink 也是把當前 checkpoint 攢下來的資料上傳至 雲端儲存,但輸出的不是檔案,而是一個 part。最後當多個 part 達到大小或者時間要求,就可以呼叫雲端儲存的介面將多個 part 合併成一個檔案,這個合併操作在雲端儲存端完成,應用端無需再次讀取這個 part 到本地合併然後再上傳。而 Bulk format 均需要一次性全域性處理,因此無法分段上傳然後合併,必須一次性全部上傳。

當第二個作業感知到一個新的 json 檔案上傳後,載入它,轉化成 RCFile,然後上傳到最終的路徑。這個過程帶來的延遲較小,一個檔案可以控制在 10s 以內,這是可以接受的。

2. 優雅的感知輸入檔案

輸入端,沒有采用 Flink 的 FileStreamingSource,而是採用雲端儲存的 event notification 來感知新檔案的產生,接受到這個通知後再主動去載入檔案。

3. Partition 的可感知性和完整性

輸出端,我們輸出 dt 級別的 success file,來讓下游可靠地感知日表的 ready。我們實現自定義的 StreamingFileWriter,使之輸出 partitionCreated 和 partitionInactive 的訊號,並且通過實現自定義的 PartitionCommitter,來基於上述訊號判斷日表的結束。

其機制如下,每個雲端儲存 writer 開始寫某個 action,會發出一個 partitionCreated 訊號,當它結束時又發出 partitionInactive 訊號。PartitionCommitter 判斷某一天之內是否所有的 partittion 都 inactive 了,如果是,則一天的資料都處理了,輸出 dt 級別的 success file,在 Airflow 通過感知這個檔案來判斷 Flink 是否完成了日表的處理。

4. Exactly Once

雲端儲存的 event notification 提供 At Least once 保證。Flink 作業內對檔案級別進行去重,作業採用 Exactly Once 的 checkpoint 設定,雲端儲存檔案輸出基於 MPU 機制等價於支援 truncate,因此雲端儲存輸出等價於冪等,因此等價於端到端的 Exactly Once。

七、專案成果和展望

專案已經上線,時延維持在 34 分鐘上下,其中包括 15 分鐘的等待遲到檔案。

  • 第一個 Flink 作業需要 8 分鐘左右完成 checkpoint 和輸出,json 轉 rc 作業需要 12 分鐘完成全部處理。我們可以把這個時間繼續壓縮,但是綜合時效性和成本,我們選擇當前的狀態。
  • json 轉 rc 作業耗時比當初的預想的要大,因為上游作業最後一個 checkpoint 輸出太多的檔案,導致整體耗時長,這個可以通過增加作業的併發度線性的下降。
  • 輸出的檔案數比批作業輸出的檔案數有所增加,增加 50% 左右。這是流式處理於批處理的劣勢,流式處理需要在時間到達時就輸出一個檔案,而此時檔案大小未必達到預期。好在這個程度的檔案數增加不明顯影響下游的效能。
  • 做到了下游的完全透明,整個上線前後,沒有收到任何使用者異常反饋。

該專案讓我們在生產環境驗證了利用流式處理框架 Flink 來無縫介入批處理系統,實現使用者無感的區域性改進。將來我們將利用同樣的技術,去加速更多其他的 Hive 表的生產,並且廣泛提供更細粒度 Hive 表示的生產,例如小時級。另一方面,我們將探索利用 data lake 來管理批流一體的資料,實現技術棧的逐步收斂。

八、後記

由於採用完全不同的計算框架,且需要與批處理系統完全保持一致,團隊踩過不少的坑,限於篇幅,無法一一列舉。因此我們挑選幾個有代表的問題留給讀者思考:

  • 為了驗證新作業產出的結果與原來 Hive 產出一致,我們需要對比兩者的輸出。那麼,如何才能高效的比較兩個 Hive 表的一致性呢?特別是每天有百億級資料,每條有數百個欄位,當然也包含複雜型別 (array, map, array等)。
  • 兩個 Flink 作業的 checkpoint 模式都必須是 Exactly Once 嗎?哪個可以不是,哪個必須是?
  • StreamFileWriter 只有在 checkpoint 時才接受到 partitionCreated 和 partitionInactive 訊號,那麼我們可以在它的 snapshotState() 函式裡面輸出給下游 (下游會儲存到 state) 嗎?
  • 最後一問:你們有更好的方案可供我們參考嗎?

原文連結
本文為阿里雲原創內容,未經允許不得轉載。