1. 程式人生 > 其它 >flink讀寫kafka--寫kafka

flink讀寫kafka--寫kafka

flink,kafka

flink讀寫kafka--寫kafka

介紹

主要介紹實際中flink如何讀取寫入設定kafka

flink版本:1.13.2

github地址:https://github.com/dahai1996/mdw-flink-quickstart


寫入kafka

引入依賴

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

包裝下常用設定

public class SinkKafka {
    String topic;
    int kafkaProducersPoolSize;
    Properties propertiesSink = new Properties();

    /**
     * 獲取kafka sink,該sink為string型別資料提供處理,需提交做好資料轉換
     *
     * @param runEnv                 執行環境
     * @param requestTimeOutMs       超時時間,毫秒
     * @param topic                  寫入的topic名
     * @param kafkaProducersPoolSize 覆蓋預設生產者池大小,預設為5
     */
    public SinkKafka(RunEnv runEnv, String requestTimeOutMs, String topic, int kafkaProducersPoolSize) {
        propertiesSink.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, runEnv.getKafkaHost());
        propertiesSink.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeOutMs);
        this.topic = topic;
        this.kafkaProducersPoolSize = kafkaProducersPoolSize;
    }

    /**
     * 獲取kafka sink,該sink為string型別資料提供處理,需提交做好資料轉換
     *
     * @param runEnv           執行環境
     * @param requestTimeOutMs 超時時間,毫秒
     * @param topic            寫入的topic名
     */
    public SinkKafka(RunEnv runEnv, String requestTimeOutMs, String topic) {
        propertiesSink.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, runEnv.getKafkaHost());
        propertiesSink.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeOutMs);
        this.topic = topic;
        this.kafkaProducersPoolSize = 5;
    }

    /**
     * 獲取kafka sink,使用預設序列化器
     * @param kafkaSerializationSchema 指定序列化器
     * @return 返回一個kafka sink
     */
    public FlinkKafkaProducer<String> getSink(KafkaSerializationSchema<String> kafkaSerializationSchema) {
        return new FlinkKafkaProducer<String>(
                topic,
                kafkaSerializationSchema,
                propertiesSink,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
                kafkaProducersPoolSize);
    }

    /**
     * 獲取kafka sink,使用預設序列化器
     * @return 返回一個kafka sink
     */
    public FlinkKafkaProducer<String> getSink() {
        return new FlinkKafkaProducer<String>(
                topic,
                new DefaultKafkaSerializationSchema(topic),
                propertiesSink,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
                kafkaProducersPoolSize);
    }

    /**
     * 獲取端到端一致性的kafka sink,使用預設序列化器
     * @param transactionTimeoutMs kafka事務提交時間,應該大於checkpoint時間間隔,小於kafka設定中transaction.max.timeout.ms(預設為15分鐘)
     * @return 返回一個提供事務的kafka sink
     */
    public FlinkKafkaProducer<String> getExactlyOnceSink(String transactionTimeoutMs) {
        propertiesSink.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,transactionTimeoutMs);
        return new FlinkKafkaProducer<String>(
                topic,
                new DefaultKafkaSerializationSchema(topic),
                propertiesSink,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                kafkaProducersPoolSize);
    }

    /**
     * 獲取端到端一致性的kafka sink
     * @param transactionTimeoutMs kafka事務提交時間,應該大於checkpoint時間間隔,小於kafka設定中transaction.max.timeout.ms(預設為15分鐘)
     * @param kafkaSerializationSchema 指定序列化器
     * @return 返回一個提供事務的kafka sink
     */
    public FlinkKafkaProducer<String> getExactlyOnceSink(String transactionTimeoutMs,KafkaSerializationSchema<String> kafkaSerializationSchema) {
        propertiesSink.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,transactionTimeoutMs);
        return new FlinkKafkaProducer<String>(
                topic,
                new DefaultKafkaSerializationSchema(topic),
                propertiesSink,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                kafkaProducersPoolSize);
    }

    public static class DefaultKafkaSerializationSchema implements KafkaSerializationSchema<String> {
        String topic;

        public DefaultKafkaSerializationSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
            return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8));
        }
    }
}

使用

一般情況

FlinkKafkaProducer<String> sink1 = new SinkKafka(uat, "60000","topicName").getSink( );

自定義序列化

FlinkKafkaProducer<String> sink = new SinkKafka(uat, "60000", "topicName").getSink(
                new SinkKafka.DefaultKafkaSerializationSchema("topicName")
        );

注:DefaultKafkaSerializationSchema 使用utf-8序列化資料,返回string型別

端到端一致性設定

        FlinkKafkaProducer<String> sink2 = new SinkKafka(uat, "60000", "topicName").getExactlyOnceSink("60000");

注:這裡表示,kafka寫入資料的時候會有提交的動作,本質上就是一個標記。
在下一步處理的程式中中,讀取kafka的設定中使用 setExactlyOnce()方法(見前文),表示讀取有已提交標記的資料,就實現了端到端一致性。本質上是模擬兩段式提交。

讀取kafka時間戳作為水印


    /**
     * @param env         流執行環境
     * @param sourceKafka kafka資料來源
     * @param duration    水印空閒時間
     * @param name        該步驟name
     * @return 一個帶水印的kafka資料來源,水印來自於kafka自帶的時間戳
     */
    public static SingleOutputStreamOperator<String> getKafkaSourceWithMonotonousWatermarks(StreamExecutionEnvironment env, FlinkKafkaConsumerBase<String> sourceKafka, Duration duration, String name) {
        return env.addSource(sourceKafka).assignTimestampsAndWatermarks(WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(duration)).name(name);
    }

注:這裡包裝作為了常用方法。讀取kafka的時間戳作為水印

注2:本質上是使用:

public final class RecordTimestampAssigner<E> implements TimestampAssigner<E> {
    public RecordTimestampAssigner() {
    }

    public long extractTimestamp(E element, long recordTimestamp) {
        return recordTimestamp;
    }
}

以此從訊息中提取時間戳,這裡kafka內部做了優化,幫助完善了內部實現。