1. 程式人生 > 其它 >Flink Kafka source原始碼解讀

Flink Kafka source原始碼解讀

Flink 提供了專門的 Kafka 聯結器,向 Kafka topic 中讀取或者寫入資料。Flink Kafka Consumer 集成了 Flink 的 Checkpoint 機制,可提供 exactly-once 的處理語義。為此,Flink 並不完全依賴於跟蹤 Kafka 消費組的偏移量,而是在內部跟蹤和檢查偏移量。本文內容較長,可以關注收藏。

引言

當我們在使用Spark Streaming、Flink等計算框架進行資料實時處理時,使用Kafka作為一款釋出與訂閱的訊息系統成為了標配。Spark Streaming與Flink都提供了相對應的Kafka Consumer,使用起來非常的方便,只需要設定一下Kafka的引數,然後新增kafka的source就萬事大吉了。如果你真的覺得事情就是如此的so easy,感覺媽媽再也不用擔心你的學習了,那就真的是too young too simple sometimes naive了。本文以Flink 的Kafka Source為討論物件,首先從基本的使用入手,然後深入原始碼逐一剖析,一併為你撥開Flink Kafka connector的神祕面紗。值得注意的是,本文假定讀者具備了Kafka的相關知識,關於Kafka的相關細節問題,不在本文的討論範圍之內。

Flink Kafka Consumer介紹

Flink Kafka Connector有很多個版本,可以根據你的kafka和Flink的版本選擇相應的包(maven artifact id)和類名。本文所涉及的Flink版本為1.10,Kafka的版本為2.3.4。Flink所提供的Maven依賴於類名如下表所示:

Demo示例

新增Maven依賴

<!--本文使用的是通用型的connector-->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11
</artifactId> <version>1.10.0</version> </dependency>

簡單程式碼案例

public class KafkaConnector {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // 開啟checkpoint,時間間隔為毫秒
senv.enableCheckpointing(5000L); // 選擇狀態後端 senv.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint")); //senv.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint")); Properties props = new Properties(); // kafka broker地址 props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092"); // 僅kafka0.8版本需要配置 props.put("zookeeper.connect", "kms-2:2181,kms-3:2181,kms-4:2181"); // 消費者組 props.put("group.id", "test"); // 自動偏移量提交 props.put("enable.auto.commit", true); // 偏移量提交的時間間隔,毫秒 props.put("auto.commit.interval.ms", 5000); // kafka 訊息的key序列化器 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // kafka 訊息的value序列化器 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 指定kafka的消費者從哪裡開始消費資料 // 共有三種方式, // #earliest // 當各分割槽下有已提交的offset時,從提交的offset開始消費; // 無提交的offset時,從頭開始消費 // #latest // 當各分割槽下有已提交的offset時,從提交的offset開始消費; // 無提交的offset時,消費新產生的該分割槽下的資料 // #none // topic各分割槽都存在已提交的offset時, // 從offset後開始消費; // 只要有一個分割槽不存在已提交的offset,則丟擲異常 props.put("auto.offset.reset", "latest"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>( "qfbap_ods.code_city", new SimpleStringSchema(), props); //設定checkpoint後在提交offset,即oncheckpoint模式 // 該值預設為true, consumer.setCommitOffsetsOnCheckpoints(true); // 最早的資料開始消費 // 該模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。 //consumer.setStartFromEarliest(); // 消費者組最近一次提交的偏移量,預設。 // 如果找不到分割槽的偏移量,那麼將會使用配置中的 auto.offset.reset 設定 //consumer.setStartFromGroupOffsets(); // 最新的資料開始消費 // 該模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。 //consumer.setStartFromLatest(); // 指定具體的偏移量時間戳,毫秒 // 對於每個分割槽,其時間戳大於或等於指定時間戳的記錄將用作起始位置。 // 如果一個分割槽的最新記錄早於指定的時間戳,則只從最新記錄讀取該分割槽資料。 // 在這種模式下,Kafka 中的已提交 offset 將被忽略,不會用作起始位置。 //consumer.setStartFromTimestamp(1585047859000L); // 為每個分割槽指定偏移量 /*Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 0), 23L); specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 1), 31L); specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 2), 43L); consumer1.setStartFromSpecificOffsets(specificStartOffsets);*/ /** * * 請注意:當 Job 從故障中自動恢復或使用 savepoint 手動恢復時, * 這些起始位置配置方法不會影響消費的起始位置。 * 在恢復時,每個 Kafka 分割槽的起始位置由儲存在 savepoint 或 checkpoint 中的 offset 確定 * */ DataStreamSource<String> source = senv.addSource(consumer); // TODO source.print(); senv.execute("test kafka connector"); } }

引數配置解讀

在Demo示例中,給出了詳細的配置資訊,下面將對上面的引數配置進行逐一分析。

kakfa的properties引數配置

  • bootstrap.servers:kafka broker地址

  • zookeeper.connect:僅kafka0.8版本需要配置

  • group.id:消費者組

  • enable.auto.commit

    自動偏移量提交,該值的配置不是最終的偏移量提交模式,需要考慮使用者是否開啟了checkpoint,

    在下面的原始碼分析中會進行解讀

  • auto.commit.interval.ms:偏移量提交的時間間隔,毫秒

  • key.deserializer:

    kafka 訊息的key序列化器,如果不指定會使用ByteArrayDeserializer序列化器

  • value.deserializer

kafka 訊息的value序列化器,如果不指定會使用ByteArrayDeserializer序列化器

  • auto.offset.reset

    指定kafka的消費者從哪裡開始消費資料,共有三種方式,

  • 第一種:earliest
    當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費

  • 第二種:latest
    當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割槽下的資料

  • 第三種:none
    topic各分割槽都存在已提交的offset時,從offset後開始消費;只要有一個分割槽不存在已提交的offset,則丟擲異常

    注意:上面的指定消費模式並不是最終的消費模式,取決於使用者在Flink程式中配置的消費模式

Flink程式使用者配置的引數

  • consumer.setCommitOffsetsOnCheckpoints(true)

解釋:設定checkpoint後在提交offset,即oncheckpoint模式,該值預設為true,該引數會影響偏移量的提交方式,下面的原始碼中會進行分析

  • consumer.setStartFromEarliest()

    解釋:最早的資料開始消費 ,該模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。該方法為繼承父類FlinkKafkaConsumerBase的方法。

  • consumer.setStartFromGroupOffsets()

    解釋:消費者組最近一次提交的偏移量,預設。如果找不到分割槽的偏移量,那麼將會使用配置中的 auto.offset.reset 設定,該方法為繼承父類FlinkKafkaConsumerBase的方法。

  • consumer.setStartFromLatest()

    解釋:最新的資料開始消費,該模式下,Kafka 中的 committed offset 將被忽略,不會用作起始位置。該方法為繼承父類FlinkKafkaConsumerBase的方法。

  • consumer.setStartFromTimestamp(1585047859000L)

    解釋:指定具體的偏移量時間戳,毫秒。對於每個分割槽,其時間戳大於或等於指定時間戳的記錄將用作起始位置。如果一個分割槽的最新記錄早於指定的時間戳,則只從最新記錄讀取該分割槽資料。在這種模式下,Kafka 中的已提交 offset 將被忽略,不會用作起始位置。

  • consumer.setStartFromSpecificOffsets(specificStartOffsets)

解釋:為每個分割槽指定偏移量,該方法為繼承父類FlinkKafkaConsumerBase的方法。

請注意:當 Job 從故障中自動恢復或使用 savepoint 手動恢復時,這些起始位置配置方法不會影響消費的起始位置。在恢復時,每個 Kafka 分割槽的起始位置由儲存在 savepoint 或 checkpoint 中的 offset 確定。

Flink Kafka Consumer原始碼解讀

繼承關係

Flink Kafka Consumer繼承了FlinkKafkaConsumerBase抽象類,而FlinkKafkaConsumerBase抽象類又繼承了RichParallelSourceFunction,所以要實現一個自定義的source時,有兩種實現方式:一種是通過實現SourceFunction介面來自定義並行度為1的資料來源;另一種是通過實現ParallelSourceFunction介面或者繼承RichParallelSourceFunction來自定義具有並行度的資料來源。FlinkKafkaConsumer的繼承關係如下圖所示。

原始碼解讀

FlinkKafkaConsumer原始碼

先看一下FlinkKafkaConsumer的原始碼,為了方面閱讀,本文將盡量給出本比較完整的原始碼片段,具體如下所示:程式碼較長,在這裡可以先有有一個總體的印象,下面會對重要的程式碼片段詳細進行分析。

public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {

    // 配置輪詢超時超時時間,使用flink.poll-timeout引數在properties進行配置
    public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
    // 如果沒有可用資料,則等待輪詢所需的時間(以毫秒為單位)。 如果為0,則立即返回所有可用的記錄
    //預設輪詢超時時間
    public static final long DEFAULT_POLL_TIMEOUT = 100L;
    // 使用者提供的kafka 引數配置
    protected final Properties properties;
    // 如果沒有可用資料,則等待輪詢所需的時間(以毫秒為單位)。 如果為0,則立即返回所有可用的記錄
    protected final long pollTimeout;
    /**
     * 建立一個kafka的consumer source
     * @param topic                   消費的主題名稱
     * @param valueDeserializer       反序列化型別,用於將kafka的位元組訊息轉換為Flink的物件
     * @param props                   使用者傳入的kafka引數
     */
    public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(Collections.singletonList(topic), valueDeserializer, props);
    }
    /**
     * 建立一個kafka的consumer source
     * 該構造方法允許傳入KafkaDeserializationSchema,該反序列化類支援訪問kafka消費的額外資訊
     * 比如:key/value對,offsets(偏移量),topic(主題名稱)
     * @param topic                消費的主題名稱
     * @param deserializer         反序列化型別,用於將kafka的位元組訊息轉換為Flink的物件
     * @param props                使用者傳入的kafka引數
     */
    public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
        this(Collections.singletonList(topic), deserializer, props);
    }
    /**
     * 建立一個kafka的consumer source
     * 該構造方法允許傳入多個topic(主題),支援消費多個主題
     * @param topics          消費的主題名稱,多個主題為List集合
     * @param deserializer    反序列化型別,用於將kafka的位元組訊息轉換為Flink的物件
     * @param props           使用者傳入的kafka引數
     */
    public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
        this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
    }
    /**
     * 建立一個kafka的consumer source
     * 該構造方法允許傳入多個topic(主題),支援消費多個主題,
     * @param topics         消費的主題名稱,多個主題為List集合
     * @param deserializer   反序列化型別,用於將kafka的位元組訊息轉換為Flink的物件,支援獲取額外資訊
     * @param props          使用者傳入的kafka引數
     */
    public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
        this(topics, null, deserializer, props);
    }
    /**
     * 基於正則表示式訂閱多個topic
     * 如果開啟了分割槽發現,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值為非負數
     * 只要是能夠正則匹配上,主題一旦被建立就會立即被訂閱
     * @param subscriptionPattern   主題的正則表示式
     * @param valueDeserializer   反序列化型別,用於將kafka的位元組訊息轉換為Flink的物件,支援獲取額外資訊
     * @param props               使用者傳入的kafka引數
     */
    public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
    }
    /**
     * 基於正則表示式訂閱多個topic
     * 如果開啟了分割槽發現,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值為非負數
     * 只要是能夠正則匹配上,主題一旦被建立就會立即被訂閱
     * @param subscriptionPattern   主題的正則表示式
     * @param deserializer          該反序列化類支援訪問kafka消費的額外資訊,比如:key/value對,offsets(偏移量),topic(主題名稱)
     * @param props                 使用者傳入的kafka引數
     */
    public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
        this(null, subscriptionPattern, deserializer, props);
    }
    private FlinkKafkaConsumer(
        List<String> topics,
        Pattern subscriptionPattern,
        KafkaDeserializationSchema<T> deserializer,
        Properties props) {
        // 呼叫父類(FlinkKafkaConsumerBase)構造方法,PropertiesUtil.getLong方法第一個引數為Properties,第二個引數為key,第三個引數為value預設值
        super(
            topics,
            subscriptionPattern,
            deserializer,
            getLong(
                checkNotNull(props, "props"),
                KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
            !getBoolean(props, KEY_DISABLE_METRICS, false));

        this.properties = props;
        setDeserializer(this.properties);

        // 配置輪詢超時時間,如果在properties中配置了KEY_POLL_TIMEOUT引數,則返回具體的配置值,否則返回預設值DEFAULT_POLL_TIMEOUT
        try {
            if (properties.containsKey(KEY_POLL_TIMEOUT)) {
                this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
            } else {
                this.pollTimeout = DEFAULT_POLL_TIMEOUT;
            }
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
        }
    }
   // 父類(FlinkKafkaConsumerBase)方法重寫,該方法的作用是返回一個fetcher例項,
    // fetcher的作用是連線kafka的broker,拉去資料並進行反序列化,然後將資料輸出為資料流(data stream)
    @Override
    protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
        // 確保當偏移量的提交模式為ON_CHECKPOINTS(條件1:開啟checkpoint,條件2:consumer.setCommitOffsetsOnCheckpoints(true))時,禁用自動提交
        // 該方法為父類(FlinkKafkaConsumerBase)的靜態方法
        // 這將覆蓋使用者在properties中配置的任何設定
        // 當offset的模式為ON_CHECKPOINTS,或者為DISABLED時,會將使用者配置的properties屬性進行覆蓋
        // 具體是將ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置為"false
        // 可以理解為:如果開啟了checkpoint,並且設定了consumer.setCommitOffsetsOnCheckpoints(true),預設為true,
        // 就會將kafka properties的enable.auto.commit強制置為false
        adjustAutoCommitConfig(properties, offsetCommitMode);
        return new KafkaFetcher<>(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics);
    }
    //父類(FlinkKafkaConsumerBase)方法重寫
    // 返回一個分割槽發現類,分割槽發現可以使用kafka broker的高階consumer API發現topic和partition的元資料
    @Override
    protected AbstractPartitionDiscoverer createPartitionDiscoverer(
        KafkaTopicsDescriptor topicsDescriptor,
        int indexOfThisSubtask,
        int numParallelSubtasks) {

        return new KafkaPartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
    }

    /**
     *判斷是否在kafka的引數開啟了自動提交,即enable.auto.commit=true,
     * 並且auto.commit.interval.ms>0,
     * 注意:如果沒有沒有設定enable.auto.commit的引數,則預設為true
     *       如果沒有設定auto.commit.interval.ms的引數,則預設為5000毫秒
     * @return
     */
    @Override
    protected boolean getIsAutoCommitEnabled() {
        //
        return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
            PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
    }

    /**
     * 確保配置了kafka訊息的key與value的反序列化方式,
     * 如果沒有配置,則使用ByteArrayDeserializer序列化器,
     * 該類的deserialize方法是直接將資料進行return,未做任何處理
     * @param props
     */
    private static void setDeserializer(Properties props) {
        final String deSerName = ByteArrayDeserializer.class.getName();

        Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);

        if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
            LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }
        if (valDeSer != null && !valDeSer.equals(deSerName)) {
            LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        }
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
    }
}
分析

上面的程式碼已經給出了非常詳細的註釋,下面將對比較關鍵的部分進行分析。

  • 構造方法分析

FlinkKakfaConsumer提供了7種構造方法,如上圖所示。不同的構造方法分別具有不同的功能,通過傳遞的引數也可以大致分析出每種構造方法特有的功能,為了方便理解,本文將對其進行分組討論,具體如下:

單topic

/**
     * 建立一個kafka的consumer source
     * @param topic                   消費的主題名稱
     * @param valueDeserializer       反序列化型別,用於將kafka的位元組訊息轉換為Flink的物件
     * @param props                   使用者傳入的kafka引數
     */
    public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(Collections.singletonList(topic), valueDeserializer, props);
    }

/**
     * 建立一個kafka的consumer source
     * 該構造方法允許傳入KafkaDeserializationSchema,該反序列化類支援訪問kafka消費的額外資訊
     * 比如:key/value對,offsets(偏移量),topic(主題名稱)
     * @param topic                消費的主題名稱
     * @param deserializer         反序列化型別,用於將kafka的位元組訊息轉換為Flink的物件
     * @param props                使用者傳入的kafka引數
     */
    public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
        this(Collections.singletonList(topic), deserializer, props);
    }
上面兩種構造方法只支援單個topic,區別在於反序列化的方式不一樣。第一種使用的是DeserializationSchema,第二種使用的是KafkaDeserializationSchema,其中使用帶有KafkaDeserializationSchema引數的構造方法可以獲取更多的附屬資訊,比如在某些場景下需要獲取key/value對,offsets(偏移量),topic(主題名稱)等資訊,可以選擇使用此方式的構造方法。以上兩種方法都呼叫了私有的構造方法,私有構造方法的分析見下面。

多topic

/**
     * 建立一個kafka的consumer source
     * 該構造方法允許傳入多個topic(主題),支援消費多個主題
     * @param topics          消費的主題名稱,多個主題為List集合
     * @param deserializer    反序列化型別,用於將kafka的位元組訊息轉換為Flink的物件
     * @param props           使用者傳入的kafka引數
     */
    public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
        this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
    }
    /**
     * 建立一個kafka的consumer source
     * 該構造方法允許傳入多個topic(主題),支援消費多個主題,
     * @param topics         消費的主題名稱,多個主題為List集合
     * @param deserializer   反序列化型別,用於將kafka的位元組訊息轉換為Flink的物件,支援獲取額外資訊
     * @param props          使用者傳入的kafka引數
     */
    public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
        this(topics, null, deserializer, props);
    }
上面的兩種多topic的構造方法,可以使用一個list集合接收多個topic進行消費,區別在於反序列化的方式不一樣。第一種使用的是DeserializationSchema,第二種使用的是KafkaDeserializationSchema,其中使用帶有KafkaDeserializationSchema引數的構造方法可以獲取更多的附屬資訊,比如在某些場景下需要獲取key/value對,offsets(偏移量),topic(主題名稱)等資訊,可以選擇使用此方式的構造方法。以上兩種方法都呼叫了私有的構造方法,私有構造方法的分析見下面。

正則匹配topic

/**
     * 基於正則表示式訂閱多個topic
     * 如果開啟了分割槽發現,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值為非負數
     * 只要是能夠正則匹配上,主題一旦被建立就會立即被訂閱
     * @param subscriptionPattern   主題的正則表示式
     * @param valueDeserializer   反序列化型別,用於將kafka的位元組訊息轉換為Flink的物件,支援獲取額外資訊
     * @param props               使用者傳入的kafka引數
     */
    public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
    }
    /**
     * 基於正則表示式訂閱多個topic
     * 如果開啟了分割槽發現,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值為非負數
     * 只要是能夠正則匹配上,主題一旦被建立就會立即被訂閱
     * @param subscriptionPattern   主題的正則表示式
     * @param deserializer          該反序列化類支援訪問kafka消費的額外資訊,比如:key/value對,offsets(偏移量),topic(主題名稱)
     * @param props                 使用者傳入的kafka引數
     */
    public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
        this(null, subscriptionPattern, deserializer, props);
    }
實際的生產環境中可能有這樣一些需求,比如有一個flink作業需要將多種不同的資料聚合到一起,而這些資料對應著不同的kafka topic,隨著業務增長,新增一類資料,同時新增了一個kafka topic,如何在不重啟作業的情況下作業自動感知新的topic。首先需要在構建FlinkKafkaConsumer時的properties中設定flink.partition-discovery.interval-millis引數為非負值,表示開啟動態發現的開關,以及設定的時間間隔。此時FLinkKafkaConsumer內部會啟動一個單獨的執行緒定期去kafka獲取最新的meta資訊。具體的呼叫執行資訊,參見下面的私有構造方法

私有構造方法

private FlinkKafkaConsumer(
        List<String> topics,
        Pattern subscriptionPattern,
        KafkaDeserializationSchema<T> deserializer,
        Properties props) {

        // 呼叫父類(FlinkKafkaConsumerBase)構造方法,PropertiesUtil.getLong方法第一個引數為Properties,第二個引數為key,第三個引數為value預設值。KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值是開啟分割槽發現的配置引數,在properties裡面配置flink.partition-discovery.interval-millis=5000(大於0的數),如果沒有配置則使用PARTITION_DISCOVERY_DISABLED=Long.MIN_VALUE(表示禁用分割槽發現)
        super(
            topics,
            subscriptionPattern,
            deserializer,
            getLong(
                checkNotNull(props, "props"),
                KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
            !getBoolean(props, KEY_DISABLE_METRICS, false));

        this.properties = props;
        setDeserializer(this.properties);

        // 配置輪詢超時時間,如果在properties中配置了KEY_POLL_TIMEOUT引數,則返回具體的配置值,否則返回預設值DEFAULT_POLL_TIMEOUT
        try {
            if (properties.containsKey(KEY_POLL_TIMEOUT)) {
                this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
            } else {
                this.pollTimeout = DEFAULT_POLL_TIMEOUT;
            }
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
        }
    }
  • 其他方法分析

KafkaFetcher物件建立

// 父類(FlinkKafkaConsumerBase)方法重寫,該方法的作用是返回一個fetcher例項,
    // fetcher的作用是連線kafka的broker,拉去資料並進行反序列化,然後將資料輸出為資料流(data stream)
    @Override
    protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {
        // 確保當偏移量的提交模式為ON_CHECKPOINTS(條件1:開啟checkpoint,條件2:consumer.setCommitOffsetsOnCheckpoints(true))時,禁用自動提交
        // 該方法為父類(FlinkKafkaConsumerBase)的靜態方法
        // 這將覆蓋使用者在properties中配置的任何設定
        // 當offset的模式為ON_CHECKPOINTS,或者為DISABLED時,會將使用者配置的properties屬性進行覆蓋
        // 具體是將ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置為"false
        // 可以理解為:如果開啟了checkpoint,並且設定了consumer.setCommitOffsetsOnCheckpoints(true),預設為true,
        // 就會將kafka properties的enable.auto.commit強制置為false
        adjustAutoCommitConfig(properties, offsetCommitMode);
        return new KafkaFetcher<>(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics);
    }
該方法的作用是返回一個fetcher例項,fetcher的作用是連線kafka的broker,拉去資料並進行反序列化,然後將資料輸出為資料流(data stream),在這裡對自動偏移量提交模式進行了強制調整,即確保當偏移量的提交模式為ON_CHECKPOINTS(條件1:開啟checkpoint,條件2:consumer.setCommitOffsetsOnCheckpoints(true))時,禁用自動提交。這將覆蓋使用者在properties中配置的任何設定,簡單可以理解為:如果開啟了checkpoint,並且設定了consumer.setCommitOffsetsOnCheckpoints(true),預設為true,就會將kafka properties的enable.auto.commit強制置為false。關於offset的提交模式,見下文的偏移量提交模式分析。

判斷是否設定了自動提交

@Override
    protected boolean getIsAutoCommitEnabled() {
        //
        return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
            PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
    }
判斷是否在kafka的引數開啟了自動提交,即enable.auto.commit=true,並且auto.commit.interval.ms>0, 注意:如果沒有沒有設定enable.auto.commit的引數,則預設為true, 如果沒有設定auto.commit.interval.ms的引數,則預設為5000毫秒。該方法會在FlinkKafkaConsumerBase的open方法進行初始化的時候呼叫。

反序列化

private static void setDeserializer(Properties props) {
         // 預設的反序列化方式 
        final String deSerName = ByteArrayDeserializer.class.getName();
         //獲取使用者配置的properties關於key與value的反序列化模式
        Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
         // 如果配置了,則使用使用者配置的值
        if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
            LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }
        if (valDeSer != null && !valDeSer.equals(deSerName)) {
            LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        }
        // 沒有配置,則使用ByteArrayDeserializer進行反序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
    }

確保配置了kafka訊息的key與value的反序列化方式,如果沒有配置,則使用ByteArrayDeserializer序列化器,
ByteArrayDeserializer類的deserialize方法是直接將資料進行return,未做任何處理。

FlinkKafkaConsumerBase原始碼

@Internal
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction {

    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
    private boolean enableCommitOnCheckpoints = true;
    /**
     * 偏移量的提交模式,僅能通過在FlinkKafkaConsumerBase#open(Configuration)進行配置
     * 該值取決於使用者是否開啟了checkpoint

     */
    private OffsetCommitMode offsetCommitMode;
    /**
     * 配置從哪個位置開始消費kafka的訊息,
     * 預設為StartupMode#GROUP_OFFSETS,即從當前提交的偏移量開始消費
     */
    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
    private Map<KafkaTopicPartition, Long> specificStartupOffsets;
    private Long startupOffsetsTimestamp;

    /**
     * 確保當偏移量的提交模式為ON_CHECKPOINTS時,禁用自動提交,
     * 這將覆蓋使用者在properties中配置的任何設定。
     * 當offset的模式為ON_CHECKPOINTS,或者為DISABLED時,會將使用者配置的properties屬性進行覆蓋
     * 具體是將ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置為"false,即禁用自動提交
     * @param properties       kafka配置的properties,會通過該方法進行覆蓋
     * @param offsetCommitMode    offset提交模式
     */
    static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        }
    }

    /**
     * 決定是否在開啟checkpoint時,在checkpoin之後提交偏移量,
     * 只有使用者配置了啟用checkpoint,該引數才會其作用
     * 如果沒有開啟checkpoint,則使用kafka的配置引數:enable.auto.commit
     * @param commitOnCheckpoints
     * @return
     */
    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
        this.enableCommitOnCheckpoints = commitOnCheckpoints;
        return this;
    }
    /**
     * 從最早的偏移量開始消費,
     *該模式下,Kafka 中的已經提交的偏移量將被忽略,不會用作起始位置。
     *可以通過consumer1.setStartFromEarliest()進行設定
     */
    public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
        this.startupMode = StartupMode.EARLIEST;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = null;
        return this;
    }

    /**
     * 從最新的資料開始消費,
     *  該模式下,Kafka 中的 已提交的偏移量將被忽略,不會用作起始位置。
     *
     */
    public FlinkKafkaConsumerBase<T> setStartFromLatest() {
        this.startupMode = StartupMode.LATEST;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = null;
        return this;
    }

    /**
     *指定具體的偏移量時間戳,毫秒
     *對於每個分割槽,其時間戳大於或等於指定時間戳的記錄將用作起始位置。
     * 如果一個分割槽的最新記錄早於指定的時間戳,則只從最新記錄讀取該分割槽資料。
     * 在這種模式下,Kafka 中的已提交 offset 將被忽略,不會用作起始位置。
     */
    protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
        checkArgument(startupOffsetsTimestamp >= 0, "The provided value for the startup offsets timestamp is invalid.");

        long currentTimestamp = System.currentTimeMillis();
        checkArgument(startupOffsetsTimestamp <= currentTimestamp,
            "Startup time[%s] must be before current time[%s].", startupOffsetsTimestamp, currentTimestamp);

        this.startupMode = StartupMode.TIMESTAMP;
        this.startupOffsetsTimestamp = startupOffsetsTimestamp;
        this.specificStartupOffsets = null;
        return this;
    }

    /**
     *
     * 從具體的消費者組最近提交的偏移量開始消費,為預設方式
     * 如果沒有發現分割槽的偏移量,使用auto.offset.reset引數配置的值
     * @return
     */
    public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
        this.startupMode = StartupMode.GROUP_OFFSETS;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = null;
        return this;
    }

    /**
     *為每個分割槽指定偏移量進行消費
     */
    public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
        return this;
    }
    @Override
    public void open(Configuration configuration) throws Exception {
        // determine the offset commit mode
        // 決定偏移量的提交模式,
        // 第一個引數為是否開啟了自動提交,
        // 第二個引數為是否開啟了CommitOnCheckpoint模式
        // 第三個引數為是否開啟了checkpoint
        this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

       // 省略的程式碼
    }

// 省略的程式碼
    /**
     * 建立一個fetcher用於連線kafka的broker,拉去資料並進行反序列化,然後將資料輸出為資料流(data stream)
     * @param sourceContext   資料輸出的上下文
     * @param subscribedPartitionsToStartOffsets  當前sub task需要處理的topic分割槽集合,即topic的partition與offset的Map集合
     * @param watermarksPeriodic    可選,一個序列化的時間戳提取器,生成periodic型別的 watermark
     * @param watermarksPunctuated  可選,一個序列化的時間戳提取器,生成punctuated型別的 watermark
     * @param runtimeContext        task的runtime context上下文
     * @param offsetCommitMode      offset的提交模式,有三種,分別為:DISABLED(禁用偏移量自動提交),ON_CHECKPOINTS(僅僅當checkpoints完成之後,才提交偏移量給kafka)
     * KAFKA_PERIODIC(使用kafka自動提交函式,週期性自動提交偏移量)
     * @param kafkaMetricGroup   Flink的Metric
     * @param useMetrics         是否使用Metric
     * @return                   返回一個fetcher例項
     * @throws Exception
     */
    protected abstract AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup kafkaMetricGroup,
            boolean useMetrics) throws Exception;
    protected abstract boolean getIsAutoCommitEnabled();
    // 省略的程式碼
}
上述程式碼是FlinkKafkaConsumerBase的部分程式碼片段,基本上對其做了詳細註釋,裡面的有些方法是FlinkKafkaConsumer繼承的,有些是重寫的。之所以在這裡給出,可以對照FlinkKafkaConsumer的原始碼,從而方便理解。

偏移量提交模式分析

Flink Kafka Consumer 允許有配置如何將 offset 提交回 Kafka broker(或 0.8 版本的 Zookeeper)的行為。請注意:Flink Kafka Consumer 不依賴於提交的 offset 來實現容錯保證。提交的 offset 只是一種方法,用於公開 consumer 的進度以便進行監控。

配置 offset 提交行為的方法是否相同,取決於是否為 job 啟用了 checkpointing。在這裡先給出提交模式的具體結論,下面會對兩種方式進行具體的分析。基本的結論為:

  • 開啟checkpoint

  • 情況1:使用者通過呼叫 consumer 上的 setCommitOffsetsOnCheckpoints(true) 方法來啟用 offset 的提交(預設情況下為 true )
    那麼當 checkpointing 完成時,Flink Kafka Consumer 將提交的 offset 儲存在 checkpoint 狀態中。
    這確保 Kafka broker 中提交的 offset 與 checkpoint 狀態中的 offset 一致。
    注意,在這個場景中,Properties 中的自動定期 offset 提交設定會被完全忽略。
    此情況使用的是ON_CHECKPOINTS

  • 情況2:使用者通過呼叫 consumer 上的 setCommitOffsetsOnCheckpoints("false") 方法來禁用 offset 的提交,則使用DISABLED模式提交offset

  • 未開啟checkpoint
    Flink Kafka Consumer 依賴於內部使用的 Kafka client 自動定期 offset 提交功能,因此,要禁用或啟用 offset 的提交

  • 情況1:配置了Kafka properties的引數配置了"enable.auto.commit" = "true"或者 Kafka 0.8 的 auto.commit.enable=true,使用KAFKA_PERIODIC模式提交offset,即自動提交offset

  • 情況2:沒有配置enable.auto.commit引數,使用DISABLED模式提交offset,這意味著kafka不知道當前的消費者組的消費者每次消費的偏移量。

提交模式原始碼分析

  • offset的提交模式

public enum OffsetCommitMode {
    // 禁用偏移量自動提交
    DISABLED,
    // 僅僅當checkpoints完成之後,才提交偏移量給kafka
    ON_CHECKPOINTS,
    // 使用kafka自動提交函式,週期性自動提交偏移量
    KAFKA_PERIODIC;
}
  • 提交模式的呼叫

public class OffsetCommitModes {
    public static OffsetCommitMode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoint,
            boolean enableCheckpointing) {
        // 如果開啟了checkinpoint,執行下面判斷
        if (enableCheckpointing) {
            // 如果開啟了checkpoint,進一步判斷是否在checkpoin啟用時提交(setCommitOffsetsOnCheckpoints(true)),如果是則使用ON_CHECKPOINTS模式
            // 否則使用DISABLED模式
            return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
        } else {
            // 若Kafka properties的引數配置了"enable.auto.commit" = "true",則使用KAFKA_PERIODIC模式提交offset
            // 否則使用DISABLED模式
            return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
        }
    }
}
小結

本文主要介紹了Flink Kafka Consumer,首先對FlinkKafkaConsumer的不同版本進行了對比,然後給出了一個完整的Demo案例,並對案例的配置引數進行了詳細解釋,接著分析了FlinkKafkaConsumer的繼承關係,並分別對FlinkKafkaConsumer以及其父類FlinkKafkaConsumerBase的原始碼進行了解讀,最後從原始碼層面分析了Flink Kafka Consumer的偏移量提交模式,並對每一種提交模式進行了梳理。

原文連結:https://mp.weixin.qq.com/s?__biz=MzU2ODQ3NjYyMA==&mid=2247483841&idx=1&sn=9b63128b54f2519dc7d2a0f884947a23&scene=19#wechat_redirect