1. 程式人生 > >Flink在美團的實踐與應用

Flink在美團的實踐與應用

開發十年,就只剩下這套架構體系了! >>>   

美團實時計算平臺現狀和背景

實時平臺架構

上圖呈現的是當前美團實時計算平臺的簡要架構。最底層是資料快取層,可以看到美團測的所有日誌類的資料,都是通過統一的日誌收集系統收集到Kafka。Kafka作為最大的資料中轉層,支撐了美團線上的大量業務,包括離線拉取,以及部分實時處理業務等。在資料快取層之上,是一個引擎層,這一層的左側是我們目前提供的實時計算引擎,包括Storm和Flink。Storm在此之前是 standalone 模式的部署方式,Flink由於其現在執行的環境,美團選擇的是On YARN模式,除了計算引擎之外,我們還提供一些實時儲存功能,用於儲存計算的中間狀態、計算的結果、以及維度資料等,目前這一類儲存包含Hbase、Redis以及ES。在計算引擎之上,是趨於五花八門的一層,這一層主要面向資料開發的同學。實時資料開發面臨諸多問題,例如在程式的除錯調優方面就要比普通的程式開發困難很多。在資料平臺這一層,美團面向使用者提供的實時計算平臺,不僅可以託管作業,還可以實現調優診斷以及監控報警,此外還有實時資料的檢索以及許可權管理等功能。除了提供面向資料開發同學的實時計算平臺,美團現在正在做的事情還包括構建元資料中心。這也是未來我們想做SQL的一個前提,元資料中心是承載實時流系統的一個重要環節,我們可以把它理解為實時系統中的大腦,它可以儲存資料的Schema,Meta。架構的最頂層就是我們現在實時計算平臺支撐的業務,不僅包含線上業務日誌的實時查詢和檢索,還涵蓋當下十分熱門的實時機器學習。機器學習經常會涉及到搜尋和推薦場景,這兩個場景最顯著特點:一、會產生海量實時資料;二、流量的QPS相當高。此時就需要實時計算平臺承載部分實時特徵的提取工作,實現應用的搜尋推薦服務。還有一類是比較常見的場景,包括實時的特徵聚合,斑馬Watcher(可以認為是一個監控類的服務),實時數倉等。

以上就是美團目前實時計算平臺的簡要架構。

實時平臺現狀

美團實時計算平臺的現狀是作業量現在已經達到了近萬,叢集的節點的規模是千級別的,天級訊息量已經達到了萬億級,高峰期的訊息量能夠達到千萬條每秒。

痛點和問題

美團在調研使用Flink之前遇到了一些痛點和問題:

  • 實時計算精確性問題:在調研使用Flink之前美團很大規模的作業是基於Storm去開發的,Storm主要的計算語義是At-Least-Once,這種語義在保證正確性上實際上是有一些問題的,在Trident之前Storm是無狀態的處理。雖然Storm
    Trident提供了一個維護狀態的精確的開發,但是它是基於序列的Batch提交的,那麼遇到問題在處理效能上可能會有一點瓶頸。並且Trident是基於微批的處理,在延遲上沒有達到比較高的要求,所以不能滿足一些對延遲比較高需求的業務。

  • 流處理中的狀態管理問題:基於之前的流處理過程中狀態管理的問題是非常大的一類問題。狀態管理除了會影響到比如說計算狀態的一致性,還會影響到實時計算處理的效能以及故障恢復時候的能力。而Flink最突出的一個優勢就是狀態管理。

  • 實時計算表義能力的侷限性:在實時計算之前很多公司大部分的資料開發還是面向離線的場景,近幾年實時的場景也慢慢火熱起來了。那與離線的處理不同的是,實時的場景下,資料處理的表意能力可能有一定的限制,比如說他要進行精確計算以及時間視窗都是需要在此之上去開發很多功能性的東西。

  • 開發除錯成本高:近千結點的叢集上已經跑了近萬的作業,分散式的處理的引擎,手工寫程式碼的方式,給資料開發的同學也帶來了很高開發和除錯的成本,再去維護的時候,運維成本也比較高。

Flink探索關注點

在上面這些痛點和問題的背景下,美團從去年開始進行Flink的探索,關注點主要有以下4個方面:

  • ExactlyOnce計算能力

  • 狀態管理能力

  • 視窗/Join/時間處理等等

  • SQL/TableAPI

Flink在美團的實踐

下面帶大家來看一下,美團從去年投入生產過程中都遇到了哪些問題,以及一些解決方案,分為下面三個部分:

穩定性實踐

穩定性實踐-資源隔離

  • 資源隔離的考慮:分場景、按業務

    1. 高峰期不同,運維時間不同;
    2. 可靠性、延遲需求不同;
    3. 應用場景,重要性不同
  • 資源隔離的策略:

    1. YARN打標籤,節點物理隔離;
    2. 離線DataNode與實時計算節點的隔離

穩定性實踐-智慧排程

智慧排程目的也是為了解決資源不均的問題,現在普通的排程策略就是基於CPU,基於記憶體去排程的。除此之外,在生產過程中也發現了一些其他的問題,比如說Flink是會依賴本地磁碟,進行依賴本地磁碟做本地的狀態的儲存,所以磁碟IO,還有磁碟的容量,也是一類考慮的問題點,除此之外還包括網絡卡流量,因為每個業務的流量的狀態是不一樣的,分配進來會導致流量的高峰,把某一個網絡卡打滿,從而影響其他業務,所以期望的話是說做一些智慧排程化的事情。目前暫時能做到的是從cpu和記憶體兩方面,未來會從其他方面做一些更優的排程策略。

穩定性實踐-故障容錯

  • 節點/網路故障

    • JobManagerHA

    • 自動拉起

與Storm不同的是,知道Storm在遇到異常的時候是非常簡單粗暴的,比如說有發生了異常,可能使用者沒有在程式碼中進行比較規範的異常處理,但是沒有關係,因為worker會重啟作業還會繼續執行,並且他保證的是At-Least-Once這樣的語義,比如說一個網路超時的異常對他而言影響可能並沒有那麼大,但是Flink不同的是他對異常的容忍度是非常的苛刻的,那時候就考慮的是比如說會發生節點或者是網路的故障,那JobManager單點問題可能就是一個瓶頸,JobManager那個如果掛掉的話,那麼可能對整個作業的影響就是不可回覆的,所以考慮了做HA,另外一個就是會去考慮一些由於運維的因素而導致的那作業,還有除此之外,可能有一些使用者作業是沒有開啟CheckPoint,但如果是因為節點或者是網路故障導致掛掉,希望會在平臺內層做一些自動拉起的策略,去保證作業執行的穩定性。

  • 上下游容錯

    • FlinkKafka08異常重試

我們的資料來源主要是Kafka,讀寫Kafka是一類非常常見的實時流處理避不開的一個內容,而Kafka本身的叢集規模是非常比較大的,因此節點的故障出現是一個常態問題,在此基礎上我們對節點故障進行了一些容錯,比如說節點掛掉或者是資料均衡的時候,Leader會切換,那本身Flink的讀寫對Leader的切換容忍度沒有那麼高,在此基礎上我們對一些特定場景的,以及一些特有的異常做的一些優化,進行了一些重試。

  • 容災

    • 多機房

    • 流熱備

容災可能大家對考慮的並不多,比如說有沒有可能一個機房的所有的節點都掛掉了,或者是無法訪問了,雖然它是一個小概率的事件,但它也是會發生的。所以現在也會考慮做多機房的一些部署,包括還有Kafka的一些熱備。

Flink平臺化

Flink平臺化-作業管理

在實踐過程中,為了解決作業管理的一些問題,減少使用者開發的一些成本,我們做了一些平臺化的工作,下圖是一個作業提交的介面展示,包括作業的配置,作業生命週期的管理,報警的一些配置,延遲的展示,都是整合在實時計算平臺的。

Flink平臺化-監控報警

在監控上我們也做了一些事情,對於實時作業來講,對監控的要求會更高,比如說在作業延遲的時候對業務的影響也比較大,所以做了一些延遲的報警,包括作業狀態的報警,比如說作業存活的狀態,以及作業執行的狀態,還有未來會做一些自定義Metrics的報警。自定義Metrics是未來會考慮基於作業處理本身的內容性,做一些可配置化的一些報警。

Flink平臺化-調優診斷

  • 實時計算引擎提供統一日誌和Metrics方案

  • 為業務提供按條件過濾的日誌檢索

  • 為業務提供自定義時間跨度的指標查詢

  • 基於日誌和指標,為業務提供可配置的報警

另外就是剛剛提到說在開發實時作業的時候,調優和診斷是一個比較難的痛點,就是使用者不是很難去檢視分散式的日誌,所以也提供了一套統一的解決方案。這套解決方案主要是針對日誌和Metrics,會在針對引擎那一層做一些日誌和Metrics的上報,那麼它會通過統一的日誌收集系統,將這些原始的日誌,還有Metrics彙集到Kafka那一層。今後Kafka這一層大家可以發現它有兩個下游,一方面是做日誌到ES的資料同步,目的的話是說能夠進入日誌中心去做一些日誌的檢索,另外一方面是通過一些聚合處理流轉到寫入到OpenTSDB把資料做依賴,這份聚合後的資料會做一些查詢,一方面是Metrics的查詢展示,另外一方面就是包括實做的一些相關的報警。

下圖是當前某一個作業的一個可支援跨天維度的Metrics的一個查詢的頁面。可以看到說如果是能夠通過縱向的對比,可以發現除了作業在某一個時間點是因為什麼情況導致的?比如說延遲啊這樣容易幫使用者判斷一些他的做作業的一些問題。除了作業的執行狀態之外,也會先就是採集一些節點的基本資訊作為橫向的對比

下圖是當前的日誌的一些查詢,它記錄了,因為作業在掛掉之後,每一個ApplicationID可能會變化,那麼基於作業唯一的唯一的主鍵作業名去搜集了所有的作業,從建立之初到當前執行的日誌,那麼可以允許使用者的跨Application的日誌查詢。

生態建設

為了適配這兩類MQ做了不同的事情,對於線上的MQ,期望去做一次同步多次消費,目的是避免對線上的業務造成影響,對於的生產類的Kafka就是線下的Kafka,做了一些地址的地址的遮蔽,還有基礎基礎的一些配置,包括一些許可權的管理,還有指標的採集。

Flink在美團的應用

下面會給大家講兩個Flink在美團的真實使用的案例。第一個是Petra,Petra其實是一個實時指標的一個聚合的系統,它其實是面向公司的一個統一化的解決方案。它主要面向的業務場景就是基於業務的時間去統計,還有計算一些實時的指標,要求的話是低時延,他還有一個就是說,因為它是面向的是通用的業務,由於業務可能是各自會有各自不同的維度,每一個業務可能包含了包括應用通道機房,還有其他的各自應用各個業務特有的一些維度,而且這些維度可能涉及到比較多,另外一個就是說它可能是就是業務需要去做一些複合的指標的計算,比如說最常見的交易成功率,他可能需要去計算支付的成功數,還有和下單數的比例。另外一個就是說統一化的指標聚合可能面向的還是一個系統,比如說是一些B端或者是R段的一些監控類的系統,那麼系統對於指標系統的訴求,就是說我希望指標聚合能夠最真最實時最精確的能夠產生一些結果,資料保證說它的下游系統能夠真實的監控到當前的資訊。右邊圖是我當一個Metrics展示的一個事例。可以看到其他其實跟剛剛講也是比較類似的,就是說包含了業務的不同維度的一些指標匯聚的結果。

Petra實時指標聚合

  • 業務場景:

    • 基於業務時間(事件時間)

    • 多業務維度:如應用、通道、機房等

    • 複合指標計算:如交易成功率=支付成功數/下單數

    • 低延遲:秒級結果輸出

  • Exactlyonce的精確性保障

    • Flinkcheckpoint機制
  • 維度計算中資料傾斜

    • 熱點key雜湊
  • 對晚到資料的容忍能力

    • 視窗的設定與資源的權衡

在用Flink去做實時指標複核的系統的時候,著重從這幾方面去考慮了。第一個方面是說精確的計算,包括使用了FLink和CheckPoint的機制去保證說我能做到不丟不重的計算,第一個首先是由統一化的Metrics流入到一個預聚合的模組,預聚合的模組主要去做一些初始化的一些聚合,其中的為什麼會分預聚合和全量聚合主要的解決一類問題,包括就剛剛那位同學問的一個問題,就是資料傾斜的問題,比如說在熱點K發生的時候,當前的解決方案也是通過預聚合的方式去做一些緩衝,讓儘量把K去打散,再聚合全量聚合模組去做匯聚。那其實也是隻能解決一部分問題,所以後面也考慮說在效能的優化上包括去探索狀態儲存的效能。下面的話還是包含晚到資料的容忍能力,因為指標匯聚可能剛剛也提到說要包含一些複合的指標,那麼符合的指標所依賴的資料可能來自於不同的流,即便來自於同一個流,可能每一個數據上報的時候,可能也會有晚到的情況發生,那時候需要去對資料關聯做晚到的容忍,容忍的一方面是說可以設定晚到的Lateness的延遲,另一方面是可以設定視窗的長度,但是其實在現實的應用場景上,其實還有一方面考慮就是說除了去儘量的去拉長時間,還要考慮真正的計算成本,所以在這方面也做了一些權衡,那麼指標基本就是經過全量聚合之後,聚合結果會回寫Kafka,經過資料同步的模組寫到OpenTSDB去做,最後去grafana那做指標的展示,另一方面可能去應用到通過Facebook包同步的模組去同步到報警的系統裡面去做一些指標,基於指標的報警。

下圖是現在提供的產品化的Petra的一個展示的機示意圖,可以看到目前的話就是定義了某一些常用的運算元,以及維度的配置,允許使用者進行配置話的處理,直接去能夠獲取到他期望要的指標的一個展示和匯聚的結果。目前還在探索說為Petra基於Sql做一些事情,因為很多使用者也比較就是在就是習慣上也可以傾向於說我要去寫Sql去完成這樣的統計,所以也會基於此說依賴Flink的本身的對SQl還有TableAPI的支援,也會在Sql的場景上進行一些探索。

MLX機器學習平臺


第二類應用就是機器學習的一個場景,機器學習的場景可能會依賴離線的特徵資料以及實時的特徵資料。一個是基於現有的離線場景下的特徵提取,經過了批處理,流轉到了離線的叢集。另外一個就是近線模式,近線模式出的資料就是現有的從日誌收集系統流轉過來的統一的日誌,經過Flink的處理,就是包括流的關聯以及特徵的提取,再做模型的訓練,流轉到最終的訓練的叢集,訓練的叢集會產出P的特徵,還有都是Delta的特徵,最終將這些特徵影響到線上的線上的特徵的一個訓練的一個服務上。這是一個比較常見的,比如說比較就是通用的也是比較通用的一個場景,目前的話主要應用的方可能包含了搜尋還有推薦,以及一些其他的業務。

未來展望

未來的話可能也是通過也是期望在這三方面進行做一些更多的事情,剛剛也提到了包括狀態的管理,第一個是狀態的統一的,比如說Sql化的統一的管理,希望有統一的配置,幫使用者去選擇一些期望的回滾點。另外一個就是大狀態的效能優化,因為比如說像做一些流量資料的雙流的關聯的時候,現在也遇到了一些效能瓶頸的問題,對於說啊基於記憶體型的狀態,基於記憶體型的資料的處理,以及基於RocksDB的狀態的處理,做過效能的比較,發現其實效能的差異還是有一些大的,所以希望說在基於RocksDBBackend的上面能夠去儘量去更多的做一些優化,從而提升作業處理的效能。第二方面就是Sql,Sql的話應該是每一個位就是當前可能各個公司都在做的一個方向,因為之前也有對Sql做一些探索,包括提供了基於Storm的一些Sql的表示,但是可能對於之前的話對於與語義的表達可能會有一些欠缺,所以希望說在基於Flink可去解決這些方面的事情,以及包括Sql的併發度的一些配置的優化,包括Sql的查詢的一些優化,都希望說在Flink未來能夠去優化更多的東西,去真正能使Sql應用到生產的環境。

另外一方面的話就是會進行新的場景的也在做新的場景的一些探索,期望是比如說包括剛剛也提到說除了流式的處理,也期望說把離線的場景下的資料進行一些合併,通過統一的Sql的API去提供給業務做更多的服務,包括流處理,還有