流式計算系列 Flink的Status是怎麼實現的
實際問題
在流計算場景中,資料會源源不斷的流入Apache Flink系統,每條資料進入Apache Flink系統都會觸發計算。如果我們想進行一個Count聚合計算,那麼每次觸發計算是將歷史上所有流入的資料重新新計算一次,還是每次計算都是在上一次計算結果之上進行增量計算呢?答案是肯定的,Apache Flink是基於上一次的計算結果進行增量計算的。那麼問題來了: "上一次的計算結果儲存在哪裡,儲存在記憶體可以嗎?",答案是否定的,如果儲存在記憶體,在由於網路,硬體等原因造成某個計算節點失敗的情況下,上一次計算結果會丟失,在節點恢復的時候,就需要將歷史上所有資料(可能十幾天,上百天的資料)重新計算一次,所以為了避免這種災難性的問題發生,Apache Flink 會利用State儲存計算結果。本篇將會為大家介紹Apache Flink State的相關內容。
什麼是State
這個問題似乎有些"弱智"?不管問題的答案是否顯而易見,但我還是想簡單說一下在Apache Flink裡面什麼是State?State是指流計算過程中計算節點的中間計算結果或元資料屬性,比如 在aggregation過程中要在state中記錄中間聚合結果,比如Apache Kafka 作為資料來源時候,我們也要記錄已經讀取記錄的offset,這些State資料在計算過程中會進行持久化(插入或更新)。所以Apache Flink中的State就是與時間相關的,Apache Flink任務的內部資料(計算資料和元資料屬性)的快照。
為什麼需要State
與批計算相比,State是流計算特有的,批計算沒有failover機制,要麼成功,要麼重新計算。流計算在 大多數場景 下是增量計算,資料逐條處理(大多數場景),每次計算是在上一次計算結果之上進行處理的,這樣的機制勢必要將上一次的計算結果進行儲存(生產模式要持久化),另外由於 機器,網路,髒資料等原因導致的程式錯誤,在重啟job時候需要從成功的檢查點(checkpoint,後面篇章會專門介紹)進行state的恢復。增量計算,Failover這些機制都需要state的支撐。
State 實現
Apache Flink內部有四種state的儲存實現,具體如下:
- 基於記憶體的HeapStateBackend - 在debug模式使用,不 建議在生產模式下應用;
- 基於HDFS的FsStateBackend - 分散式檔案持久化,每次讀寫都產生網路IO,整體效能不佳;
- 基於RocksDB的RocksDBStateBackend - 本地檔案+非同步HDFS持久化;
- 還有一個是基於Niagara(Alibaba內部實現)NiagaraStateBackend - 分散式持久化- 在Alibaba生產環境應用;
State 持久化邏輯
Apache Flink版本選擇用RocksDB+HDFS的方式進行State的儲存,State儲存分兩個階段,首先本地儲存到RocksDB,然後非同步的同步到遠端的HDFS。 這樣而設計既消除了HeapStateBackend的侷限(記憶體大小,機器壞掉丟失等),也減少了純分散式儲存的網路IO開銷。
State 分類
Apache Flink 內部按照運算元和資料分組角度將State劃分為如下兩類:
- KeyedState - 這裡面的key是我們在SQL語句中對應的GroupBy/PartitioneBy裡面的欄位,key的值就是groupby/PartitionBy欄位組成的Row的位元組陣列,每一個key都有一個屬於自己的State,key與key之間的State是不可見的;
- OperatorState - Apache Flink內部的Source Connector的實現中就會用OperatorState來記錄source資料讀取的offset。
State 擴容重新分配
Apache Flink是一個大規模並行分散式系統,允許大規模的有狀態流處理。 為了可伸縮性,Apache Flink作業在邏輯上被分解成operator graph,並且每個operator的執行被物理地分解成多個並行運算子例項。 從概念上講,Apache Flink中的每個並行運算子例項都是一個獨立的任務,可以在自己的機器上排程到網路連線的其他機器執行。
Apache Flink的DAG圖中只有邊相連的節點