Flink (9): Flink Restart Strategies

Contents

1. Flink Restart Strategies

Flink supports several restart strategies, which control how a job is restarted after a failure. The cluster has a default restart strategy, which applies whenever a job does not specify one; if a restart strategy is specified at job submission, it overrides the cluster default.

2. Restart Strategies

2.1 Checkpointing Disabled

With checkpointing disabled, a failed task is not restarted and the job fails immediately.

2.2 Checkpointing Enabled

1) No Restart Strategy Configured

The default is fixed-delay restart. The job keeps restarting instead of failing: by default it restarts up to Integer.MAX_VALUE times, with a 1 s delay between attempts.

flink-conf.yaml configuration

restart-strategy: fixed-delay

restart-strategy.fixed-delay.attempts: 2147483647 # Integer.MAX_VALUE
restart-strategy.fixed-delay.delay: 1s
2)不重啟

flink-conf.yaml configuration

restart-strategy: none

Java code

env.setRestartStrategy(RestartStrategies.noRestart());
3) Fixed-Delay Restart (default)

On failure, the system retries every 10 seconds, up to 3 times; if all 3 attempts fail, the job fails.

flink-conf.yaml configuration

restart-strategy: fixed-delay

restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

Java code

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
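The fixed-delay semantics can be sketched in plain Java (a hypothetical model for illustration, not Flink's internal implementation): retry a task up to a fixed number of attempts with a constant pause in between, and give up only when all attempts are exhausted.

```java
import java.util.concurrent.Callable;

// Hypothetical model of fixed-delay restart semantics.
public class FixedDelayModel {
    static <T> T runWithFixedDelay(Callable<T> task, int maxAttempts, long delayMillis)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();          // success: stop retrying
            } catch (Exception e) {
                last = e;                    // failure: remember it and retry
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;                          // all attempts failed: the "job" fails
    }

    public static void main(String[] args) throws Exception {
        int[] failures = {2};                // fail twice, then succeed
        String result = runWithFixedDelay(() -> {
            if (failures[0]-- > 0) throw new RuntimeException("transient error");
            return "ok";
        }, 3, 10L);
        System.out.println(result);          // prints "ok" on the 3rd attempt
    }
}
```

With attempts = 3 and a task that fails twice, the third attempt succeeds; a task that fails three times in a row propagates the last exception, mirroring the job failing after 3 failed restarts.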
4) Failure-Rate Restart

If the job fails 3 times within 5 minutes, it is considered failed; the delay between restart attempts is 10 s.

flink-conf.yaml configuration

restart-strategy: failure-rate

restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

Java code

env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3,
                Time.of(5, TimeUnit.MINUTES),
                Time.of(10, TimeUnit.SECONDS)));
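The failure-rate rule can also be modeled in plain Java (a hypothetical sketch of the counting logic, not Flink's implementation): keep timestamps of recent failures, discard those older than the interval, and allow a restart only while the count stays below the threshold.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of failure-rate restart: the job keeps restarting as long
// as fewer than maxFailuresPerInterval failures fall inside the trailing interval.
public class FailureRateModel {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    public FailureRateModel(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    // Record a failure at the given timestamp; return true if a restart is still allowed.
    public boolean recordFailureAndCheckRestart(long nowMillis) {
        failureTimes.addLast(nowMillis);
        // Drop failures that aged out of the trailing interval.
        while (!failureTimes.isEmpty()
                && nowMillis - failureTimes.peekFirst() > intervalMillis) {
            failureTimes.removeFirst();
        }
        return failureTimes.size() < maxFailuresPerInterval;
    }
}
```

With a threshold of 3 failures per 5 minutes, the first two failures trigger restarts, while a third failure inside the window marks the job as failed; failures that age out of the window no longer count.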

3. Demonstrating Restart Behavior

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.concurrent.TimeUnit;

/**
 * @description: todo test Flink restart strategies
 * @author: HaoWu
 * @create: 2021-06-22
 */
public class RestartTest {
    public static void main(String[] args) throws Exception {
        // TODO 1. Create the execution environment
        // 1.1 Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.2 Set the parallelism
        env.setParallelism(4);
        // 1.3 Configure checkpointing
        env.enableCheckpointing(5000L); // checkpoint every 5000 ms
        env.getCheckpointConfig().setCheckpointTimeout(60000L); // checkpoint timeout: 1 min
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpoint mode, default: exactly_once
        // Keep the last checkpoint when the job is cancelled normally
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart strategy
        //env.setRestartStrategy(RestartStrategies.noRestart());
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3,
                Time.of(5, TimeUnit.MINUTES),
                Time.of(10, TimeUnit.SECONDS)));
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // State backend:
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_db_app_restart_test"));
        // HDFS write-permission issue
        // Error: Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
        // Cause: no write permission on the root directory. Fixes: 1. hadoop fs -chmod 777 /   2. System.setProperty("HADOOP_USER_NAME", "atguigu");
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // TODO 2. Read the ODS-layer business data from Kafka: ods_base_db
        String ods_db_topic = "ods_base_db";
        FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", ods_db_topic, "ods_base_db_consumer_test", "false", "latest");
        DataStreamSource<String> jsonStrDS = env.addSource(kafkaConsumer);
        jsonStrDS.print("before transform>>>>");
        // TODO 3. Transform the structure of jsonStrDS
        SingleOutputStreamOperator<JSONObject> jsonDS = jsonStrDS.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String jsonStr) throws Exception {
                // TODO simulate a program failure
                System.out.println(5 / 0);
                return JSON.parseObject(jsonStr);
            }
        });
        jsonDS.print("after transform>>>>");
        // TODO 4. Execute
        env.execute();
    }
}
}