《Stream processing with Apache Flink》 讀書筆記
《Stream processing with Apache Flink》 讀書筆記
第二章 流處理基礎
資料分發策略
- 轉發 一對一轉發,降低網路IO
- 廣播 一對多廣播
- 按key 按Key值(可能是hash)分發, 說明可以自定義實現range型分發
- 隨機
- 普通Shuffle
流式處理
流資料定義
- 無限的事件序列
延遲&吞吐
- 無限的事件沒有總處理完成時間和輸入資料量大小,要看處理延遲的大小和實時吞吐量衡量程式的處理效能
延遲
- 延遲指的是處理一個時間所需的時長 即從接收事件到成功輸出處理結果的時長
吞吐
- 系統在單位時間內可以處理的事件個數 吞吐量受限於接收速率
- 當達系統到吞吐極限時,一味的提高吞吐量,可能導致延遲增加,接收資料緩衝區耗盡,繼而資料丟失等(背壓)
資料流操作
- 無狀態計算:處理事件時無需依賴已處理過的事件,不儲存已處理過的事件。
- 有狀態計算:處理事件時可能需要維護之前接受的事件資訊,有狀態運算元的狀態會根據輸入的事件不斷更新。
資料接入和輸出
- 資料接入是指從外部資料來源獲取原始資料並將其轉換成適合後續處理的格式的過程,實現接入操作邏輯的運算元稱之為資料來源
轉換
- 分別處理每個事件,逐個讀取事件,並對其應用某些轉換操作輸出一條新的輸出流。
滾動聚合
- 會根據每個到來的事件持續更新結果
- 聚合操作都是有狀態的
- 滿足結合律和交換律(即不是視窗函式)
視窗操作
-
轉換和聚合每次處理一個事件來產生輸出並更新結果(可能不更新),有些操作必須收集並緩衝記錄才能計算結果(如計算中位數),因為可維持的資料記錄有限,因而必須對需要維持的資料量加以限制。
-
視窗即為一個計算邊界。
-
視窗操作會持續的建立一些稱為“桶”的有限事件集合,並允許基於“桶”進行計算。
-
視窗操作決定了什麼時候建立桶,事件如何分配到桶中,以及桶內的事件什麼時候參與計算。
視窗定義
-
滾動視窗 事件分配到長度固定且互不重疊的桶中,在視窗邊界通過後所有事件會發送給計算函式處理。
- 長度:建立新桶時每個桶的大小(時長,事件個數等);
-
滑動視窗 將事件分配到大小固定且允許相互重疊的桶中,事件可能同時屬於多個桶。
-
間隔:每達到一定條件( 時長,事件個數等)就建立一個新的桶;
-
長度:建立新桶時每個桶的大小(時長,事件個數等);
-
-
會話視窗 按照Session的斷開劃分桶。 根據Session間隔將事件劃分到不同的桶中,長度不固定,劃分時機也不固定。
時間語義
任何裝置都可能因為某段時間的網路等問題導致資料接收時間不連續。
例如地鐵中的手遊,會在網路環境恢復的時候再將資料傳送到伺服器,這裡應該使用事件時間而非接收時間進行流計算。
流計算中有兩個不同的事件概念
- 處理時間
- 事件時間
處理時間
處理時間是當前流處理運算元所在機器上的本地時鐘時間,即和資料傳輸的速度有很大關係。
事件時間
事件時間是資料流中事件實際發生的時間,它以附加在資料流中的資料時間戳為準,在傳輸資料之前就已經生成,不再受到傳輸管道的影響。
水位線
-
在按照事件時間進行視窗計算的時候,如何決定視窗計算的觸發時機?何時才能認為已經接收了某時間點前全部的事件?
-
Flink的水位線是一個全域性進度指標,它也是一個事件記錄,運算元收到該事件後即可以觸發視窗計算或對接收資料進行排序了,這勢必會造成更高的延遲,因為計算要等到接收到水位線的時候才會觸發。
-
水位線是結果的準確性與延遲之間的Trad-off,實踐中系統無法獲取足夠多的資訊來完美地確定水位線,因此它也不是一個完美的解決方案。
在處理時間和事件時間的選擇上,處理時間擁有更低的延遲,事件時間擁有更高的準確性,也是一個需要權衡的點。
狀態和一致性模型
狀態
-
在任何一個稍微複雜的資料處理中都需要狀態,為了生成結果,函式會在一段時間或基於一定個數的事件來累積狀態,有狀態運算元同時使用傳入的事件和內部狀態來計算輸出。
-
由於流式運算元處理的是無窮盡的資料,所以必須避免內部狀態無限增長,為了限制狀態大小,運算元通常都會只保留到目前為止所見事件的摘要或概覽,這可能是一個數值,累計值,一個對迄今為止全部事件的抽樣一個緩衝等資料結構。
- 狀態管理 系統需要高效地管理狀態並保證它們不受併發更新的影響
- 狀態劃分 將狀態按照Key劃分,並獨立管理每一部分即可做到狀態並行化(分區劃分)。
- 狀態恢復 即使出現故障也要確保結果正確,如何從狀態中恢復運算元。
任務故障
流計算中通過重新處理所有輸入來重建故障期間丟失的運算元狀態在時間、計算資源上代價都比較高。
任何任務都需要執行以下步驟:
- 接收事件,存在本地緩衝區
- 選擇性的更新狀態
- 產生輸出記錄
這三步都可能發生故障。
結果保障
分為兩種保障,狀態結果一致性保障和輸出的一致性保障,這裡的保障是指輸出的一致性。
- 至多一次(At most once)
最簡單的保障即為不做任何事情,既不重放,也不恢復丟失的狀態。 - 至少一次(At latest once)
資料的計算可能會重複,但絕對不會丟失,即不管丟失的狀態,直接重放資料。 - 精確一次(Exactly once)
最嚴格的一致性保障,每個事件對於內部狀態的更新都僅有一次,恢復每個運算元的狀態,不需要重複消費資料。 - 端到端精確一次(End to end exactly once)
Source-Execute-Sink,整個資料處理管道上結果都是正確的,通過冪等輸出可實現精確一次的語義。
第三章 Apache Flink 架構
Flink 角色
- JM程序(JobManager),每個應用都由一個不同的JM(StandAlone則是執行緒,Yarn則是程序)掌控,其會包含一個JobGraph即邏輯DataFlow圖,以及一個包含了全部資原始檔的Jar。JM將JobGraph轉化成ExecutionGraph的物理DataFlow圖,該圖包含可以並行執行的任務。JM會從ResourceManager申請執行任務必須的TaskSlot,成功後將 ExecutionGraph中的任務分發給 TaskManager執行。執行過程中JobManager還會負責從中協調,建立Checkpoint、SavePoint和狀態恢復等。 (個人認為StandAlone下的JM類似於Spark Master,Yarn模式下則類似於Spark Driver,起到資源申請、回收、協調,任務生成、分配、監控的作用;那麼任務的協調者在哪,應該是JM的一個執行緒)。
- 對於不同的資源環境(Yarn、Standalone、Kubernetes等),Flink有不同的ResourceManager來適配,RS負責申請、建立、回收Flink的處理資源(TaskSlot),當JM申請TaskSlot時,RM會指定一個空閒的TaskManager將其TaskSlot提供給該JM,除了StandAlone模式外TMSlot不足時則會向resource provider申請Container啟動新的TM。
- TM(TaskManager),Flink的苦工程序,每個TM提供一定數量的執行緒Slot,StandAlone模式啟動時會建立多個TM,每個TM有一定數量的執行緒Slot,啟動後向RM註冊Slot。
- Dispatcher會跨Job執行,提供REST介面用於提交Flink應用,一旦提交一個應用,Dispathcer便會啟動一個JM並將應用轉交給它。
應用部署
框架模式
在該模式下,Flink應用需要打包成一個Jar檔案,通過客戶端提交到Dispatcher,JobManager或是Yarn-ResourceManager。
- JM:立即執行;
- DisPatcher、Yarn-ResourceManager:啟動一個JM,將應用轉交給它執行。
庫模式
Flink應用會繫結到一個特定應用的容器映象(如Docker映象),映象中包含著JM、Flink-RM程式碼,映象啟動後自動載入JM&RM,並將繫結的Job提交執行,另一個和作業無關的映象負責部署並執行TM。
任務執行
TaskManager執行多個任務|運算元同時執行,TM通過提供固定的Slot控制並行任務數,每個TMSlot執行任務的一部分(運算元的一個並行任務)。
TaskManager會在同一個JVM中以執行緒的形式執行任務。
故障恢復
TaskManager故障
TaskManager掛了導致Slot數量不足,JM會向RM申請Slot,如果沒有足夠的Slot則會等待。
JobManager故障
JobManager掛掉將會導致流處理無法繼續執行(這在StandAlone中會導致Flink的單點故障,之後提供了HA機制)。
HA-JM會將JAR包寫出到遠端儲存中,並通過ZK記錄目錄。
故障時JM下的所有任務都會被取消,新JM會:
- 向ZK請求目錄,獲取JobGraph、Jar、已經應用最新的檢查點在遠端儲存的狀態控制代碼;
- 重新向RM申請Slot;
- 重啟應用,並使用最近一次檢查點重置任務狀態。
WaterMark
WM在Flink是一種特殊的record,它會被operator tasks接收和釋放。
WaterMark transfer
當收到WM大於所有分割槽目前擁有的WM,就會把event-time clock更新為所有WM中最小的那個,並廣播這個最小的WM。即便是多個streams輸入,機制也一樣,只是增加Paritition WM數量。這種機制要求獲得的WM必須是累加的,而且task必須有新的WM接收,否則clock就不會更新,task的timers就不會被觸發。另外,當多個streams輸入時,timers會被WM比較離散的stream主導,從而使更密集的stream的state不斷積累。
WaterMark init
- SourceFunction,實現資料來源函式,生成WM
- periodic assigner,自定義週期分配函式,從每行record中解析時間,週期性響應獲取當前水位線的查詢請求
- punctuated assigner,自定義定點分配函式,從某一行record中解析時間,用於根據特殊記錄生成水位的情況。
WM的生成要緊靠資料來源運算元,因為經過其他運算元處理後原有的順序可能亂序且無法分辨事件時間戳。
State
- 運算元狀態 operator state
- 鍵值狀態 keyed state
Operator State
運算元task對於自身的狀態,不能訪問其他任務的state
- 列表狀態 list state
- 聯合列表 union list state
- 廣播狀態 broadcast state
保證運算元每個task狀態都相同
Keyed Sate
相同key的record共享一個state,不能訪問其他key的state
- value state
每個key一個值,這個值可以是複雜的資料結構 - list state
每個key一個list - map state
每個key一個map
State Backends
為保證快速訪問狀態,每個並行任務都會把狀態維護在本地,狀態的管理則由state backend負責,它主要負責本地狀態管理和將狀態以checkpoint的形式寫出到remote storage。
state backend決定了state如何被儲存、訪問和維持。它的主要職責是本地state管理和checkpoint state到遠端。在管理方面,可選擇將state儲存到記憶體還是磁碟。
Scaling Stateful Operators
Flink會根據input rate調整併發度。對於stateful operators有以下4種擴縮容方式:
- keyed state:根據key group來調整,即分為同一組的key-value會被分到相同的task
- list state:所有list entries會被收集並重新均勻分佈,當增加併發度時,可能某些task沒有state,要新建list
- union list state:增加併發時,廣播所有list entries,rescaling後,task自行決定保留哪些entries
- broadcast state:廣播狀態到新任務上,直接停掉多出的任務。
Checkpoints ,State Recovery and Savepoints
通過將運算元狀態重置到沒計算那些資料的時刻實現精確一次的狀態一致性;
只有輸入流是可重放的應用才能支援精確一次的狀態一致性。
檢查點&恢復機制僅能重置流應用內部的狀態,根據採用的sink,在恢復過程中可能會有結果記錄重複傳送多次到下游系統。
Checkpoint
每個operator有自己的checkpoint
- 9:1+3+5
- 6:2+4
State Recovery
重放資料並計算
- 12:2+4+6
- 9:1+3+5
Checkpointing Algorithm
Flink checkpointing在分散式開照演算法Chandy-Lamport的基礎上實現。有一種特殊的record叫checkpoint barrier(由JM產生),它帶有checkpoint ID來把流進行劃分。在CB前面的records會被包含到checkpoint,之後的會被包含在之後的checkpoint。
Streaming Example
- Sink1:2= 0+2
- Sink2:5= 1+(1+3)
JM init checkpoint
checkpoint由JobManager通過向每個資料來源任務傳送帶有新checkpoint ID的訊息來啟動。
Source Checkpoint
- source收到JM新checkpoint ID訊息,停止傳送recordes
- 觸發state backend對本地state的checkpoint,廣播帶checkpoint ID的CB到下游task
- 本地state的checkpoint完成後,state backend喚醒source
- source向JM確定相應的checkpoint 已經完成,之後source繼續傳送records
source通過將CB注入其輸出,確定了checkpoint的位置。
Execution Checkpoint
CB對齊
CB對齊後state checkpoint
恢復處理資料
當下遊獲得一個帶有新checkpoint ID的CB時:
- 暫停處理並快取這個CB對應的source的資料,其他沒有傳送CB的source的資料會繼續處理。
- 等待老ID的CB都到齊(老checkpoint ID對應資料都收集全了)
- 對state對老checkpoint ID對應結果進行checkpoint
- 廣播帶老checkpoint ID的CB到下游
- 直到所有老CB被廣播到下游,才開始處理排隊在緩衝區的資料
- 開始處理快取中這個CB對應的source的資料,處理完快取後開始處理流資料
Sink Checkpoint
最後,當所有sink收齊全部CB會向JM傳送CB,確定checkpoint已完成,JM會將此checkpoint標記完成。
Performace Implications of Checkpointing
task state存入checkpoint時會阻塞,當state很大時,複製整個state併發送到遠端storage會很費時。
Flink 設計中state backend負責checkpoint,因此如何實現具體的copy完全取決於state backend實現,比如File state backend和RocksDB state backend支援非同步checkpoint,觸發checkpoint時,backend會快照所有本地state的修改(直至上一次checkpoint),然後馬上讓task繼續執行,後臺執行緒非同步傳送快照到遠端storage,完成後就會通知task。
-
RocksDB state backend支援asynchronous and incremental的checkpoints。增量checkpoint可以有效降低資料量
-
在等待其餘CB時,已經完成checkpoint的source資料需要排隊。但如果不使用exactly-once使用at-least-once則不需要等待。但當所有老CB到齊,進行checkpoint時,state中會包含新checkpoint的資料。
SavePoint
checkpoints加上一些額外的元資料,功能也是在checkpoint的基礎上豐富。不同於checkpoints,savepoint不會被Flink自動建立(由使用者或者外部scheduler觸發創造)和銷燬。savepoint可以重啟不同但相容的作業,從而:
- 修復bugs進而修復錯誤的結果,也可用於A/B test或者what-if場景。
- 調整併發度
- 遷移作業到其他叢集、新版Flink
也可以用於暫停作業,通過savepoint檢視作業情況,甚至有使用者通過該功能不斷將應用例項遷移到成本最低的資料中心。