Flink狀態後端
阿新 • • 發佈:2021-01-07
什麼是狀態後端?
- 每傳入一條資料,有狀態的運算元任務都會讀取和更新狀態
- 由於有效的狀態訪問對於處理資料的低延遲至關重要,因此每個並行任務都會在本地維護其狀態,以確保快速的狀態訪問
- 狀態的儲存、訪問以及維護,由一個可插入的元件決定,這個元件就叫做狀態後端(state backend)
- 狀態後端主要負責兩件事:本地的狀態管理,以及將檢查點(checkpoint)狀態寫入遠端儲存
狀態後端的儲存模式
MemoryStateBackend
- 記憶體級的狀態後端,會將鍵控狀態作為記憶體中的物件進行管理,將它們儲存在TaskManager的JVM堆上,而將checkpoint儲存在JobManager的記憶體中
- 特點:快速、低延遲,但不穩定
- 原始碼
public MemoryStateBackend() {this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);}
- 示例
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//配置使用MemoryStateBackend
env.setStateBackend(new MemoryStateBackend)
FsStateBackend
- 將checkpoint 存到遠端的持久化檔案系統(FileSystem) 上,而對於本地狀態,跟MemoryStateBackend - -樣,也會存在TaskManager的JVM堆上
- 同時擁有記憶體級的本地訪問速度, 和更好的容錯保證
- 原始碼
public FsStateBackend(String checkpointDataUri) {this(new Path(checkpointDataUri));}
- 示例
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//配置使用FsStateBackend
env.setStateBackend(new FsStateBackend("儲存路徑"))
RocksDBStateBackend
- 將所有狀態序列化後,存入本地的RocksDB中儲存。
RocksDBStateBackend比較特殊,如果需要使用,需要新增依賴:
根據自己的使用的scala和flink版本進行修改
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.10.1</version>
</dependency>
- 原始碼
public RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException {
this((new Path(checkpointDataUri)).toUri(), enableIncrementalCheckpointing);
}
- 示例
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//RocksDBStateBackend除了配置儲存路徑,還需要配置是否增量儲存,否則就是全量儲存
env.setStateBackend(new RocksDBStateBackend("儲存路徑",true))