Flink(九)【Flink的重啟策略】
阿新 • • 發佈:2021-07-30
目錄
1.Flink的重啟策略
Flink支援不同的重啟策略,這些重啟策略控制著job失敗後如何重啟。叢集可以通過預設的重啟策略來重啟,這個預設的重啟策略通常在未指定重啟策略的情況下使用,而如果Job提交的時候指定了重啟策略,這個重啟策略就會覆蓋掉叢集的預設重啟策略。
2.重啟策略
2.1未開啟checkpoint
未開啟checkpoint,任務失敗不會進行重啟,job直接失敗。
2.2開啟checkpoint
1)不設定重啟策略
預設是固定延遲重啟。job任務會一直重啟,不會掛,預設重啟Integer.MAX_VALUE次,每次間隔1s
flink-conf.yaml 配置
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: Integer.MAX_VALUE
restart-strategy.fixed-delay.delay: 1s
2)不重啟
flink-conf.yaml 配置
restart-strategy: none
java程式碼
env.setRestartStrategy(RestartStrategies.noRestart());
3)固定延遲重啟(預設)
一旦有失敗,系統就會嘗試每10秒重啟一次,重啟3次, 3次都失敗該job失敗
flink-conf.yaml 配置
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
java程式碼
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
4)失敗率重啟
5分鐘內若失敗了3次則認為該job失敗,重試間隔為10s
flink-conf.yaml 配置
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
java程式碼
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3,
Time.of(5, TimeUnit.MINUTES),
Time.of(10, TimeUnit.SECONDS)));
3.重啟效果演示
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.concurrent.TimeUnit;
/**
* @description: todo 測試Flink重啟策略
* @author: HaoWu
* @create: 2021年06月22日
*/
/**
 * Demo job for Flink restart strategies: consumes the ODS business-data topic
 * from Kafka and deliberately throws in the map operator so the configured
 * failure-rate restart strategy can be observed.
 */
public class RestartTest {
    public static void main(String[] args) throws Exception {
        // Set the HDFS user BEFORE any HDFS interaction (state backend / checkpoint
        // directory). Setting it after configuring the state backend risks the first
        // HDFS access running as the OS user, which reproduces:
        //   Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
        // (Alternative fix: grant write permission on the target HDFS directory.)
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        // TODO 1. Create the execution environment
        // 1.1 Stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.2 Parallelism
        env.setParallelism(4);
        // 1.3 Checkpoint parameters
        env.enableCheckpointing(5000L); // take a checkpoint every 5000 ms
        env.getCheckpointConfig().setCheckpointTimeout(60000L); // checkpoint timeout: 1 min
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // default mode: exactly-once
        // Keep the last checkpoint when the job is cancelled manually
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Restart strategy: the job is declared failed after 3 failures within a
        // 5-minute window; wait 10 s between restart attempts.
        //env.setRestartStrategy(RestartStrategies.noRestart());
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3,
                Time.of(5, TimeUnit.MINUTES),
                Time.of(10, TimeUnit.SECONDS)));
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));

        // State backend: filesystem-backed checkpoints on HDFS
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_db_app_restart_test"));

        // TODO 2. Read the ODS-layer business data from Kafka: ods_base_db
        String ods_db_topic = "ods_base_db";
        FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", ods_db_topic, "ods_base_db_consumer_test", "false", "latest");
        DataStreamSource<String> jsonStrDS = env.addSource(kafkaConsumer);
        jsonStrDS.print("轉換前>>>>");

        // TODO 3. Convert the raw JSON strings into JSONObject
        SingleOutputStreamOperator<JSONObject> jsonDS = jsonStrDS.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String jsonStr) throws Exception {
                // TODO Simulated failure: division by zero throws ArithmeticException
                // on every record, which triggers the restart strategy above.
                System.out.println(5 / 0);
                return JSON.parseObject(jsonStr);
            }
        });
        jsonDS.print("轉換後>>>>");

        // TODO 4. Execute the job
        env.execute();
    }
}