大資料計算引擎之Flink Flink狀態管理和容錯
這裡將介紹Flink對有狀態計算的支援,其中包括狀態計算和無狀態計算的區別,以及在Flink中支援的不同狀態型別,分別有 Keyed State 和 Operator State 。另外針對狀態資料的持久化,以及整個 Flink 任務的資料一致性保證,Flink 提供了 Checkpoint 機制處理和持久化狀態結果資料,隨後對狀態資料 Flink 提供了不同的狀態管理器來管理狀態資料,例如: MemoryStateBackend 等。
有狀態計算
在Flink架構體系中,有狀態計算可以說是Flink非常重要的特徵之一。有狀態計算是指在程式計算過程中,在Flink程式內部,儲存計算產生的中間結果,並提供給Functions 或 孫子計算結果使用。如圖所示:
狀態資料可以維繫在本地儲存中,這裡的儲存可以是 Flink 的堆記憶體或者堆外記憶體,也可以藉助第三方的儲存介質,例如:Flink中已經實現的RocksDB,當然使用者也可以自己實現相應的快取系統去儲存狀態資訊,以完成更加複雜的計算邏輯。和狀態計算不同的是,無狀態計算不會儲存計算過程中產生的結果,也不會將結果用於下一步計算過程中,程式只會在當前的計算流程中實行計算,計算完成就輸出結果,然後下一條資料接入,然後處理。
無狀態計算實現的複雜度相對較低,實現起來比較容易,但是無法完成提到的比較複雜的業務場景,例如:
- [ ] 使用者想實現CEP(複雜事件處理),獲取符合某一特定時間規則的事件,狀態計算就可以將接入的事件進行儲存,然後等待符合規則的事件觸發;
- [ ] 使用者想要按照 minutes / hour / day 等進行聚合計算,求取當前最大值、均值等聚合指標,這就需要利用狀態來維護當前計算過程中產生的結果,例如事件的總數、總和以及最大,最小值等;
- [ ] 使用者想在 Srteam 上實現機器學習的模型訓練,狀態計算可以幫助使用者維護當前版本模型使用的引數;
- [ ] 使用者想使用歷史的資料進行計算,狀態計算可以幫助使用者對資料進行快取,使使用者可以直接從狀態中獲取相應的歷史資料。
Flink 狀態及應用
狀態型別
在 Flink 中根據資料集是否根據 Key 進行分割槽,將狀態分為 Keyed State 和 Operator State(Non-Keyed State) 兩種型別。
Keyed State
表示和key相關的一種state ,只能用於 KeyedStream 型別資料集對應的Functions和Operators之上。Keyed State 是 Operator State 的特例,區別在於 Keyed State 事先按照 key 對資料集進行了分割槽,每個 Key State 僅對應一個 Operator 和 Key 的組合。 Keyed State 可以通過 Key Group 進行管理,主要用於當運算元並行度發生變化時,自動重新分佈 Keyed State 資料。
Operator State
與 Keyed State 不同的是,Operator State 只和並行的運算元例項繫結,和資料元素中的 Key 無關,每個運算元例項中持有所有資料元素中的一部分狀態資料。 Operator State 支援當運算元例項並行度發生變化時自動重新分配狀態資料。
同時在Flink中 Keyed State 和 Operator State 均具有兩種形式,其中一種為託管狀態(Managered State)形式,由Flink Runtime 中控制和管理狀態資料,並將狀態資料轉換稱為記憶體Hash tables 或 Recks DB 的物件儲存,然後將這些狀態資料通過內部介面持久化到 Checkpoints 中,任務異常時可以通過這些狀態資料恢復任務。另外一種是原生狀態(Row State)形式,由運算元自己管理資料結構,當觸發 Checkpoints 過程中,Flink並不知道狀態資料內部的資料結構,只是將資料轉換成 bytes 資料儲存在 Checkpoints 中,當從 Checkpoints 恢復任務時,運算元自己在反序列化出狀態的資料結構。
Notes: Flink中推薦使用者使用 Managered State 管理狀態資料,主要原因是:Manager State 能夠更好的支援狀態資料的重平衡以及更加完善的記憶體管理。
Managered Keyed State
Flink 有以下Managered Keyed State 型別可以使用,每種狀態都有相應的的使用場景,使用者可以根據實際需求選擇使用。
- [ ]
ValueState[T]
: 與 Key 對應單個值的狀態,例如統計 user_id 對應的交易次數,每次使用者交易都會在 count 狀態值上進行更新。 ValueState 對應的更新方法是update(T)
, 取值是T value()
; - [ ]
ListState[T]
: 與 Key 對應元素列表的狀態,狀態中存放元素的 List 列表。例如定義 ListValue儲存使用者經常訪問的 IP 地址。在 ListState 中新增元素使用add(T) , addAll(List[T])
兩個方法。獲取元素使用Iterable<T> get()
方法,更新元素使用update(List[T])
方法; - [ ]
ReducingState[T]
: 定義與 Key 相關的資料元素單個聚合值的狀態,使用者儲存經過指定 ReduceFunction 計算之後的指標,因此,ReduceState 需要指定ReduceFunction 完成狀態資料的聚合。ReducingState 新增元素使用add(T)
方法,獲取元素使用T get()
; - [ ]
AggregeateState[IN,OUT]
: 定義 與key相關的資料元素單個聚合值的狀態,用於維護資料經過指定 AggregateFunction 計算之後的指標。和ReducingState相比,AggregeateState 的輸入輸出型別不一定相同,但ReducingState 輸入/出 型別必須保持一致。和ListState相似,AggregatingState 需要指定AggregateFunction完成狀態資料的聚合操作。AggregatringState新增元素使用add(IN)
方法, 獲取元素使用OUT get()
方法; - [ ]
MapState<UK, UV>
:這會保留一個對映列表。您可以將鍵值對放入狀態並檢索Iterable所有當前儲存的對映。使用put(UK, UV)
或 新增對映putAll(Map[UK,UV])
(Map<UK, UV>)。可以使用來檢索與使用者鍵關聯的值get(UK)
。對於對映,鍵和值可迭代檢視可以使用被檢索entries()
,keys()
並values()
分別。
Stateful Function定義
示例:
在RichFlatMapFunction 中定義 ValueState,已完成最小值的獲取:
inputStream.keyBy(_._1).flatMap(
// (String,Long,Int) 輸入型別
// (String,Long,Long) 輸出型別
new RichFlatMapFunction[(Int,Long) , (Int,Long,Long)] {
private var leastValueState:ValueState[Long] = _
// 定義狀態名稱
private var leastValueStateDesc:ValueStateDescriptor[Long] = _
override def open(parameters: Configuration): Unit = {
// 指定狀態型別
leastValueStateDesc = new ValueStateDescriptor[Long]("leastValueState" , classOf[Long])
// 通過 getRuntimeContext.getState 拿到狀態
leastValueState = getRuntimeContext.getState(leastValueStateDesc)
}
override def flatMap(value: (Int, Long), out: Collector[(Int, Long, Long)]): Unit = {
// 通過 value 拿到最小值
val leastValue: Long = leastValueState.value()
// 如果前一個指標大於最小值,則直接輸出資料元素和最小值
if ( leastValue != 0L && value._2 > leastValue){
out.collect((value._1 , value._2 , leastValue))
}else{
// 如果當前指標小於最小值,則更新狀態中的最小值
leastValueState.update(value._2)
// 將當前資料中的指標作為最小值輸出
out.collect(value._1 , value._2 , value._2)
}
}
}).print()
State生命週期
對於任何型別 Keyed State 都可以設定狀態生命週期(TTL),以確保能夠在規定時間內即時清理狀態資料。狀態生命週期功能可通過 StateTtlConfig 配置然後將 StateTtlConfig 配置傳入StateDescriptor 中的 enableTimeToLive 方法中即可。Keyed State 配置例項如下所示:
val config: StateTtlConfig = StateTtlConfig
// 指定TTL時長為 5s
.newBuilder(Time.seconds(5))
// 指定TTL 重新整理只對建立和寫入操作有效
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// 指定狀態可見性不返回過期資料
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build()
leastValueStateDesc.enableTimeToLive(config)
在StateTtlConfig中除了通過 newBuilder() 方法中設定過期時間的引數是必須的之外,其他的引數都是可選的或使用預設值。其中 setUpdateType方法中傳入的型別有兩種:
- StateTtlConfig.UpdateType.onCreateAndWrite 僅在建立和寫入時更新 TTL ;
- StateTtlConfig.UpdateType.OnReadAndWriter 僅在讀與寫操作都更新 TTL ;
需要注意的是,過期的狀態資料根據UpdateType引數進行配置,只有被寫入或者讀取的是時間才會更新TTL,也就是說如果某個狀態指標一直不被使用活著更新,則永遠不會觸發對該狀態資料的清理操作,這種情況可能會導致系統中的狀態資料越來越大。
另外,可以通過 setStateVisibility 方法設定狀態的可見性,根據過期資料是否被清理來確定是否返回狀態資料:
- StateTtlConfig.StateVisibility.NeverReturnExpired: 狀態資料過期就不會返回(預設)
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp: 狀態資料即使過期但沒有被清理依然返回
Scala DataStream API中使用狀態
直接上程式碼片段:
inputStream.keyBy(_._1)
// 指定輸入引數型別和狀態引數型別
.mapWithState((in:(Int,Long) , count : Option[Int]) =>
// 判斷count 型別是否非空
count match {
// 輸出 key , count 並在原來 count 資料上累加
case Some(c) => ((in._1 , c) , Some(c + in._2))
// 如果狀態為空,則將指標填入
case None => ((in._1 , 0) , Some(in._2))
}
)
Manager Operator State
Operator State 是一種 non-keyed-state ,與並行的操作運算元例項相關聯,例如在 Kafka Connector 中,每個 Kafka 消費端運算元例項都對應到 Kafka 的一個分割槽中,維護Topic分割槽和 Offsets 偏移量作為運算元的 Operator State. 在Flink中可以實現 CheckpointedFunction
或者 ListCheckpoint<T extends Serializable>
兩個介面來定義操作 Managered Operator State 的函式。
通過 CheckpointedFunction 介面操作Operator State
CheckpointedFunction 介面定義如圖:
@PublicEvolving
@SuppressWarnings("deprecation")
public interface CheckpointedFunction {
/**
* This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
* ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
* the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
*
* @param context the context for drawing a snapshot of the operator
* @throws Exception
*/
void snapshotState(FunctionSnapshotContext context) throws Exception;
/**
* This method is called when the parallel function instance is created during distributed
* execution. Functions typically set up their state storing data structures in this method.
*
* @param context the context for initializing the operator
* @throws Exception
*/
void initializeState(FunctionInitializationContext context) throws Exception;
}
在每個獨立的運算元中,Managered Operator State 都是以 List 形式儲存的,運算元和運算元之間的狀態資料相互獨立,List儲存比較適合於狀態資料的重新分佈,Flink目前支援Manager Operator State 兩種重要分佈策略,分別是 Event-split Redistribution 和 Union Redistribution。
- [ ] Event-split Redistribution: 每個運算元例項中含有部分元素的List列表,整個狀態資料是所有List列表,整個狀態資料是所有List列表的合集。當觸發 restore/redistribution 動作時,通過將狀態資料平均分配成與運算元並行度相同數量的List列表,每個 task 例項中有一個 List,其可以為空或者含有多個元素。
- [ ] Union Redistribution: 每個運算元例項中含有所有狀態元素的List 列表,當觸發 restore/redistribution 動作時,每個運算元可以獲取到完整的狀態元素列表。
/**
* @title CheckpointCount
* @description 實現 CheckpointFunction 介面利用Operator State 統計輸入到運算元的資料量
* @author Mr.Sun
* @version v.1.0
* @date 2019/12/24 9:16
*/
class CheckpointCount(val numElements: Int) extends FlatMapFunction[(Int, Long), (Int, Long, Long)] with CheckpointedFunction {
// 定義運算元例項本地變數,儲存Operator資料數量
private var operatorCount: Long = _
// 定義 keyedState ,儲存和 key 相關的狀態值
private var keyedState: ValueState[Long] = _
// 定義 operatorState , 儲存運算元的狀態值
private var operatorState: ListState[Long] = _
override def flatMap(value: (Int, Long), out: Collector[(Int, Long, Long)]): Unit = {
val keyedCount: Long = keyedState.value()
// 更新 keyedState 數量
keyedState.update(keyedCount)
// 更新本地的運算元 operatorCount
operatorCount = operatorCount + 1
// 輸出結果,包括 id , id 對應的的數量統計 keyedCount ,運算元輸入資料的數量統計 operatorCount
out.collect(value._1, keyedCount, operatorCount)
}
// 當發生了 snapshotState , 將 operatorCount 新增到 operatorState 中
override def snapshotState(context: FunctionSnapshotContext): Unit = {
operatorState.clear()
operatorState.add(operatorCount)
}
// 初始化狀態資料
override def initializeState(context: FunctionInitializationContext): Unit = {
// 定義並獲取 keyedState
keyedState = context.getKeyedStateStore.getState(new ValueStateDescriptor[Long]("keye-state", classOf[Long]))
// 定義並獲取 operatorState
operatorState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long]("operator-state", classOf[Long]))
// 定義在 Restored 過程中, 從 operatorState 中恢復資料的邏輯
if (context.isRestored){
val value: util.Iterator[Long] = operatorState.get().iterator()
while (value.hasNext){
operatorCount += value.next()
}
}
}
}
通過 ListCheckpointed介面定義 Operator State
/**
* @title NumberRecordsCount
* @description 實現 ListCheckpoint介面利用Operator State 統計運算元輸入資料數量
* @author Mr.Sun
* @version v.1.0
* @date 2019/12/24 10:14
*/
class NumberRecordsCount extends FlatMapFunction[(String, Long), (String, Long)] with ListCheckpointed[Long] {
// 定義運算元中接入的 numberRecords 數量
private var numberRecords: Long = 0L
override def flatMap(value: (String, Long), out: Collector[(String, Long)]): Unit = {
// 介入一條計算規則進行統計,並輸出
numberRecords += 1
out.collect(value._1, numberRecords)
}
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] = {
Collections.singletonList(numberRecords)
}
override def restoreState(state: util.List[Long]): Unit = {
numberRecords = 0L
for (count <- state) {
// 從恢復狀態中 恢復 numberRecords
numberRecords += count
}
}
}
Checkpoints 和 Savepoints
Checkpoints檢查機制
Flink 中基於非同步輕量級的分散式快照技術提供了 Checkpoints 容錯機制,分散式快找可以將同一時間點 Task / Operator 的狀態資料全域性統一快照處理,包括前面提到的Keyed State 和 Operator State . Flink 會在輸入的資料集上間隔性的生成checkpoint barrier ,通過柵欄(barrier)將間隔時間段內的資料劃分到相應的checkpoint 中,當應用出現異常時,Operator 就能夠從上一次快照中恢復所有運算元之前的狀態,從而保證資料的一致性。
舉個栗子:在 KafkaConsumer 運算元維護 Offset 狀態,當系統出現問題無法從 Kafka 中消費資料時,可以將 Offset 記錄在狀態中,當系統出現問題,無法從Kafka消費資料時,可以將 Offset 記錄在狀態中,當任務重新恢復時就能夠指定偏移量消費資料。
Checkpoint 過程中狀態資料一般會被儲存在一個可配置的環境中,通常在 JobManager節點或者HDFS上。
Checkpoint 開啟和時間間隔指定
開啟檢查點並且指定檢查點時間間隔為 1000ms ,根據實際情況自行選擇,如果狀態比較大,則建議適當增加該值;
environment.enableCheckpointing(1000)
exactly-ance 和 at-least-once 語義
可以選擇 exactly-once 語義保證整個應用內 端到端 的資料一致性,這種情況比較適合資料要求高,不允許出現數據丟失或重複,與此同時,Flink 的效能也相對較弱,而 at-least-once 語義更適合於時延和吞吐要求非常高但對資料一致性要求不高的場景。
environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
Checkpoint 超時時間
超時時間制定了每次Checkpoint 執行過程中的上限時間範圍,一旦 Checkpoint 執行時間超過該閾值,Flink 將會中斷Checkpoint 過程,並按照超時處理。該指標可以通過 setCheckpointTimeout 方法設定,預設 10 分鐘
environment.getCheckpointConfig.setCheckpointTimeout(60000)
檢查點之間最小時間間隔
該引數主要目的是設定兩個Checkpoint 之間最小時間間隔,防止出現例如狀態資料過大導致Checkpoint 執行時間過長,導致 Checkpoint 積壓過多,最終Flink 應用密集地觸發 Checkpoint 操作,會佔用大量計算資源而影響到整個應用的效能
environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
最大並行度執行檢查點數量
通過 setMaxCurrentCheckpoint()方法設定能夠最大同時執行的 Checkpoint 數量。在預設情況下只有一個檢查點可以執行,根據使用者指定的數量可以同時觸發多個Checkpoint,進而提升Checkpoint整體的效率.
environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
外部檢查點
設定週期性的外部檢查點,然後將狀態資料持久化到外部系統中,使用這種方式不會在任務正常停止的過程中清理檢查點資料,而是會一直保持在外部系統介質中,另外也可以通過從外部檢查點中對任務進行恢復.
environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
failOnCheckpointingErrors
ailOnCheckpointingErrors 引數決定了當Checkpoint執行過程中如果出現失敗或者錯誤時,任務是否同時被關閉,預設值為True
environment.getCheckpointConfig.setFailOnCheckpointingErrors(false)
// 上述的方式已經被棄用了,使用下面的方式
val number: Int = environment.getCheckpointConfig.getTolerableCheckpointFailureNumber
environment.getCheckpointConfig.setTolerableCheckpointFailureNumber(number)
Savepoints 機制
Savepoints 是檢查點的一種特殊實現,底層實現其實也是使用Checkpoints的機制。Savepoints是使用者以手工命令的方式觸發Checkpoint,並將結果持久化到指定的儲存路徑中,其主要目的是幫助使用者在升級和維護叢集過程中儲存系統中的狀態資料,避免因為停機運維或者升級應用等正常終止應用的操作而導致系統無法恢復到原有的計算狀態的情況,從而無法實現從端到端的 Excatly-Once 語義保證。
Operator ID 配置
當使用 Savepoints 對整個叢集進行升級或運維操作的時候,需要停止整個 Flink 應用程式,此時使用者可能會對應用的程式碼邏輯進行修改,即時 Flink 能夠通過 Savepoint 將應用中的狀態資料同步到磁碟然後恢復任務,但由於程式碼邏輯發生了變化,在升級過程中有可能導致運算元的狀態無法通過 Savepoints 中的資料恢復的情況,在這種情況下就需要通過唯一的 ID 標記運算元。在Flink中預設支援自動生成 Operator ID, 但是這種方式不利於對程式碼層面的維護和升級,建議使用者儘可能使用手工方式對運算元進行唯一 ID 標記, ID 的應用範圍在每個運算元內部,具體的使用方式如下:
environment.addSource(new SourceFunction[] {})
.uid("source-id")
.shuffle()
.map(new MapFunction[] {})
.uid("map-id")
.print()
Savepoints 操作
Savepoint 操作可以通過命令列的方式進行觸發,命令列提供了取消任務,從Savepoints中恢復任務,撤銷 Savepoints 等操作,在 Flink1.2 中以後也可以通過FlinkWeb頁面從 Savepoints中恢復應用。
手動觸發 Savepoints
bin/flink savepoint :jobId [:targetDirectory]
bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
取消任務並處觸發Savepoints
bin/flink cancel -s [:targetDirectory] :jobId
通過Savepoints中恢復任務
bin/flink run -s :savepointPath [:runArgs]
釋放Savepoints資料
bin/flink savepoint -d :savepointPath
通過 --dispose (-d) 命令釋放已經儲存的 Savepoint 資料,這樣儲存在指定路徑中的 savepointPath 將會被清理掉
TargetDirectory 配置
TargetDirectory配置
state.savepoints.dir: hdfs:///flink/savepoints
TargetDirectory 檔案目錄
# 檢視 TargetDirectory 檔案目錄
hdfs dfs -ls /flink/flink-savepoints/savepoint-11bbc5-bd967f90709b
狀態管理器
在Flink 中提供了 StateBackend 來儲存和管理 Checkpoints 過程中的狀態資料。
StateBackend 型別
Flink中一共實現了三種類型的狀態管理器,包括基於記憶體的MemoryStateBackend
、基於檔案系統的 FsStateBackend
, 以及基於 RockDB 作為儲存介質的 RockDBStateBackend
.
MemoryStateBackend
基於記憶體的狀態管理器將狀態資料全部儲存在JVM堆記憶體中,包括使用者在使用 DataStream API 中建立 Key/Value State,視窗中快取的狀態資料,以及觸發器等資料基於記憶體的狀態管理器具有非常快速和高校的特點,但也有非常多的限制,最主要的就是記憶體的容量限制,一旦儲存的狀態資料過多就會導致系統記憶體溢位,從而影響整個應用的正常執行。同時如果機器出現問題,整個主機記憶體中的狀態資料都會丟失,進而無法恢復任務中得玩狀態資料。因此這個玩意,避免使用。
Flink 將MemoryStateBackend 作為預設的狀態後端管理器,也可以通過如下引數配置初始化 MemoryStateBackend , 其中 "MAX_MEN_STATE_SIZE" 指定每個狀態值的記憶體使用大小。
new MemoryStateBackend(MAX_MEN_STATE_SIZE , false)
在Flink 中 MemoryStateBackend 具有如下特點:
- 聚合類運算元的狀態會儲存在 JobManager 記憶體中,因此對於聚合類運算元比較多的應用會對 JobManager 記憶體有一定的壓力,進而對整個叢集會造成較大的負擔
- 建立MemoryStateBackend時可以指定狀態初始化記憶體大小,但狀態資料傳輸大小會受限於Akka框架通訊的“akka.framesize” 大小限制(預設: 10485760 bit -> 1024 * 1024 * 10 )
- JVM記憶體容量受限於主機記憶體大小,也就是說不管是 JobManager 記憶體還是在 TaskManager 的記憶體中維護狀態資料都有記憶體的限制,因此對於非常大的狀態資料不適合使用 MemoryStateBackend 去儲存
important MemoryStateBackend 比較適合測試環境,並用於本地除錯和驗證,不建議在生產環境中使用。
FsStateBackend
與MemoryStateBackend 有所不同,FsStateBackend 是基於檔案系統的一種狀態管理器在,這裡的檔案系統可以是本地檔案系統,也可以是HDFS分散式檔案系統。
new FsStateBackend(path , false)
FsStateBackend 的 Boolean 引數型別指定是否以同步的方式記錄狀態資料,預設採用非同步方式。非同步方式可以儘可能避免在Checkpoint過程中影響流式計算任務
RockDBStateBackend
RockDBStateBackend 是Flink 中內建的第三方狀態管理器,和前面的狀態管理器不同,RocksDBStateBackend 需要單獨引入相關的依賴包到工程中,通過初始化 RockDBStateBackend 類,使可以得到 RockDBStateBackend 例項類。
RocksDBStateBackend 採用非同步的方式進行狀態資料的 Snapshot ,任務中的狀態資料首先被寫入 RockDB中,然後再非同步的將狀態資料寫入檔案系統中,這樣RockDB僅會儲存在正在進行的計算的資料,對於長時間才更新的資料則寫入磁碟中進行儲存,而對於體量比較小的元資料狀態,則儲存在 JobManager 記憶體中。
與 FsStateBackend 相比,RockDBStateBackend效能更高,主要是因為藉助了 RockDB 儲存了最新最熱的資料,然後通過非同步的方式在同步到檔案系統中