
Customizing the flink-kafka connector to end consumption at a specific position

Background: the requirement is to consume Kafka data between a specific start position and a specific end position, with the Flink job stopping on its own once the data up to the end position has been consumed. Batch jobs, however, do not support consuming Kafka data, and streaming jobs never stop on their own, so a custom Kafka connector is needed. In Flink 1.14 the DataStream connector has a setBounded option for setting the end position, but the Table API does not yet support an end position, so we can imitate the DataStream implementation and modify the source code. The Flink version used here is 1.14.

This post is mainly based on the article linked at the end, which helped me a great deal; on top of it I changed a few details, two of which are key to meeting the requirement. First, the stream/batch boundedness check is changed so that Kafka data is still consumed when running as a batch job. Second, the job is made to stop once it has consumed up to the specified position. Finally, the connector is packaged and tested; mind the code formatting during packaging (network conditions can also interfere). The finished jar is named below and can be used directly:
flink-connector-kafka_2.11-1.14.4.jar

Main modifications

1. Processing stream data in batch mode
In KafkaSourceBuilder, set the bounded flag Boundedness.BOUNDED so that a batch job can process Kafka (stream) data. The flag can also be set elsewhere; set it wherever suits your needs.
2. Setting the end offset, mirroring how the start offset is set
Add a setEndOffsets method to KafkaSourceBuilder and have it assign the stoppingOffsetsInitializer field.
3. Ways of specifying the end offset
In KafkaConnectorOptionsUtil, add a getEndupOptions method modeled on the existing getStartupOptions method, with targeted handling for specific end offsets, again mirroring the startup-position logic. See the detailed changes below.

Notes:

Pay close attention to code formatting: a missing line break or an extra space will fail the style checks during packaging.
Also watch the imports to avoid IDEA collapsing them automatically, which likewise breaks the build. Configure it as follows:
1. File -> Settings -> Editor -> Code Style -> Java -> Imports
2. Set "Class count to use import with '*'" to 100
3. Set "Names count to use static import with '*'" to 100
After that the custom jar packages successfully. Then replace the flink-kafka-connector jar in your local project with it, making sure the file name matches exactly, so that subsequent runs pick up the modified jar.

The concrete changes are as follows:

Download the Flink 1.14 source from https://gitee.com/apache/flink.git (the Gitee mirror is usually faster to clone). Seven places need to be modified:

1. KafkaSourceBuilder

Set the end offset and the bounded-data flag.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java

public KafkaSourceBuilder<OUT> setEndOffsets(OffsetsInitializer stoppingOffsetsInitializer) {
    // Setting the end (stopping) offsets; this is the core of the whole change
    this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
    // Bounded flag, so a batch job can process Kafka (stream) data
    this.boundedness = Boundedness.BOUNDED;
    return this;
}

This mirrors how the starting offsets are set. The reason for setting this.boundedness = Boundedness.BOUNDED here is that batch jobs do not support consuming Kafka (stream) data; without it you get an error like:

Querying an unbounded table '%s' in batch mode is not allowed. The table source is unbounded.

The property can also be set elsewhere; adjust to your own needs.
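For comparison, the DataStream API in Flink 1.14 already exposes this same pair of effects through KafkaSourceBuilder.setBounded, which the setEndOffsets above replicates for the Table API path. A minimal DataStream-side sketch (topic, servers and group id are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// A bounded Kafka source: the job finishes once every partition reaches
// the latest offset observed at startup. setBounded both stores the
// stopping offsets and switches the source to Boundedness.BOUNDED.
KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setTopics("test02")                   // placeholder
                .setGroupId("testGroup")               // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setBounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();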

2. EndupMode

Add an EndupMode enum, again modeled on the startup configuration class.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/EndupMode.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kafka.config;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;

/** End modes for the Kafka Consumer. */
@Internal
public enum EndupMode {

    /** End from committed offsets in ZK / Kafka brokers of a specific consumer group (default). */
    GROUP_OFFSETS(KafkaTopicPartitionStateSentinel.GROUP_OFFSET),

    /** End from the latest offset. */
    LATEST(KafkaTopicPartitionStateSentinel.LATEST_OFFSET),

    /**
     * End at user-supplied timestamp for each partition. Since this mode will have specific
     * offsets to end at, we do not need a sentinel value; using Long.MIN_VALUE as a
     * placeholder.
     */
    TIMESTAMP(Long.MIN_VALUE),

    /**
     * End at user-supplied specific offsets for each partition. Since this mode will have
     * specific offsets to end at, we do not need a sentinel value; using Long.MIN_VALUE as a
     * placeholder.
     */
    SPECIFIC_OFFSETS(Long.MIN_VALUE);

    /** The sentinel offset value corresponding to this endup mode. */
    private long stateSentinel;

    EndupMode(long stateSentinel) {
        this.stateSentinel = stateSentinel;
    }
}

3. KafkaConnectorOptions

Define the options that control where consumption ends.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java

Model the end options on the startup options. This part also matters, since it determines how the options are referenced later.

scan.startup.mode: one of 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp', 'specific-offsets'
scan.startup.specific-offsets: per-partition start offsets, e.g. 'partition:0,offset:42;partition:1,offset:300'
scan.startup.timestamp-millis: start timestamp in milliseconds (long)
scan.endup.mode: one of 'latest-offset', 'group-offsets', 'timestamp', 'specific-offsets'
scan.endup.specific-offsets: per-partition end offsets, e.g. 'partition:0,offset:42;partition:1,offset:300'
scan.endup.timestamp-millis: end timestamp in milliseconds (long)

public static final ConfigOption<ScanEndupMode> SCAN_ENDUP_MODE =
        ConfigOptions.key("scan.endup.mode")
                .enumType(ScanEndupMode.class)
                .defaultValue(ScanEndupMode.GROUP_OFFSETS)
                .withDescription("Endup mode for Kafka consumer.");

public static final ConfigOption<String> SCAN_ENDUP_SPECIFIC_OFFSETS =
        ConfigOptions.key("scan.endup.specific-offsets")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Optional offsets used in case of \"specific-offsets\" endup mode");

public static final ConfigOption<Long> SCAN_ENDUP_TIMESTAMP_MILLIS =
        ConfigOptions.key("scan.endup.timestamp-millis")
                .longType()
                .noDefaultValue()
                .withDescription("Optional timestamp used in case of \"timestamp\" endup mode");
/** Endup mode for the Kafka consumer, see {@link #SCAN_ENDUP_MODE}. */
public enum ScanEndupMode implements DescribedEnum {
    LATEST_OFFSET("latest-offset", text("End from the latest offset.")),
    GROUP_OFFSETS(
            "group-offsets",
            text(
                    "End from committed offsets in ZooKeeper / Kafka brokers of a specific consumer group.")),
    TIMESTAMP("timestamp", text("End from user-supplied timestamp for each partition.")),
    SPECIFIC_OFFSETS(
            "specific-offsets",
            text("End from user-supplied specific offsets for each partition."));
    private final String value;
    private final InlineElement description;

    ScanEndupMode(String value, InlineElement description) {
        this.value = value;
        this.description = description;
    }
    @Override
    public String toString() {
        return value;
    }
    @Override
    public InlineElement getDescription() {
        return description;
    }
}

4. KafkaConnectorOptionsUtil

Resolve the end-position options into the corresponding offset objects.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java

public static EndupOptions getEndupOptions(ReadableConfig tableOptions) {
    final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
    final EndupMode endupMode =
            tableOptions
                    .getOptional(KafkaConnectorOptions.SCAN_ENDUP_MODE)
                    .map(KafkaConnectorOptionsUtil::endfromOption)
                    .orElse(EndupMode.GROUP_OFFSETS);
    // Note: a dedicated helper (buildSpecificEndOffsets, below) is needed here
    // to resolve the specific end offsets
    if (endupMode == EndupMode.SPECIFIC_OFFSETS) {
        buildSpecificEndOffsets(tableOptions, tableOptions.get(TOPIC).get(0), specificOffsets);
    }
    final EndupOptions options = new EndupOptions();
    options.endupMode = endupMode;
    options.specificOffsets = specificOffsets;
    if (endupMode == EndupMode.TIMESTAMP) {
        options.endupTimestampMillis = tableOptions.get(SCAN_ENDUP_TIMESTAMP_MILLIS);
    }
    return options;
}

private static void buildSpecificEndOffsets(
        ReadableConfig tableOptions,
        String topic,
        Map<KafkaTopicPartition, Long> specificOffsets) {
    String specificOffsetsStrOpt = tableOptions.get(SCAN_ENDUP_SPECIFIC_OFFSETS);
    final Map<Integer, Long> offsetMap =
            parseSpecificOffsets(specificOffsetsStrOpt, SCAN_ENDUP_SPECIFIC_OFFSETS.key());
    offsetMap.forEach(
            (partition, offset) -> {
                final KafkaTopicPartition topicPartition =
                        new KafkaTopicPartition(topic, partition);
                specificOffsets.put(topicPartition, offset);
            });
}

This is modeled on the startup-offset handling, which is easy to see in hindsight, but not realizing it at first cost me a long time. Mirror buildSpecificOffsets: where the startup path reads SCAN_STARTUP_SPECIFIC_OFFSETS, the end path reads SCAN_ENDUP_SPECIFIC_OFFSETS, whose value comes in from the CREATE TABLE statement.
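To illustrate the option format, this is roughly what the existing parseSpecificOffsets helper (reused from the startup path) produces for an end-offsets string; the values here are just an example:

// 'scan.endup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
Map<Integer, Long> offsetMap =
        parseSpecificOffsets(
                "partition:0,offset:42;partition:1,offset:300",
                SCAN_ENDUP_SPECIFIC_OFFSETS.key());
// offsetMap now holds {0=42, 1=300}; buildSpecificEndOffsets then wraps each
// entry in a KafkaTopicPartition for the table's topic.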

private static EndupMode endfromOption(KafkaConnectorOptions.ScanEndupMode scanEndupMode) {
    switch (scanEndupMode) {
        case LATEST_OFFSET:
            return EndupMode.LATEST;
        case GROUP_OFFSETS:
            return EndupMode.GROUP_OFFSETS;
        case SPECIFIC_OFFSETS:
            return EndupMode.SPECIFIC_OFFSETS;
        case TIMESTAMP:
            return EndupMode.TIMESTAMP;

        default:
            throw new TableException(
                    "Unsupported endup mode. Validator should have checked that.");
    }
}
/** Kafka endup options. */
public static class EndupOptions {
    public EndupMode endupMode;
    public Map<KafkaTopicPartition, Long> specificOffsets;
    public long endupTimestampMillis;
}

5. KafkaDynamicSource

In line with the changes above, the methods that create the source must also be updated so that the new parameters are passed through; a sketch follows.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
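The original article does not show this code, so the following is only a minimal sketch of what the change could look like, assuming the class gains three end-related fields mirroring the startup ones and that createKafkaSource maps each EndupMode onto the setEndOffsets method added in step 1:

// Assumed new fields, mirroring startupMode / specificStartupOffsets / startupTimestampMillis:
protected final EndupMode endupMode;
protected final Map<KafkaTopicPartition, Long> specificEndOffsets;
protected final long endupTimestampMillis;

// Inside createKafkaSource(...), after the startup offsets have been applied:
switch (endupMode) {
    case LATEST:
        kafkaSourceBuilder.setEndOffsets(OffsetsInitializer.latest());
        break;
    case GROUP_OFFSETS:
        kafkaSourceBuilder.setEndOffsets(OffsetsInitializer.committedOffsets());
        break;
    case SPECIFIC_OFFSETS:
        Map<TopicPartition, Long> offsets = new HashMap<>();
        specificEndOffsets.forEach(
                (tp, offset) ->
                        offsets.put(
                                new TopicPartition(tp.getTopic(), tp.getPartition()), offset));
        kafkaSourceBuilder.setEndOffsets(OffsetsInitializer.offsets(offsets));
        break;
    case TIMESTAMP:
        kafkaSourceBuilder.setEndOffsets(OffsetsInitializer.timestamp(endupTimestampMillis));
        break;
}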

6. KafkaDynamicTableFactory

Likewise, the factory must resolve the new options and pass them along when it constructs the source object; a sketch follows.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
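Again not shown in the original; a sketch under the assumption that the end options are registered and resolved right next to their startup counterparts and forwarded to the extended KafkaDynamicSource constructor:

// In optionalOptions(), register the new options so validation accepts them:
options.add(KafkaConnectorOptions.SCAN_ENDUP_MODE);
options.add(KafkaConnectorOptions.SCAN_ENDUP_SPECIFIC_OFFSETS);
options.add(KafkaConnectorOptions.SCAN_ENDUP_TIMESTAMP_MILLIS);

// In createDynamicTableSource(...), next to the existing getStartupOptions call:
final KafkaConnectorOptionsUtil.EndupOptions endupOptions =
        KafkaConnectorOptionsUtil.getEndupOptions(tableOptions);

// ...then pass endupOptions.endupMode, endupOptions.specificOffsets and
// endupOptions.endupTimestampMillis as three extra arguments wherever the
// KafkaDynamicSource is constructed.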

7. UpsertKafkaDynamicTableFactory

The same additions as in KafkaDynamicTableFactory: resolve the new end options and pass them through when the source object is created.
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java

Those are all the places that need to be modified.

Some test files will fail during packaging after these changes; fix them up accordingly.

The test code is as follows:

1. Create the execution environment

// Batch mode, so the job can finish once the end position is reached
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings);

2. Create the table

There are three ways to specify the end position; concrete examples follow. Use one of the three statements (each redeclares connectSql for illustration).

// 1. CREATE TABLE ending at the latest offset
String connectSql = "CREATE TABLE KafkaTable (\n" +
        "  `user_id` BIGINT,\n" +
        "  `item_id` BIGINT,\n" +
        "  `age` BIGINT\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'test02',\n" +
        "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
        "  'properties.group.id' = 'testGroup',\n" +
        "  'scan.startup.mode' = 'earliest-offset',\n" +
        "  'scan.endup.mode' = 'latest-offset',\n" +
        "  'format' = 'csv'\n" +
        ")";
// 2. CREATE TABLE ending at specific per-partition offsets
String connectSql = "CREATE TABLE KafkaTable (\n" +
        "  `user_id` BIGINT,\n" +
        "  `item_id` BIGINT,\n" +
        "  `age` BIGINT\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'test02',\n" +
        "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
        "  'properties.group.id' = 'testGroup',\n" +
        "  'scan.startup.mode' = 'earliest-offset',\n" +
        "  'scan.endup.mode' = 'specific-offsets',\n" +
        "  'scan.endup.specific-offsets' = 'partition:0,offset:22',\n" +
        "  'format' = 'csv'\n" +
        ")";
// 3. CREATE TABLE ending at a specific timestamp
String connectSql = "CREATE TABLE KafkaTable (\n" +
        "  `user_id` BIGINT,\n" +
        "  `item_id` BIGINT,\n" +
        "  `age` BIGINT\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'test02',\n" +
        "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
        "  'properties.group.id' = 'testGroup',\n" +
        "  'scan.startup.mode' = 'earliest-offset',\n" +
        "  'scan.endup.mode' = 'timestamp',\n" +
        "  'scan.endup.timestamp-millis' = '1648124880000',\n" +
        "  'format' = 'csv'\n" +
        ")";
// Execute the CREATE TABLE statement
streamTableEnvironment.executeSql(connectSql);

3. Output logic

// Query
Table result = streamTableEnvironment.sqlQuery("select user_id, item_id, age from KafkaTable");
// Convert the Table to a DataStream for easy output
DataStream<Row> rowDataStream = streamTableEnvironment.toDataStream(result);
// A sink is needed, otherwise execute() has no operators to run
rowDataStream.print();

streamExecutionEnvironment.execute();

Reference: https://www.cnblogs.com/eryuan/p/15791843.html