flink讀寫kafka--寫kafka
阿新 • • 發佈:2022-01-11
flink,kafka
flink讀寫kafka--寫kafka
介紹
主要介紹實際中flink如何讀取寫入設定kafka
flink版本:1.13.2
github地址:https://github.com/dahai1996/mdw-flink-quickstart
寫入kafka
引入依賴
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency>
包裝下常用設定
public class SinkKafka { String topic; int kafkaProducersPoolSize; Properties propertiesSink = new Properties(); /** * 獲取kafka sink,該sink為string型別資料提供處理,需提交做好資料轉換 * * @param runEnv 執行環境 * @param requestTimeOutMs 超時時間,毫秒 * @param topic 寫入的topic名 * @param kafkaProducersPoolSize 覆蓋預設生產者池大小,預設為5 */ public SinkKafka(RunEnv runEnv, String requestTimeOutMs, String topic, int kafkaProducersPoolSize) { propertiesSink.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, runEnv.getKafkaHost()); propertiesSink.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeOutMs); this.topic = topic; this.kafkaProducersPoolSize = kafkaProducersPoolSize; } /** * 獲取kafka sink,該sink為string型別資料提供處理,需提交做好資料轉換 * * @param runEnv 執行環境 * @param requestTimeOutMs 超時時間,毫秒 * @param topic 寫入的topic名 */ public SinkKafka(RunEnv runEnv, String requestTimeOutMs, String topic) { propertiesSink.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, runEnv.getKafkaHost()); propertiesSink.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeOutMs); this.topic = topic; this.kafkaProducersPoolSize = 5; } /** * 獲取kafka sink,使用預設序列化器 * @param kafkaSerializationSchema 指定序列化器 * @return 返回一個kafka sink */ public FlinkKafkaProducer<String> getSink(KafkaSerializationSchema<String> kafkaSerializationSchema) { return new FlinkKafkaProducer<String>( topic, kafkaSerializationSchema, propertiesSink, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, kafkaProducersPoolSize); } /** * 獲取kafka sink,使用預設序列化器 * @return 返回一個kafka sink */ public FlinkKafkaProducer<String> getSink() { return new FlinkKafkaProducer<String>( topic, new DefaultKafkaSerializationSchema(topic), propertiesSink, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, kafkaProducersPoolSize); } /** * 獲取端到端一致性的kafka sink,使用預設序列化器 * @param transactionTimeoutMs kafka事務提交時間,應該大於checkpoint時間間隔,小於kafka設定中transaction.max.timeout.ms(預設為15分鐘) * @return 返回一個提供事務的kafka sink */ public FlinkKafkaProducer<String> getExactlyOnceSink(String transactionTimeoutMs) { propertiesSink.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,transactionTimeoutMs); return new FlinkKafkaProducer<String>( topic, new DefaultKafkaSerializationSchema(topic), propertiesSink, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, kafkaProducersPoolSize); } /** * 獲取端到端一致性的kafka sink * @param transactionTimeoutMs kafka事務提交時間,應該大於checkpoint時間間隔,小於kafka設定中transaction.max.timeout.ms(預設為15分鐘) * @param kafkaSerializationSchema 指定序列化器 * @return 返回一個提供事務的kafka sink */ public FlinkKafkaProducer<String> getExactlyOnceSink(String transactionTimeoutMs,KafkaSerializationSchema<String> kafkaSerializationSchema) { propertiesSink.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,transactionTimeoutMs); return new FlinkKafkaProducer<String>( topic, new DefaultKafkaSerializationSchema(topic), propertiesSink, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, kafkaProducersPoolSize); } public static class DefaultKafkaSerializationSchema implements KafkaSerializationSchema<String> { String topic; public DefaultKafkaSerializationSchema(String topic) { this.topic = topic; } @Override public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) { return new ProducerRecord<byte[], byte[]>(topic, element.getBytes(StandardCharsets.UTF_8)); } } }
使用
一般情況
FlinkKafkaProducer<String> sink1 = new SinkKafka(uat, "60000","topicName").getSink( );
自定義序列化
FlinkKafkaProducer<String> sink = new SinkKafka(uat, "60000", "topicName").getSink(
new SinkKafka.DefaultKafkaSerializationSchema("topicName")
);
注:DefaultKafkaSerializationSchema 使用utf-8序列化資料,返回string型別
端到端一致性設定
FlinkKafkaProducer<String> sink2 = new SinkKafka(uat, "60000", "topicName").getExactlyOnceSink("60000");
注:這裡表示,kafka寫入資料的時候會有提交的動作,本質上就是一個標記。
在下一步處理的程式中中,讀取kafka的設定中使用 setExactlyOnce()方法(見前文),表示讀取有已提交標記的資料,就實現了端到端一致性。本質上是模擬兩段式提交。
讀取kafka時間戳作為水印
/**
* @param env 流執行環境
* @param sourceKafka kafka資料來源
* @param duration 水印空閒時間
* @param name 該步驟name
* @return 一個帶水印的kafka資料來源,水印來自於kafka自帶的時間戳
*/
public static SingleOutputStreamOperator<String> getKafkaSourceWithMonotonousWatermarks(StreamExecutionEnvironment env, FlinkKafkaConsumerBase<String> sourceKafka, Duration duration, String name) {
return env.addSource(sourceKafka).assignTimestampsAndWatermarks(WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(duration)).name(name);
}
注:這裡包裝作為了常用方法。讀取kafka的時間戳作為水印
注2:本質上是使用:
public final class RecordTimestampAssigner<E> implements TimestampAssigner<E> {
public RecordTimestampAssigner() {
}
public long extractTimestamp(E element, long recordTimestamp) {
return recordTimestamp;
}
}
以此從訊息中提取時間戳,這裡kafka內部做了優化,幫助完善了內部實現。