Flink的狀態程式設計和容錯機制(四)
一、狀態程式設計
Flink 內建的很多運算元,資料來源 source,資料儲存 sink 都是有狀態的,流中的資料都是 buffer records,會儲存一定的元素或者元資料。例如 : ProcessWindowFunction會快取輸入流的資料,ProcessFunction 會儲存設定的定時器資訊等等。
1,運算元狀態(operator state)
運算元狀態的作用範圍限定為運算元任務。這意味著由同一並行任務所處理的所有資料都可以訪問到相同的狀態,狀態對於同一任務而言是共享的。Flink為運算元狀態提供三種基本資料結構:
列表狀態(List state):將狀態表示為一組資料的列表。 聯合列表狀態(Union list state):也將狀態表示為資料的列表。它與常規列表狀態的區別在於,在發生故障時,或者從儲存點(savepoint)啟動應用程式時如何恢復。 廣播狀態(Broadcast state):如果一個運算元有多項任務,而它的每項任務狀態又都相同,那麼這種特殊情況最適合應用廣播狀態。
2,鍵控狀態(keyed state)
鍵控狀態是根據輸入資料流中定義的鍵(key)來維護和訪問的。具有相同 key 的所有資料都會訪問相同的狀態。Flink 的 Keyed State 支援以下資料型別:
ValueState[T]儲存單個的值,值的型別為 T。 get 操作: ValueState.value() set 操作: ValueState.update(value: T) ListState[T]儲存一個列表,列表裡的元素的資料型別為 T。基本操作如下: ListState.add(value: T) ListState.addAll(values: java.util.List[T]) ListState.get()返回 Iterable[T] ListState.update(values: java.util.List[T]) MapState[K, V]儲存 Key-Value 對。 MapState.get(key: K) MapState.put(key: K, value: V) MapState.contains(key: K) MapState.remove(key: K) ReducingState[T] AggregatingState[I, O] State.clear()是清空操作。
案例:判斷兩個相鄰的評分之間差值,如果大於10就輸出當前key對應的這兩次評分。
自定義繼承RichFlatMapFunction
val resultDStream:DataStream[(String,Double,Double)] = stream.keyBy(_.id) .flatMap(new MyKeyedState(10.0)) //keyby之後再進行自定義的聚合
//輸入為id,輸入為(id,lastRate,currentRate) class MyKeyedState(diff:Double) extends RichFlatMapFunction[Item,(String,Double,Double)]{ //記錄上次的評分 var lastRateState:ValueState[Double] = _ override def open(parameters: Configuration): Unit = { //初始化上次的評分 lastRateState = getRuntimeContext.getState[Double](new ValueStateDescriptor[Double]("rate",Types.of[Double])) } override def flatMap(value: Item, out: Collector[(String, Double, Double)]): Unit = { val currentRate = value.rate val lastRate = lastRateState.value() if (lastRate != 0 && (lastRate - currentRate).abs > diff) { //不是第一次進入並且差值大於10 out.collect((value.id,lastRate,currentRate)) } lastRateState.update(currentRate) } }
使用flatMapWithState
val resultDStream:DataStream[(String,Double,Double)] = stream.keyBy(_.id) .flatMapWithState[(String,Double,Double),Double]{ case (item:Item,None)=> //如果state為None表示是第一次,此時給定初始值即可 (List.empty,Some(item.rate)) case (item:Item,last:Some[Double])=> //如果有值的情況下就是判定和輸出 val lastRate = last.getOrElse(0) val currentRate = item.rate if (lastRate != 0 && (lastRate - currentRate).abs > 10.0) { //不是第一次進入並且差值大於10 (List((item.id,lastRate,currentRate)),Some(currentRate)) }else{ (List.empty,Some(currentRate)) } }
二、狀態一致性
1,一致性級別
在流處理中,一致性可以分為3個級別:
at-most-once: 這其實是沒有正確性保障的委婉說法 ——故障發生之後,計數結果可能丟失。 同樣的還有 udp。 at-least-once: 這表示計數結果可能大於正確值,但絕不會小於正確值。 也就是說,計數程式在發生故障後可能多算,但是絕不會少算。 exactly-once: 這指的是系統保證在發生故障後得到的計數結果與正確值一致。
Flink 的一個重大價值在於,它既保證了 exactly-once,也具有低延遲和高吞吐的處理能力。
2,端到端(end-to-end)狀態一致性
端到端的一致性保證,意味著結果的正確性貫穿了整個流處理應用的始終;每一個元件都保證了它自己的一致性,整個端到端的一致性級別取決於所有元件中一致性最弱的元件。具體可以劃分如下:
內部保證 — — 依賴 checkpoint source端 — — 需要外部源可重設資料的讀取位置 sink端 — — 需要保證從故障恢復時,資料不會重複寫入外部系統
而對於sink端,又有兩種具體的實現方式:冪等( Idempotent)寫入和事務性( Transactional)寫入。
冪等:所謂冪等操作,是說一個操作,可以重複執行很多次,但只導致一次結果更改,也就是說,後面再重複執行就不起作用了
事務性:需要構建事務來寫入外部系統,構建的事務對應著checkpoint,等到 checkpoint 真正完成的時候,才把所有對應的結果寫入sink 系統中。
對於事務性寫入,具體又有兩種實現方式:預寫日誌(WAL)和兩階段提交(2PC)。DataStream API 提供了 GenericWriteAheadSink 模板類和TwoPhaseCommitSinkFunction 介面。
三、檢查點(checkpoint)
1,檢查點演算法(圖解)
簡介:有兩個source:source1和source2,這兩個source均接收到(1,2,3,4,5,6,7,8,9...)等資料
第一步,jobmanager接收到checkpoint編號為2的資料,如下圖:
第二步,source停止接收checkpoint編號為2之後的資料,產生對應的barrier屏障。直到狀態後端(state backends)存入對應的檢查點之後,返回給source任務,待JobManager通知確認檢查點完成。如圖:
第三步,barrier對齊:等待所有source分割槽中標記相同檢查點編號的資料到達處理完成之後再進行當前barrier(例如當前藍色4資料是source1中cpt2之後的資料,故而先不做計算,會存入快取直到當前黃色標記的cpt2之前的資料全部處理完畢)。如圖:
當收到所有輸入分割槽的 barrier 時,任務就將其狀態儲存到狀態後端的檢查點中,然後將 barrier 繼續向下遊轉發。如圖:
第四步:Sink 任務向 JobManager 確認狀態儲存到 checkpoint 完畢,當所有任務都確認已成功將狀態儲存到檢查點時,檢查點就真正完成了。如圖:
Flink 檢查點演算法的正式名稱是非同步分界線快照(asynchronous barrier snapshotting)。該演算法大致基於Chandy-Lamport 分散式快照演算法。
檢查點是 Flink 最有價值的創新之一,因為它使 Flink 可以保證 exactly-once,並且不需要犧牲效能。
2,Flink+Kafka實現exactly-once
端到端的狀態一致性的實現,需要每一個元件都實現,對於 Flink + Kafka 的資料管道系統(Kafka 進、Kafka 出)而言,各元件怎樣保證 exactly-once 語義呢?
內部 — — 利用 checkpoint 機制,把狀態存檔,發生故障的時候可以恢復,保證內部的狀態一致性。 source — — kafka consumer 作為 source,可以將偏移量儲存下來,如果後續任務出現了故障,恢復的時候可以由聯結器重置偏移量,重新消費資料,保證一致性 sink — — kafka producer 作為 sink,採用兩階段提交 sink,需要實現一個 TwoPhaseCommitSinkFunction
執行過程實際上是一個兩段式提交,每個運算元執行完成,會進行“預提交”,直到執行完sink 操作,會發起“確認提交”,如果執行失敗,預提交會放棄掉。
具體的兩階段提交步驟總結如下:
第一條資料來了之後,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分割槽日誌但標記為未提交,這就是“預提交” jobmanager 觸發 checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的運算元將狀態存入狀態後端,並通知 jobmanager sink 聯結器收到 barrier,儲存當前狀態,存入 checkpoint,通知jobmanager,並開啟下一階段的事務,用於提交下個檢查點的資料 jobmanager 收到所有任務的通知,發出確認資訊,表示 checkpoint 完成 sink 任務收到 jobmanager 的確認資訊,正式提交這段時間的資料 外部 kafka 關閉事務,提交的資料可以正常消費了。
3,狀態後端(state backend)
env.setStateBackend(new MemoryStateBackend()) env.setStateBackend(new FsStateBackend("checkpointDataUri")) env.setStateBackend(new RocksDBStateBackend("checkpointDataUri"))
a)MemoryStateBackend
記憶體級的狀態後端,會將鍵控狀態作為記憶體中的物件進行管理,將它們儲存在 TaskManager 的 JVM 堆上;而將checkpoint 儲存在 JobManager 的記憶體中。
b)FsStateBackend
將checkpoint 存到遠端的持久化檔案系統(FileSystem)上。 而對於本地狀態,跟 MemoryStateBackend 一樣,也會存在 TaskManager 的 JVM 堆上。
c)RocksDBStateBackend
將所有狀態序列化後,存入本地的 RocksDB 中儲存。
使用RocksDB需要新增依賴:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.7.2</version> </dependency>
&n