1. 程式人生 > >Kafka設計解析(二十)Apache Flink Kafka consumer

Kafka設計解析(二十)Apache Flink Kafka consumer

zook 實例 發送 abs 版本 conn 事情 save prope

轉載自 huxihx,原文鏈接 Apache Flink Kafka consumer

Flink提供了Kafka connector用於消費/生產Apache Kafka topic的數據。Flink的Kafka consumer集成了checkpoint機制以提供精確一次的處理語義。在具體的實現過程中,Flink不依賴於Kafka內置的消費組位移管理,而是在內部自行記錄和維護consumer的位移。

用戶在使用時需要根據Kafka版本來選擇相應的connector,如下表所示:

Maven依賴 支持的最低Flink版本 Kafka客戶端類名 說明
flink-connector-kafka-0.8_2.10
1.0.0

FlinkKafkaConsumer08

FlinkKafkaProducer08

使用的是Kafka老版本low-level consumer,即SimpleConsumer. Flink在內部會提交位移到Zookeeper
flink-connector-kafka-0.9_2.10 1.0.0

FlinkKafkaConsumer09

FlinkKafkaProducer09

使用Kafka新版本consumer
flink-connector-kafka-0.10_2.10 1.2.0

FlinkKafkaConsumer010

FlinkKafkaProducer010

支持使用Kafka 0.10.0.0版本新引入的內置時間戳信息

然後,將上面對應的connector依賴加入到maven項目中,比如:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.2</version>
</dependency>

Kafka Consumer

Flink kafka connector使用的consumer取決於用戶使用的是老版本consumer還是新版本consumer,新舊兩個版本對應的connector類名是不同的,分別是:FlinkKafkaConsumer09(或FlinkKafkaConsumer010)以及FlinkKafkaConsumer08。它們都支持同時消費多個topic。

該Connector的構造函數包含以下幾個字段:

  1. 待消費的topic列表
  2. key/value解序列化器,用於將字節數組形式的Kafka消息解序列化回對象
  3. Kafka consumer的屬性對象,常用的consumer屬性包括:bootstrap.servers(新版本consumer專用)、zookeeper.connect(舊版本consumer專用)和group.id

下面給出一個實例:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

DeserializationSchema

Flink的Kafka consumer需要依靠用戶指定的解序列化器來將二進制的數據轉換成Java對象。DeserializationSchema接口就是做這件事情的,該接口中的deserialize方法作用於每條Kafka消息上,並把轉換的結果發往Flink的下遊operator。

通常情況下,用戶直接繼承AbstractDeserializationSchema來創建新的deserializer,也可以實現DeserializationSchema接口,只不過要自行實現getProducedType方法。

如果要同時解序列化Kafka消息的key和value,則需要實現KeyedDeserializationSchema接口,因為該接口的deserialize方法同時包含了key和value的字節數組。

Flink默認提供了幾種deserializer:

  • TypeInformationSerializationSchema(以及TypeInformationKeyValueSerializationSchema):創建一個基於Flink TypeInformation的schema,適用於數據是由Flink讀寫之時。比起其他序列化方法,這種schema性能更好
  • JsonDeserializationSchema(JSONKeyValueDeserializationSchema):將JSON轉換成ObjectNode對象,然後通過ObjectNode.get("fieldName").as(Int/String...)()訪問具體的字段。KeyValue

一旦在解序列化過程中出現錯誤,Flink提供了兩個應對方法——1. 在deserialize方法中拋出異常,使得整個作業失敗並重啟;2. 返回null告訴Flink Kafka connector跳過這條異常消息。值得註意的是,由於consumer是高度容錯的,如果采用第一種方式會讓consumer再次嘗試deserialize這條有問題的消息。因此倘若deserializer再次失敗,程序可能陷入一個死循環並不斷進行錯誤重試。

Kafka consumer起始位移配置

Flink的Kafka consumer允許用戶配置Kafka consumer的起始讀取位移,如下列代碼所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour
 
DataStream<String> stream = env.addSource(myConsumer);
...

所有版本的Flink Kafka consumer都可以使用上面的方法來設定起始位移。

  • setStartFromGroupOffsets:這是默認情況,即從消費者組提交到Kafka broker上的位移開始讀取分區數據(對於老版本而言,位移是提交到Zookeeper上)。如果未找到位移,使用auto.offset.reset屬性值來決定位移。該屬性默認是LATEST,即從最新的消息位移處開始消費
  • setStartFromEarliest() / setStartFromLatest():設置從最早/最新位移處開始消費。使用這兩個方法的話,Kafka中提交的位移就將會被忽略而不會被用作起始位移

Flink也支持用戶自行指定位移,方法如下:

ap<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
 
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

上面的例子中,consumer將從用戶指定的位移處開始讀取消息。這裏的位移記錄的是下一條待消費消息的位移,而不是最新的已消費消息的位移。值得註意的是,如果待消費分區的位移不在保存的位移映射中,Flink Kafka connector會使用默認的組位移策略(即setStartFromGroupOffsets())。

另外,當任務自動地從失敗中恢復或手動地從savepoint中恢復時,上述這些設置位移的方法是不生效的。在恢復時,每個Kafka分區的起始位移都是由保存在savepoint或checkpoint中的位移來決定的。

Kafka consumer容錯性

一旦啟用了Flink的檢查點機制(checkpointing),Flink Kafka消費者會定期地對其消費的topic做checkpoint以保存它消費的位移以及其他操作的狀態。一旦出現失敗,Flink將會恢復streaming程序到最新的checkpoint狀態,然後重新從Kafka消費數據,重新讀取的位置就是保存在checkpoint中的位移。

checkpoint的間隔決定了程序容錯性的程度,它直接確定了在程序崩潰時,程序回溯到的最久狀態。

如果要使用啟動容錯性的Kafka消費者,定期對拓撲進行checkpoint就是非常必要的,實現方法如下面代碼所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // 每5秒做一次checkpoint  

需要註意的是,只有槽位(slot)充足Flink才會重啟拓撲,因此一旦拓撲因無法連接TaskManager而崩潰,仍然需要有足夠的slot才能重啟拓撲。如果使用YARN的話,Flink能夠自動地重啟丟失的YARN容器。

如果沒有啟用checkpoint,那麽Kafka consumer會定期地向Zookeeper提交位移。

Kafka consumer位移提交

Flink Kafka consumer可以自行設置位移提交的行為。當然,它不依賴於這些已提交的位移來實現容錯性。這些提交位移只是供監控使用。

配置位移提交的方法各異,主要依賴於是否啟用了checkpointing機制:

  • 未啟用checkpointing:Flink Kafka consumer依賴於Kafka提供的自動提交位移功能。設置方法是在Properties對象中配置Kafka參數enable.auto.commit(新版本Kafka consumer)或auto.commit.enable(老版本Kafka consumer)
  • 啟用checkpointing:Flink Kafka consumer會提交位移到checkpoint狀態中。這就保證了Kafka中提交的位移與checkpoint狀態中的位移是一致的。用戶可以調用setCommitOffsetsCheckpoints(boolean)方法來禁用/開啟位移提交——默認是true,即開啟了位移提交。註意,這種情況下,Flink會忽略上一種情況中提及的Kafka參數

Kafka consumer時間戳提取/水位生成

通常,事件或記錄的時間戳信息是封裝在消息體中。至於水位,用戶可以選擇定期地發生水位,也可以基於某些特定的Kafka消息來生成水位——這分別就是AssignerWithPeriodicWatermaks以及AssignerWithPunctuatedWatermarks接口的使用場景。

用戶也能夠自定義時間戳提取器/水位生成器,具體方法參見這裏,然後按照下面的方式傳遞給consumer:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
 
FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
 
DataStream<String> stream = env
    .addSource(myConsumer)
    .print();

在內部,Flink會為每個Kafka分區都執行一個對應的assigner實例。一旦指定了這樣的assigner,對於每條Kafka中的消息,extractTimestamp(T element, long previousElementTimestamp)方法會被調用來給消息分配時間戳,而getCurrentWatermark()方法(定時生成水位)或checkAndGetNextWatermark(T lastElement, long extractedTimestamp)方法(基於特定條件)會被調用以確定是否發送新的水位值。

Kafka設計解析(二十)Apache Flink Kafka consumer