Flink checkpoint: global configuration appears not to take effect for window operations

Background

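  • Flink version: 1.6.2
  • Flink cluster mode: Flink on YARN
  • The job reads data from Kafka with Flink, does some simple processing, and then handles the data with a custom RichWindowFunction. While the window function runs, the following exception is thrown: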
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 20 for operator Window(TumblingProcessingTimeWindows(5), ProcessingTimeTrigger, MyRichRedisWindowFuntion) (1/8).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 20 for operator Window(TumblingProcessingTimeWindows(5), ProcessingTimeTrigger, MyRichRedisWindowFuntion) (1/8).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5249873 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5249873 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64)
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:145)
    at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:126)
    at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:826)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
    at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more
  • Flink checkpoint configuration:
fs.default-scheme: hdfs://@hadoop:9000/
fs.hdfs.hadoopconf: hdfs:///flink/data/
state.checkpoints.dir: hdfs:///flink/checkpoints/
state.checkpoints.num-retained: 20
state.savepoints.dir: hdfs:///flink/flink-savepoints/
state.backend.fs.checkpoint.dir: hdfs:///flink/state/checkpoints/

What puzzles me:

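The checkpoint storage location is set globally, so the window operator's checkpoints should be stored in that location as well.
Why, then, is the checkpoint still taken with the memory backend?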
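
The numbers in the exception can be checked by hand. The sketch below is not Flink code: `StateSizeCheck` and its methods are hypothetical names that only redo the comparison reported by `MemCheckpointStreamFactory.checkSize`, using the two figures from the stack trace. 5242880 bytes is exactly 5 MiB, which, as far as I know, is the default maximum state size of Flink's `MemoryStateBackend`.

```java
/** Hypothetical helper, not part of Flink: redoes the size comparison from the error message. */
final class StateSizeCheck {

    /** 5 MiB in bytes; equals the maxSize value printed in the exception. */
    static final long MAX_SIZE_BYTES = 5L * 1024 * 1024;

    /** True when a snapshot of the given size does not fit within the limit. */
    static boolean exceedsLimit(long stateSizeBytes, long maxSizeBytes) {
        return stateSizeBytes > maxSizeBytes;
    }

    /** Number of bytes by which the snapshot overshoots the limit (0 if it fits). */
    static long overshootBytes(long stateSizeBytes, long maxSizeBytes) {
        return Math.max(0L, stateSizeBytes - maxSizeBytes);
    }

    public static void main(String[] args) {
        long size = 5249873L; // "Size" value from the exception
        System.out.println(exceedsLimit(size, MAX_SIZE_BYTES));   // prints "true"
        System.out.println(overshootBytes(size, MAX_SIZE_BYTES)); // prints "6993"
    }
}
```

The window state was therefore only 6993 bytes over the limit. This suggests the job had been checkpointing into memory all along, and that checkpoint 20 was simply the first one whose window state grew past 5 MiB.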

Attempted fix:

Set the checkpoint storage mode (the state backend) in the code:

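import org.apache.flink.runtime.state.filesystem.FsStateBackend;

// Set the state backend in the job itself, so checkpoint data is written to HDFS
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints/workFlowCheckpoint"));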

Comparison before and after the fix:

HDFS directory after the fix:

Remaining doubts:

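However, in version 1.6.2 this class is not marked as Deprecated, so my questions are:
Is there something wrong with this fix? Or does the global checkpoint setting simply not take effect for the window operator itself?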
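
One possible explanation, offered as an assumption rather than something confirmed in this post: the configuration listed above sets the checkpoint directories but has no `state.backend` key. To my knowledge, when that key is unset Flink 1.6 falls back to `MemoryStateBackend`, which would match the `MemCheckpointStreamFactory` and `HeapKeyedStateBackend` frames in the stack trace. On that reading the window operator is not treated specially; it is just the operator whose state outgrew the in-memory limit. A global equivalent of the code-level fix might look like the following `flink-conf.yaml` fragment (a sketch; the directory is the one already used above):

```yaml
# Assumption: added to flink-conf.yaml next to the existing keys.
# Selects the file system state backend for all jobs that do not
# set a backend in code.
state.backend: filesystem
# Directory where checkpoint data and metadata are written.
state.checkpoints.dir: hdfs:///flink/checkpoints/
```

A backend set in code with `env.setStateBackend(...)` applies to that job only, so the code-level fix and a global setting of this kind can coexist.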