1. 程式人生 > >Flink系列之狀態及檢查點

Flink系列之狀態及檢查點

  Flink不同於其他實時計算的框架之處是它可以提供針對不同的狀態進行程式設計和計算。本篇文章的主要思路如下,大家可以選擇性閱讀。

  1. Flink的狀態分類及不同點。

  2. Flink針對不同的狀態進行程式設計。

  3. 檢查點機制和配置。

  4. 狀態的儲存。

  

  •  Flilnk的狀態分類及不同點

    Flink有兩種不同的狀態分類,一種是Keyed State(鍵狀態),一種是Operator State(運算元狀態)。

    • Keyed State

      主要是針對KeyedStream中使用,當使用keyBy方法的時候進行計算。 我們都知道在計算的過程中就是將Flink按照<並行operator, key> 進行計算,每個key又歸屬於單個Operator,所以我們可以簡單的理解為<operator, key>。也就是說首先按Operator分配到不同的例項,然後在不同的例項中,相同的Key分配到相同的組中,然後這些狀態就可以在相同的組中進行獲取和計算。

    • Operator State

      主要針對不同的運算元的狀態計算。按照不同的運算元如Map, FlatMap,Reduce等運算元去分配不同的例項群。像Kafka Connector的例子就很好的應用了這個功能,根據不同的topic去讀取不同的狀態,比如計算獲取到topic的paritition分割槽和 offset偏移量。 每個運算元例項會維護著這個topic的partition及offset的Map狀態,這個例子就是很好的使用了Opertator的state。如果Operator並行度發生改變了的話,那麼狀態也會相應的分配好對應的狀態。

 

  • 可管理的及原生狀態

  這兩種狀態又分為 Managed State (可管理狀態)和 Raw State (原生狀態)

    • Managed State : 可管理狀態就是自己去定義和編寫狀態處理的邏輯,全部由自己和Flink進行控制。
    • Raw State : 原生狀態也就是在Operator運算元觸發 checkPoint 檢查點的時候,Flink會在其資料結構中寫入一部分位元組碼Byte,Flink只能看到其中有一些碼,但是無法去進行控制。

  所有的流資料功能都可以使用Managed State,這個也是Flink程式設計所推薦的。因為要使用Raw State的話比較底層也比較複雜,要實現運算元方法時才使用。

 

  • Flink針對不同的狀態進行程式設計

  我們只針對可管理的狀態進行操作,不同的管理 Keyed State 和Operator State 狀態原始方法定義可參考官網介紹。

    • Keyed State

     我們針對Keyed managed state進行程式設計。來個場景,假如Flink計算某個功能的時間,如果某個功能Key時間超過某個閾值了則進行計數,如果資料超過了設定的次數,那麼直接輸出到控制檯。直接參考如下程式碼。

  程式碼大致的思路是:

  繼承RichFlatMapFunction, 定義一個ListState<Long>用於記錄當前的狀態。

  定義閾值和錯誤次數值,觸發後直接輸出控制檯下。

  open方法例項化ListState。在裡邊設定了一下狀態的TTL,即狀態的生命週期。

  flatMap方法按key分配後的value進行判斷和記錄。

  最後main方法進行資料準備和輸出。

package myflink.state;

import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {

    //通過ListState來儲存非正常資料的狀態
    private transient ListState<Long> abnormalData;
    //需要監控的閾值
    private Long threshold;
    //觸發報警的次數
    private Integer numberOfTimes;

    public ThresholdWarning(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }

    @Override
    public void open(Configuration parameters) throws Exception {

        ListStateDescriptor listStateDescriptor = new ListStateDescriptor<Long>("abnormal-state",
                TypeInformation.of(Long.class));

        //狀態存活生命週期設定TTL Time To Live
        StateTtlConfig  ttlConfig = StateTtlConfig
                //設定有效期為10秒
                .newBuilder(Time.seconds(10L))
                //設定有效的更新規則,當建立和寫入的時候需要重新更新為10S
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                //設定狀態的可見性,設定狀態如果沒有刪除,那麼就是可見的,另外一個值:ReturnExpiredIfNotCleanedUp ,
                // 如果沒有清理的話,狀態會一直可見的
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        //設定TTL配置
        listStateDescriptor.enableTimeToLive(ttlConfig);

        //通過狀態名稱(控制代碼)獲取狀態例項,如果不存在則會自動建
        abnormalData = getRuntimeContext().getListState(new ListStateDescriptor<Long>("abnormal-state",
                TypeInformation.of(Long.class)));
    }



    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out) throws Exception {
        Long inputValue = value.f1;
        //如果輸入的值超過閾值,則記錄該次不正常的資料資訊
        if(inputValue >= threshold) {
            abnormalData.add(inputValue);
        }

        ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());

        //如果不正常的資料超過了指定的數量,則輸出報警資訊
        if(list.size() >= numberOfTimes) {
            out.collect(Tuple2.of(value.f0 + " 超過指定閾值數量", list));
            //報警資訊輸出後,清空狀態
            abnormalData.clear();
        }
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //設定並行度為1,用於觀察輸出
//        env.setParallelism(1);

        DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(
                Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
                Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
                Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
                Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));

        tuple2DataStreamSource
                .keyBy(0)
                //超過100的閾值3次後輸出報警資訊
                .flatMap(new ThresholdWarning(100L, 3))
                .printToErr();

        env.execute("Managed Keyed State");
    }
}

  輸出的結果如下,大於等於100的出現3次即進行輸出。和我們想象的都一樣。

1> (b 超過指定閾值數量,[100, 200, 200])
1> (b 超過指定閾值數量,[500, 600, 700])
3> (a 超過指定閾值數量,[400, 100, 200])
    •  operator state

  我們還在原來基礎的例子上調整一下,不按key,按Operator型別,只要超過時間的次數達到了就要輸出。在其中,把Operator的hashCode進行輸出一下,用於區分是否為相同的Operator。首先我們將並行度設定為1,然後一會兒再把並行度調整成2。

  程式碼的大致思路為:

  繼承RichFlatMapFunction,實現CheckpointedFunction介面,即在觸發檢查點的時候進行操作。

  initializeState方法的時候將opertor的狀態和檢查點狀態進行初始化。

  snapshotState方法即儲存狀態時將當時的映象進行儲存。可以儲存到外部裝置。

  flatMap的時候進行閾值判斷和資料收集。

  main方法進行檢查點設定,資料準備,執行和輸出。

package myflink.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class ThresholdOperatorWarning extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String,
        List<Tuple2<String, Long>>>> implements CheckpointedFunction {


    //非正確資料狀態
    private List<Tuple2<String, Long>> bufferedData;

    //檢查點狀態
    private transient ListState<Tuple2<String, Long>> checkPointedState;

    //需要監控的閾值
    private Long threshold;
    //次數
    private Integer numberOfTimes;

    ThresholdOperatorWarning(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
        this.bufferedData = new ArrayList<>();
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Tuple2<String, Long>>>> out)
            throws Exception {
        Long inputValue = value.f1;

        //超過閾值則進行記錄
        if(inputValue >= threshold) {
            bufferedData.add(value);
        }

        //超過指定次數則進行彙總和彙總輸出
        if(bufferedData.size() >= numberOfTimes) {
            //輸出狀態例項的hashCode
            out.collect(Tuple2.of(checkPointedState.hashCode() + "閾值報警! " , bufferedData ));
            //清理快取
            bufferedData.clear();
        }

    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        //當資料進行快照時,將資料儲存到checkPointedState
        checkPointedState.clear();
        for (Tuple2<String, Long> element : bufferedData) {
            checkPointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        //這裡獲取的是operatorStateStore
        checkPointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Tuple2<String, Long>>(
                "abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
        ));

        //如果發生重啟,則需要從快照中將狀態進行恢復
        if(context.isRestored()) {
            for (Tuple2<String, Long> element : checkPointedState.get()) {
                bufferedData.add(element);
            }
        }
    }


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();



        //開啟檢查點
        env.enableCheckpointing(1000L);
        // 其他可選配置如下:

        // 設定語義,預設是EXACTLY_ONCE
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 設定檢查點之間最小停頓時間
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 設定執行Checkpoint操作時的超時時間
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 設定最大併發執行的檢查點的數量
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 將檢查點持久化到外部儲存
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);



        //設定並行度為1
        env.setParallelism(1);
        //資料來源

        DataStreamSource<Tuple2<String, Long>>  tuple2DataStreamSource = env.fromElements(
                Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
                Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
                Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
                Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L)
        );

        tuple2DataStreamSource.flatMap(new ThresholdOperatorWarning(100L, 3))
                .printToErr();

        env.execute("managed Operator State");

    }

}

  當前並行度為1,結果如下,資料沒有按key統計,而是按照裡邊的值進行統計,符合我們的要求。因為是同一個Operator,所以hashcode是一樣的。

(1629838640閾值報警! ,[(a,400), (a,100), (a,200)])
(1629838640閾值報警! ,[(a,200), (b,100), (b,200)])
(1629838640閾值報警! ,[(b,200), (b,500), (b,600)])

  接下來將並行度設定為2,結果如下。我們看一下main裡邊的資料,符合大於等於100的資料一共有10個,那麼兩個不同的operator分配的時候這10資料的時候,一個operator分5個,那麼滿足超過3個的時候才收集並輸出。因為5個裡邊只有一組3個滿足,2個不滿足所以不會輸出,所以符合我們的預期。

1> (475161679閾值報警! ,[(a,100), (a,200), (b,200)])
2> (1633355453閾值報警! ,[(a,400), (a,200), (b,100)])
  • 檢查點的機制和配置
    • 檢查點的機制

      上邊我們程式裡邊設定了檢查點,檢查點是當資料進行處理的時候將資料的狀態進行記錄,當程式出現問題的時候方便恢復。

     可以像這樣的情況:  資料來源——>  123456789|12345678| 12341234|——>sink。|即檢查點,是一個checkpoint barrier,當運算元執行計算的時候會把當前的狀態進行記錄,比如讀取Kafka的資料,假如讀取到offset=6868,然後將這個值進行了記錄, 當這時有機器出現了問題,程式需要進行恢復並執行,那麼需要重新讀取這條資料再計算。引用一張圖片可以有更清楚的認識。

  

 

 

    •  檢查點的配置

      預設情況下,檢查點是關閉著的,我們需要明確開啟。其他的一些配置可參考如下內容:

     //開啟檢查點
        env.enableCheckpointing(1000L);
        // 其他可選配置如下:

        // 設定語義,預設是EXACTLY_ONCE
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 設定檢查點之間最小停頓時間
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 設定執行Checkpoint操作時的超時時間
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 設定最大併發執行的檢查點的數量
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 將檢查點持久化到外部儲存
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    • 檢查點手工儲存

       flink支援手工將檢查點的狀態儲存到外部,也可以指定儲存到HDFS檔案。儲存到外邊是為了程式出現了問題時進行恢復,比如OOM問題。程序升級和重啟時也需要重新從檢查點進行恢復。

# 觸發指定id的作業的Savepoint,並將結果儲存到指定目錄下
bin/flink savepoint :jobId [:targetDirectory]
  •  狀態的儲存

    Keyed State和Operator State會儲存在記憶體中,因為資料是持續不斷的輸入的,當資料量非常大的時候,記憶體會出現不足的情況,那麼我們也是需要將當前的狀態進行儲存的。官方稱為狀態後端。

    • Flink的狀態儲存支援3種方式

      MemoryStateBackend,這種方式是將資料儲存在JVM中,這種方式是用於開發。

      FsStateBackend, 即以檔案的形式儲存到磁碟中,可以是HDFS或本地檔案。當JobManger把任務傳送給Taskmanger進行計算,此時資料會在JVM中,當觸發了checkpoint後才會將資料儲存到檔案中。

      RocksDBStateBackend,這種形式是介於前邊兩種的情況,這個是將狀態資料到KV資料庫中,當觸發狀態的時候會將資料再持久化到檔案中。這樣即提高了速度,空間也變得更大了。

 

    • 狀態儲存配置

      預設情況是MemoryStateBackend,即記憶體中。

      剩下兩種的配置如下,這種方式只對當前Job有效。RocksDB配置的話需要額外引用一下包。

// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:60060/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:60060/flink/checkpoints"))

      通過修改flink-yaml.conf可以對該叢集所有作業生效。

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:60060/flink/checkpoints

 

     

MemoryStateBack