1. 程式人生 > 其它 >Flink狀態後端

Flink狀態後端

技術標籤:菜鳥也學大資料Flink大資料flink

什麼是狀態後端?

  • 每傳入一條資料,有狀態的運算元任務都會讀取和更新狀態
  • 由於有效的狀態訪問對於處理資料的低延遲至關重要,因此每個並行任務都會在本地維護其狀態,以確保快速的狀態訪問
  • 狀態的儲存、訪問以及維護,由一個可插入的元件決定,這個元件就叫做狀態後端(state backend)
  • 狀態後端主要負責兩件事:本地的狀態管理,以及將檢查點(checkpoint)狀態寫入遠端儲存

狀態後端的儲存模式

MemoryStateBackend

  • 記憶體級的狀態後端,會將鍵控狀態作為記憶體中的物件進行管理,將它們儲存在TaskManager的JVM堆上,而將checkpoint儲存在JobManager的記憶體中
  • 特點:快速、低延遲,但不穩定
  • 原始碼
public MemoryStateBackend() {this(null, null, DEFAULT_MAX_STATE_SIZE, TernaryBoolean.UNDEFINED);}
  • 示例
	val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
	//配置使用MemoryStateBackend
    env.setStateBackend(new MemoryStateBackend)

FsStateBackend

  • 將checkpoint 存到遠端的持久化檔案系統(FileSystem) 上,而對於本地狀態,跟MemoryStateBackend - -樣,也會存在TaskManager的JVM堆上
  • 同時擁有記憶體級的本地訪問速度, 和更好的容錯保證
  • 原始碼
	public FsStateBackend(String checkpointDataUri) {this(new Path(checkpointDataUri));}
  • 示例
	val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
	//配置使用FsStateBackend
    env.setStateBackend(new FsStateBackend("儲存路徑"))

RocksDBStateBackend

  • 將所有狀態序列化後,存入本地的RocksDB中儲存。

RocksDBStateBackend比較特殊,如果需要使用,需要新增依賴:
根據自己的使用的scala和flink版本進行修改
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.10.1</version>
</dependency>

  • 原始碼
public RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException {	
		this((new Path(checkpointDataUri)).toUri(), enableIncrementalCheckpointing);
    }
  • 示例
	val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
	//RocksDBStateBackend除了配置儲存路徑,還需要配置是否增量儲存,否則就是全量儲存
    env.setStateBackend(new RocksDBStateBackend("儲存路徑",true))