1. 程式人生 > >Flink架構及其工作原理

Flink架構及其工作原理

目錄

System Architecture

分散式系統需要解決:分配和管理在叢集的計算資源、處理配合、持久和可訪問的資料儲存、失敗恢復。Fink專注分散式流處理。

Components of a Flink Setup

  • JobManager :接受application,包含StreamGraph(DAG)、JobGraph(logical dataflow graph,已經進過優化,如task chain)和JAR,將JobGraph轉化為ExecutionGraph(physical dataflow graph,並行化),包含可以併發執行的tasks。其他工作類似Spark driver,如向RM申請資源、schedule tasks、儲存作業的元資料,如checkpoints
  • ResourceManager:一般是Yarn,當TM有空閒的slot就會告訴JM,沒有足夠的slot也會啟動新的TM。kill掉長時間空閒的TM。
  • TaskManager類似Spark的executor
  • Dispatcher提供REST介面來接收client的application提交,它負責啟動JM和提交application,同時執行Web UI。

下面以Flink on Yarn為例,首先在Flin的配置中設定好 YARN_CONF_DIR, HADOOP_CONF_DIR or HADOOP_CONF_PATH。接著啟動YARN session,此時會檢查資源是否足夠,足夠才把包含Flink及其配置的jar提交到HDFS。然後client向YARN RM申請資源啟動Application Master在叢集中保持一個Flink master/ YARN Session,包含RM和Dispatcher,當提交作業時才啟動JM並向Flink的RM申請資源,不夠的話,Flink的RM向YARN的RM申請資源。在啟動App Master時,NodeManager會從HDFS中下載之前上傳的所需檔案。當App Master啟動後,他會產生新的Flink配置檔案(給TaskManager用),並上傳到HDFS。然後給TM分配container,後者從HDFS下載所需檔案。至此,Flink就可以接收job了。


下面是簡單例子,詳細看官網

# 啟動yarn-session,4個TM,每個有4GB堆記憶體,4個slot
cd flink-1.7.0/
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m -s 4
# 啟動作業
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar

細節取決於具體環境,如不同的RM

Application Deployment

Framework模式:Flink作業為JAR,並被提交到Dispatcher or JM or YARN。

Library模式:Flink作業為application-specific container image,如Docker image,適合微服務。

Task Execution

任務可以是相同operator (data parallelism),不同 operator (task parallelism),甚至不同application (job parallelism)。TM提供一定數量的slots來控制並行的任務數。

上圖A和C是source function,E是sink function,小數字表示並行度。

一個TM是一個JVM程序,它通過多執行緒完成任務。執行緒的隔離不太好,一個執行緒失敗有可能導致整個TM失敗。

Highly-Available Setup

從失敗中恢復需要重啟失敗程序、作業和恢復它的state。

當一個TM掛掉而RM又無法找到空閒的資源時,就只能暫時降低並行度,直到有空閒的資源重啟TM。

當JM掛掉就靠ZK來重新選舉,和找到JM儲存到遠端storage的元資料、JobGraph。重啟JM並從最後一個完成的checkpoint開始。

JM在執行期間會得到每個task checkpoints的state儲存路徑(task將state寫到遠端storage)並寫到遠端storage,同時在ZK的儲存路徑留下pointer指明到哪裡找上面的儲存路徑。

背壓

資料湧入的速度大於處理速度。在source operator中,可通過Kafka解決。在任務間的operator有如下機制應對:

Local exchange:task1和2在同一個工作節點,那麼buffer pool可以直接交給下一個任務,但下一個任務task2消費buffer pool中的資訊速度減慢時,當前任務task1填充buffer pool的速度也會減慢。

Remote exchange:TM保證每個task至少有一個incoming和一個outgoing緩衝區。當下遊receiver的處理速度低於上有的sender的傳送速度,receiver的incoming緩衝區就會開始積累資料,當擠滿後就不再接收資料。上游sender就開始在outgoing緩衝區積累資料。

TM負責資料在tasks間的轉移,轉移之前會儲存到buffer(這又變回micro-batches)。每個TM有32KB的網路buffer用於接收和傳送資料。如果sender和receiver在不同程序,那麼會通過作業系統的網路棧來通訊。每對TM保持permanent TCP連線來交換資料。每個sender任務能夠給所有receiving任務傳送資料,反之,所有receiver任務能夠接收所有sender任務的資料。TM保證每個任務都至少有一個incoming和outgoing的buffer,並增加額外的緩衝區分配約束來避免死鎖。

如果sender和receiver任務在同一個TM程序,sender會序列化結果資料到buffer,如果滿了就放到佇列。receiver任務通過佇列得到資料並進行反序列化。這樣的好處是解耦任務並允許在任務中使用可變物件,從而減少了物件例項化和垃圾收集。一旦資料被序列化,就能安全地修改。而缺點是計算消耗大,在一些條件下能夠把task穿起來,避免序列化。(C10)

Flow Control with Back Pressure

receiver放到緩衝區的資料變為佇列,sender將要傳送的資料變為佇列,最後sender減慢傳送速度。

Event Time Processing

event time處理的資料必須有時間戳(Long unix timestamp)並定義了watermarks。watermark是一種特殊的records holding a timestamp long value。它必須是遞增的(防止倒退),有一個timestamp t(下圖的5),暗示所有接下來的資料都會大於這個值。後來的,小於這個值,就被視為遲來資料,Flink有其他機制處理。

Watermarks and Event Time

WM在Flink是一種特殊的record,它會被operator tasks接收和釋放。

tasks有時間服務來維持timers(timers註冊到時間服務上),在time-window task中,timers分別記錄了各個window的結束時間。當任務獲得一個watermark時,task會根據這個watermark的timestamp更新內部的event-time clock。任務內部的時間服務確定所有timers時間是否小於watermark的timestamp,如果大於則觸發call-back運算元來釋放記錄並返回結果。最後task還會將更新的event-time clock的WM進行廣播。(結合下圖理解)

只有ProcessFunction可以讀取和修改timestamp或者watermark(The ProcessFunction can read the timestamp of a currently processed record, request the current event-time of the operator, and register timers)。下面是PF的行為。

當收到WM大於所有目前擁有的WM,就會把event-time clock更新為所有WM中最小的那個,並廣播這個最小的WM。即便是多個streams輸入,機制也一樣,只是增加Paritition WM數量。這種機制要求獲得的WM必須是累加的,而且task必須有新的WM接收,否則clock就不會更新,task的timers就不會被觸發。另外,當多個streams輸入時,timers會被WM比較離散的stream主導,從而使更密集的stream的state不斷積累。

Timestamp Assignment and Watermark Generation

當streaming application消化流時產生。Flink有三種方式產生:

  • SourceFunction:產生的record帶有timestamp,一些特殊時點產生WM。如果SF暫時不再發送WM,則會被認為是idle。Flink會從接下來的watermark operators中排除由這個SF生產的分割槽(上圖有4個分割槽),從而解決timer不觸發的問題。
  • AssignerWithPeriodicWatermarks 提取每條記錄的timestamp,並週期性的查詢當前WM,即上圖的Partition WM。
  • AssignerWithPunctuatedWatermarks 可以從每條資料提取WM。

上面兩個User-defined timestamp assignment functions通常用在source operator附近,因為stream一經處理就很難把握record的時間順序了。所以UDF可以修改timestamp和WM,但在資料處理時使用不是一個好主意。

State Management

由任務維護並用於計算函式結果的所有資料都屬於任務的state。其實state可以理解為task業務邏輯的本地或例項變數。

在Flink,state總是和特定的operator關聯。operator需要註冊它的state,而state有兩種型別:

  • Operator State:由同一並行任務處理的所有記錄都可以訪問相同的state,而其他的task或operator不能訪問,即一個task專屬一個state。這種state有三種primitives
    • List State represents state as a list of entries.
    • Union List State同上,但在任務失敗和作業從savepoint重啟的行為不一樣
    • Broadcast State(v1.5) 同樣一個task專屬一個state,但state都是一樣的(需要自己注意保持一致,對state更新時,實際上只對當前task的state進行更新。只有所有task的更新一樣時,即輸入資料一樣(一開始廣播所以一樣,但資料的順序可能不一樣),對資料的處理一樣,才能保證state一樣)。這種state只能儲存在記憶體,所以沒有RockDB backend。
  • Keyed State:相同key的record共享一個state。
    • Value State:每個key一個值,這個值可以是複雜的資料結構.
    • List State:每個key一個list
    • Map State:每個key一個map

上面兩種state的存在方式有兩種:raw和managed,一般都是用後者,也推薦用後者(更好的記憶體管理、不需造輪子)。

State Backends

state backend決定了state如何被儲存、訪問和維持。它的主要職責是本地state管理和checkpoint state到遠端。在管理方面,可選擇將state儲存到記憶體還是磁碟。checkpoint方面在C8詳細介紹。

MemoryStateBackend, FsStateBackend, RocksDBStateBackend適合越來越大的state。都支援非同步checkpoint,其中RocksDB還支援incremental的checkpoint。

  • 注意:As RocksDB’s JNI bridge API is based on byte[], the maximum supported size per key and per value is 2^31 bytes each. IMPORTANT: states that use merge operations in RocksDB (e.g. ListState) can silently accumulate value sizes > 2^31 bytes and will then fail on their next retrieval. This is currently a limitation of RocksDB JNI.

Scaling Stateful Operators

Flink會根據input rate調整併發度。對於stateful operators有以下4種方式:

  • keyed state:根據key group來調整,即分為同一組的key-value會被分到相同的task

  • list state:所有list entries會被收集並重新均勻分佈,當增加併發度時,要新建list

  • union list state:增加併發時,廣播整個list,所以rescaling後,所有task都有所有的list state。

  • broadcast state

Checkpoints, Savepoints, and State Recovery

Flink’s Lightweight Checkpointing Algorithm

在分散式開照演算法Chandy-Lamport的基礎上實現。有一種特殊的record叫checkpoint barrier(由JM產生),它帶有checkpoint ID來把流進行劃分。在CB前面的records會被包含到checkpoint,之後的會被包含在之後的checkpoint。

當source task收到這種資訊,就會停止傳送recordes,觸發state backend對本地state的checkpoint,並廣播checkpoint ID到所有下游task。當checkpoint完成時,state backend喚醒source task,後者向JM確定相應的checkpoint ID已經完成任務。

當下遊獲得其中一個CB時,就會暫停處理這個CB對應的source的資料(完成checkpoint後傳送的資料),並將這些資料存到緩衝區,直到其他相同ID的CB都到齊,就會把state(下圖的12、8)進行checkpoint,並廣播CB到下游。直到所有CB被廣播到下游,才開始處理排隊在緩衝區的資料。當然,其他沒有傳送CB的source的資料會繼續處理。

最後,當所有sink會向JM傳送BC確定checkpoint已完成。

這種機制還有兩個優化:

  • 當operator的state很大時,複製整個state併發送到遠端storage會很費時。而RocksDB state backend支援asynchronous and incremental的checkpoints。當觸發checkpoint時,backend會快照所有本地state的修改(直至上一次checkpoint),然後馬上讓task繼續執行。後臺執行緒非同步傳送快照到遠端storage。
  • 在等待其餘CB時,已經完成checkpoint的source資料需要排隊。但如果使用at-least-once就不需要等了。但當所有CB到齊再checkpoint,儲存的state就已經包含了下一次checkpoint才記錄的資料。(如果是取最值這種state就無所謂)

Recovery from Consistent Checkpoints

上圖佇列中的7和6之所以能恢復,取決於資料來源是否resettable,如Kafka,不會因為傳送資訊就把資訊刪除。這才能實現處理過程的exactly-once state consistency(嚴格來講,資料還是被重複處理,但是在讀檔後重復的)。但是下游系統有可能接收到多個結果。這方面,Flink提供sink運算元實現output的exactly-once,例如給checkpoint提交records釋放記錄。另一個方法是idempotent updates,詳細看C7。

Savepoints

checkpoints加上一些額外的元資料,功能也是在checkpoint的基礎上豐富。不同於checkpoints,savepoint不會被Flink自動創造(由使用者或者外部scheduler觸發創造)和銷燬。savepoint可以重啟不同但相容的作業,從而:

  • 修復bugs進而修復錯誤的結果,也可用於A/B test或者what-if場景。
  • 調整併發度
  • 遷移作業到其他叢集、新版Flink

也可以用於暫停作業,通過savepoint檢視作業情況。

參考
Stream Processing with Apache Flink by Vasiliki Kalavri; Fabian Hueske