1. 程式人生 > >Flink在餓了麽的應用與實踐

Flink在餓了麽的應用與實踐

實時計算 zookeepe driver part 以及 幾分鐘 guarantee 每次 exactly

本文作者:易偉平(餓了麽)

整理:姬平(阿裏巴巴實時計算部)

本文將為大家展示餓了麽大數據平臺在實時計算方面所做的工作,以及計算引擎的演變之路,你可以借此了解Storm、Spark、Flink的優缺點。如何選擇一個合適的實時計算引擎?Flink憑借何種優勢成為餓了麽首選?本文將帶你一一解開謎題。

平臺現狀

下面是目前餓了麽平臺現狀架構圖:

技術分享圖片
來源於多個數據源的數據寫到 kafka 裏,計算引擎主要是 Storm , Spark 和 Flink,計算引擎出來的結果數據再落地到各種存儲上。

目前 Storm 任務大概有100多個,Spark任務有50個左右,Flink暫時還比較少。

目前我們集群規模每天數據量有60TB,計算次數有1000000000,節點有400個。這裏要提一下,Spark 和 Flink都是 on yarn 的,其中Flink onyarn主要是用作任務間 jobmanager 隔離, Storm 是 standalone 模式。

應用場景

1.一致性語義

在講述我們應用場景之前,先強調實時計算一個重要概念, 一致性語義:

1) at-most-once:即 fire and forget,我們通常寫一個 java 的應用,不去考慮源頭的 offset 管理,也不去考慮下遊的冪等性的話,就是簡單的at-most-once,數據來了,不管中間狀態怎樣,寫數據的狀態怎樣,也沒有ack機制。

2) at-least-once: 重發機制,重發數據保證每條數據至少處理一次。

3) exactly-once: 使用粗 checkpoint 粒度控制來實現 exactly-once,我們講的 exactly-once 大多數指計算引擎內的 exactly-once,即每一步的 operator 內部的狀態是否可以重放;上一次的 job 如果掛了,能否從上一次的狀態順利恢復,沒有涉及到輸出到 sink 的冪等性概念。

4) at-least-one + idempotent = exactly-one:如果我們能保證說下遊有冪等性的操作,比如基於mysql實現 update on duplicate key;或者你用es, cassandra之類的話,可以通過主鍵key去實現upset的語義, 保證at-least-once的同時,再加上冪等性就是exactly-once。

2. Storm

餓了麽早期都是使用Storm,16年之前還是Storm,17年才開始有Sparkstreaming, Structed-streaming。Storm用的比較早,主要有下面幾個概念:

1) 數據是 tuple-based

2) 毫秒級延遲

3) 主要支持java, 現在利用apache beam也支持python和go。

4) Sql的功能還不完備,我們自己內部封裝了typhon,用戶只需要擴展我們的一些接口,就可以使用很多主要的功能;flux是Storm的一個比較好的工具,只需要寫一個yaml文件,就可以描述一個Storm任務,某種程度上說滿足了一些需求,但還是要求用戶是會寫java的工程師,數據分析師就使用不了。

★ 2.1 總結

1) 易用性:因為使用門檻高,從而限制了它的推廣。

2)StateBackend:更多的需要外部存儲,比如redis之類的kv存儲。

3) 資源分配方面:用worker和slot提前設定的方式,另外由於優化點做的較少,引擎吞吐量相對比較低一點。

3. Sparkstreaming

有一天有個業務方過來提需求說 我們能不能寫個sql,幾分鐘內就可以發布一個實時計算任務。 於是我們開始做Sparkstreaming。它的主要概念如下:

1) Micro-batch:需要提前設定一個窗口,然後在窗口內處理數據。

2) 延遲是秒級級別,比較好的情況是500ms左右。

3) 開發語言是java和scala。

4) Streaming SQL,主要是我們的工作,我們希望提供 Streaming SQL 的平臺。

特點:

1) Spark生態和 SparkSQL: 這是Spark比較好的地方,技術棧是統一的,SQL,圖計算,machine learning的包都是可以互調的。因為它先做的是批處理,和Flink不一樣,所以它天然的實時和離線的api是統一的。

2) Checkpointon hdfs。

3) On Yarn:Spark是屬於 hadoop 生態體系,和 yarn 集成度高。

4) 高吞吐: 因為它是 micro-batch 的方式,吞吐也是比較高的。

下面給大家大致展示一下我們平臺用戶快速發布一個實時任務的操作頁面,它需要哪些步驟。我們這裏不是寫 DDL 和 DML 語句,而是 UI 展示頁面的方式。

技術分享圖片頁面裏面會讓用戶選一些必要的參數, 首先會選哪一個 kafka 集群,每個分區消費多少,反壓也是默認開啟的。消費位置需要讓用戶每次去指定,有可能用戶下一次重寫實時任務的時候,可以根據業務需求去選擇offset消費點。

中間就是讓用戶描述 pipeline。 SQL 就是 kafka 的多個 topic,輸出選擇一個輸出表,SQL 把上面消費的 kafka DStream 註冊成表,然後寫一串 pipeline,最後我們幫用戶封裝了一些對外 sink (剛剛提到的各種存儲都支持,如果存儲能實現 upsert 語義的話,我們都是支持了的)。

★ 3.1 MultiStream-Join

雖然剛剛滿足一般無狀態批次內的計算要求,但就有用戶想說, 我想做流的 join 怎麽辦, 早期的 Spark1.5 可以參考 Spark-streamingsql 這個開源項目把 DStream 註冊為一個表,然後對這個表做 join 的操作,但這只支持1.5之前的版本,Spark2.0 推出 structured streaming 之後項目就廢棄了。我們有一個 tricky 的方式:

技術分享圖片 讓 Sparkstreaming 去消費多個 topic,但是我根據一些條件把消費的 DStream 裏面的每個批次 RDD 轉化為DataFrame,這樣就可以註冊為一張表,根據特定的條件,切分為兩張表,就可以簡單的做個 join,這個 join 的問題完全依賴於本次消費的數據,它們 join 的條件是不可控的,是比較 tricky 的方式。比如說下面這個例子,消費兩個 topic,然後簡單通過 filer 條件,拆成兩個表,然後就可以做個兩張表的 join,但它本質是一個流。

技術分享圖片

★ 3.2 Exactly-once

技術分享圖片

exactly-once 需要特別註意一個點:

我們必須要求數據 sink 到外部存儲後,offset 才能 commit,不管是到 zookeeper,還是 mysql 裏面,你最好保證它在一個 transaction 裏面,而且必須在輸出到外部存儲(這裏最好保證一個 upsert 語義,根據 unique key 來實現upset語義)之後,然後這邊源頭driver再根據存儲的 offeset 去產生 kafka RDD,executor 再根據 kafka 每個分區的 offset 去消費數據。如果滿足這些條件,就可以實現端到端的 exactly-once 這是一個大前提。

★ 3.3 總結

1) Stateful Processing SQL ( <2.x mapWithState、updateStateByKey):我們要實現跨批次帶狀態的計算的話,在1.X版本,我們通過這兩個接口去做,但還是需要把這個狀態存到 hdfs 或者外部去,實現起來比較麻煩一點。

2) Real Multi-Stream Join:沒辦法實現真正的多個流join的語義。

3) End-To-End Exactly-Once Semantics:它的端到端的 exactly-once 語義實現起來比較麻煩,需要sink到外部存儲後還需要手動的在事務裏面提交offset。

4. STRUCTURED STREAMING

我們調研然後並去使用了 Spark2.X 之後帶狀態的增量計算。下面這個圖是官方網站的:

技術分享圖片
所有的流計算都參照了 Google 的 data flow,裏面有個重要的概念:數據的 processing time 和 event time,即數據的處理時間和真正的發生時間有個 gap 。於是流計算領域還有個 watermark,當前進來的事件水位需要watermark 來維持,watermark 可以指定時間 delay 的範圍,在延遲窗口之外的數據是可以丟棄的,在業務上晚到的數據也是沒有意義的。

下面是 structured streaming 的架構圖:

技術分享圖片
這裏面就是把剛才 sparkstreaming 講 exactly-once 的步驟1,2,3都實現了,它本質上還是分批的 batch 方式,offset 自己維護,狀態存儲用的 hdfs,對外的 sink 沒有做類似的冪等操作,也沒有寫完之後再去 commit offset,它只是再保證容錯的同時去實現內部引擎的 exactly-once。

★ 4.1 特點

1) Stateful Processing SQL&DSL:可以滿足帶狀態的流計算

2) Real Multi-Stream Join:可以通過 Spark2.3 實現多個流的 join,多個流的 join 做法和 Flink 類似,你需要先定義兩個流的條件(主要是時間作為一個條件),比如說有兩個topic的流進來,然後你希望通過某一個具體的 schema 中某個字段(通常是 event time)來限定需要 buffer 的數據,這樣可以實現真正意義上的流的 join。

3)比較容易實現端到端的 exactly-once 的語義,只需要擴展sink的接口支持冪等操作是可以實現 exactly-once的。

特別說一下,structured streaming 和原生的 streaming 的 API 有一點區別,它創建表的 Dataframe 的時候,是需要指定表的 schema 的,意味著你需要提前指定 schema。另外它的 watermark 是不支持 SQL 的,於是我們加了一個擴展,實現完全寫 SQL,可以從左邊到右邊的轉換(下圖),我們希望用戶不止是程序員,也希望不會寫程序的數據分析師等同學也能用到。

技術分享圖片

★ 4.2 總結

1) Trigger(Processing Time、 Continuous ):2.3之前主要基於processing Time,每個批次的數據處理完了立馬觸發下一批次的計算。2.3推出了record by record的持續處理的trigger。

2) Continuous Processing (Only Map-Like Operations):目前它只支持map like的操作,同時sql的支持度也有些限制。

3) LowEnd-To-End Latency With Exactly-Once Guarantees:端到端的exactly-once的保證需要自己做一些額外的擴展, 我們發現kafka0.11版本提供了事務的功能,是可以從基於這方面考慮從而去實現從source到引擎再到sink,真正意義上的端到端的exactly-once。

4) CEP(Drools):我們發現有業務方需要提供 CEP 這樣復雜事件處理的功能,目前我們的語法無法直接支持,我們讓用戶使用規則引擎 Drools,然後跑在每個 executor 上面,依靠規則引擎功能去實現 CEP。

於是基於以上幾個 Spark Structured Streaming 的特點和缺點,我們考慮使用 Flink 來做這些事情。

5.Flink

技術分享圖片

Flink 目標是對標 Spark,流這塊是領先比較多,它野心也比較大,圖計算,機器學習等它都有,底層也是支持yarn,tez等。對於社區用的比較多的存儲,Flink 社區官方都支持比較好,相對來說。

Flink 的框架圖:

技術分享圖片

Flink中的 JobManager,相當於 Spark 的 Driver 角色,TaskManger 相當於 Executor,裏面的 Task 也有點類似Spark 的那些 Task。 不過 Flink 用的 RPC 是 akka,同時 Flink Core 自定義了內存序列化框架,另外 Task 無需像Spark 每個 Stage 的 Task 必須相互等待而是處理完後即往下遊發送數據。

Flink binary data處理operator:

技術分享圖片

Spark 的序列化用戶一般會使用 kryo 或者 java 默認的序列化,同時也有 Tungsten 項目對 Spark 程序做一 JVM 層面以及代碼生成方面的優化。相對於 Spark,Flink自己實現了基於內存的序列化框架,裏面維護著key和pointer 的概念,它的 key 是連續存儲,在 CPU 層面會做一些優化,cache miss 概率極低。比較和排序的時候不需要比較真正的數據,先通過這個 key 比較,只有當它相等的時候,才會從內存中把這個數據反序列化出來,再去對比具體的數據,這是個不錯的性能優化點。

Flink Task Chain:

技術分享圖片

Task中 operator chain,是比較好的概念。如果上下遊數據分布不需要重新 shuffle 的話,比如圖中 source 是kafka source,後面跟的 map 只是一個簡單的數據 filter,我們把它放在一個線程裏面,就可以減少線程上下文切換的代價。

並行度概念

技術分享圖片

比如說這裏面會有 5 個 Task,就會有幾個並發線程去跑,chain 起來的話放在一個線程去跑就可以提升數據傳輸性能。Spark 是黑盒的,每個 operator 無法設並發度,而 Flink 可以對每個 operator 設並發度,這樣可以更靈活一點,作業運行起來對資源利用率也更高一點。

Spark 一般通過 Spark.default.parallelism 來調整並行度,有 shuffle 操作的話,並行度一般是通Spark.sql.shuffle.partitions 參數來調整,實時計算的話其實應該調小一點,比如我們生產中和 kafka 的 partition 數調的差不多,batch 在生產上會調得大一點,我們設為1000,左邊的圖我們設並發度為2,最大是10,這樣首先分2個並發去跑,另外根據 key 做一個分組的概念,最大分為10組,就可以做到把數據盡量的打散。

State & Checkpoint

因為 Flink 的數據是一條條過來處理,所以 Flink 中的每條數據處理完了立馬發給下遊,而不像 spark,需要等該operator 所在的 stage 所有的 task 都完成了再往下發。

Flink 有粗粒度的 checkpoint 機制,以非常小的代價為每個元素賦予一個 snapshot 概念,只有當屬於本次snapshot 的所有數據都進來後才會觸發計算,計算完後,才把 buffer 數據往下發,目前 Flink sql 沒有提供控制buffer timeout 的接口,即我的數據要buffer多久才往下發。可以在構建 Flink context 時,指定 buffer timeout為 0,處理完的數據才會立馬發下去,不需要等達到一定閾值後再往下發。

Backend 默認是維護在 jobmanager 內存,我們更多使用的的是寫到 hdfs 上,每個 operator 的狀態寫到 rocksdb 上,然後異步周期增量同步到外部存儲。

容錯

技術分享圖片

圖中左半部分的紅色節點發生了 failover,如果是 at-least-once,則其最上遊把數據重發一次就好;但如果是exactly-once,則需要每個計算節點從上一次失敗的時機重放。

Exactly Once Two-Phase Commit

技術分享圖片

Flink1.4 之後有兩階段提交來支持 exactly-once 。它的概念是從上遊 kafka 消費數據後,每一步都會發起一次投票,來記錄狀態,通過checkpoint的屏障來處理標記,只有最後再寫到kafka(0.11之後的版本),只有最後完成之後,才會把每一步的狀態讓 jobmanager 中的 cordinator 去通知可以固化下來,這樣實現 exactly-once。

Savepoints

技術分享圖片

還有一點 Flink 比較好的就是,基於它的 checkpoint 來實現 savepoint 功能。業務方需要每個應用恢復節點不一樣,希望恢復到的版本也是可以指定的,這是比較好的。這個 savepoint 不只是數據的恢復,也有計算狀態的恢復。

特點:

1) Trigger (Processing Time、 Event Time、IngestionTime):對比下,Flink支持的流式語義更豐富,不僅支持 Processing Time, 也支持 Event timeIngestion Time

2)Continuous Processing & Window:支持純意義上的持續處理,record by record的,window 也比 Spark處理的好。

3) Low End-To-End Latency With Exactly-Once Guarantees:因為有兩階段提交,用戶是可以選擇在犧牲一定吞吐量的情況下,根據業務需求情況來調整來保證端到端的exactly-once。

4) CEP:支持得好。

5) Savepoints:可以根據業務的需求做一些版本控制。

也有做的還不好的:

1)SQL (Syntax Function、Parallelism):SQL功能還不是很完備,大部分用戶是從hive遷移過來,Spark支持hive覆蓋率達到99%以上。 SQL函數不支持,目前還無法對單個operator做並行度的設置。

2) ML、Graph等:機器學習,圖計算等其他領域比Spark要弱一點,但社區也在著力持續改進這個問題。

後續規劃

因為現在餓了麽已經屬於阿裏的一員,後續會更多地使用 Flink,也期待用到 Blink。

Flink在餓了麽的應用與實踐