Kafka設計解析(二十)Apache Flink Kafka consumer
轉載自 huxihx,原文鏈接 Apache Flink Kafka consumer
Flink提供了Kafka connector用於消費/生產Apache Kafka topic的數據。Flink的Kafka consumer集成了checkpoint機制以提供精確一次的處理語義。在具體的實現過程中,Flink不依賴於Kafka內置的消費組位移管理,而是在內部自行記錄和維護consumer的位移。
用戶在使用時需要根據Kafka版本來選擇相應的connector,如下表所示:
Maven依賴 | 支持的最低Flink版本 | Kafka客戶端類名 | 說明 |
flink-connector-kafka-0.8_2.10 |
1.0.0 |
FlinkKafkaConsumer08 FlinkKafkaProducer08 |
使用的是Kafka老版本low-level consumer,即SimpleConsumer. Flink在內部會提交位移到Zookeeper |
flink-connector-kafka-0.9_2.10 | 1.0.0 |
FlinkKafkaConsumer09 FlinkKafkaProducer09 |
使用Kafka新版本consumer |
flink-connector-kafka-0.10_2.10 | 1.2.0 |
FlinkKafkaConsumer010 FlinkKafkaProducer010 |
支持使用Kafka 0.10.0.0版本新引入的內置時間戳信息 |
然後,將上面對應的connector依賴加入到maven項目中,比如:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_2.11</artifactId> <version>1.3.2</version> </dependency>
Kafka Consumer
Flink kafka connector使用的consumer取決於用戶使用的是老版本consumer還是新版本consumer,新舊兩個版本對應的connector類名是不同的,分別是:FlinkKafkaConsumer09(或FlinkKafkaConsumer010)以及FlinkKafkaConsumer08。它們都支持同時消費多個topic。
該Connector的構造函數包含以下幾個字段:
- 待消費的topic列表
- key/value解序列化器,用於將字節數組形式的Kafka消息解序列化回對象
- Kafka consumer的屬性對象,常用的consumer屬性包括:bootstrap.servers(新版本consumer專用)、zookeeper.connect(舊版本consumer專用)和group.id
下面給出一個實例:
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
DeserializationSchema
Flink的Kafka consumer需要依靠用戶指定的解序列化器來將二進制的數據轉換成Java對象。DeserializationSchema接口就是做這件事情的,該接口中的deserialize方法作用於每條Kafka消息上,並把轉換的結果發往Flink的下遊operator。
通常情況下,用戶直接繼承AbstractDeserializationSchema來創建新的deserializer,也可以實現DeserializationSchema接口,只不過要自行實現getProducedType方法。
如果要同時解序列化Kafka消息的key和value,則需要實現KeyedDeserializationSchema接口,因為該接口的deserialize方法同時包含了key和value的字節數組。
Flink默認提供了幾種deserializer:
- TypeInformationSerializationSchema(以及TypeInformationKeyValueSerializationSchema):創建一個基於Flink TypeInformation的schema,適用於數據是由Flink讀寫之時。比起其他序列化方法,這種schema性能更好
- JsonDeserializationSchema(JSONKeyValueDeserializationSchema):將JSON轉換成ObjectNode對象,然後通過ObjectNode.get("fieldName").as(Int/String...)()訪問具體的字段。KeyValue
一旦在解序列化過程中出現錯誤,Flink提供了兩個應對方法——1. 在deserialize方法中拋出異常,使得整個作業失敗並重啟;2. 返回null告訴Flink Kafka connector跳過這條異常消息。值得註意的是,由於consumer是高度容錯的,如果采用第一種方式會讓consumer再次嘗試deserialize這條有問題的消息。因此倘若deserializer再次失敗,程序可能陷入一個死循環並不斷進行錯誤重試。
Kafka consumer起始位移配置
Flink的Kafka consumer允許用戶配置Kafka consumer的起始讀取位移,如下列代碼所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...); myConsumer.setStartFromEarliest(); // start from the earliest record possible myConsumer.setStartFromLatest(); // start from the latest record myConsumer.setStartFromGroupOffsets(); // the default behaviour DataStream<String> stream = env.addSource(myConsumer); ...
所有版本的Flink Kafka consumer都可以使用上面的方法來設定起始位移。
- setStartFromGroupOffsets:這是默認情況,即從消費者組提交到Kafka broker上的位移開始讀取分區數據(對於老版本而言,位移是提交到Zookeeper上)。如果未找到位移,使用auto.offset.reset屬性值來決定位移。該屬性默認是LATEST,即從最新的消息位移處開始消費
- setStartFromEarliest() / setStartFromLatest():設置從最早/最新位移處開始消費。使用這兩個方法的話,Kafka中提交的位移就將會被忽略而不會被用作起始位移
Flink也支持用戶自行指定位移,方法如下:
ap<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L); specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L); myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
上面的例子中,consumer將從用戶指定的位移處開始讀取消息。這裏的位移記錄的是下一條待消費消息的位移,而不是最新的已消費消息的位移。值得註意的是,如果待消費分區的位移不在保存的位移映射中,Flink Kafka connector會使用默認的組位移策略(即setStartFromGroupOffsets())。
另外,當任務自動地從失敗中恢復或手動地從savepoint中恢復時,上述這些設置位移的方法是不生效的。在恢復時,每個Kafka分區的起始位移都是由保存在savepoint或checkpoint中的位移來決定的。
Kafka consumer容錯性
一旦啟用了Flink的檢查點機制(checkpointing),Flink Kafka消費者會定期地對其消費的topic做checkpoint以保存它消費的位移以及其他操作的狀態。一旦出現失敗,Flink將會恢復streaming程序到最新的checkpoint狀態,然後重新從Kafka消費數據,重新讀取的位置就是保存在checkpoint中的位移。
checkpoint的間隔決定了程序容錯性的程度,它直接確定了在程序崩潰時,程序回溯到的最久狀態。
如果要使用啟動容錯性的Kafka消費者,定期對拓撲進行checkpoint就是非常必要的,實現方法如下面代碼所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); // 每5秒做一次checkpoint
需要註意的是,只有槽位(slot)充足Flink才會重啟拓撲,因此一旦拓撲因無法連接TaskManager而崩潰,仍然需要有足夠的slot才能重啟拓撲。如果使用YARN的話,Flink能夠自動地重啟丟失的YARN容器。
如果沒有啟用checkpoint,那麽Kafka consumer會定期地向Zookeeper提交位移。
Kafka consumer位移提交
Flink Kafka consumer可以自行設置位移提交的行為。當然,它不依賴於這些已提交的位移來實現容錯性。這些提交位移只是供監控使用。
配置位移提交的方法各異,主要依賴於是否啟用了checkpointing機制:
- 未啟用checkpointing:Flink Kafka consumer依賴於Kafka提供的自動提交位移功能。設置方法是在Properties對象中配置Kafka參數enable.auto.commit(新版本Kafka consumer)或auto.commit.enable(老版本Kafka consumer)
- 啟用checkpointing:Flink Kafka consumer會提交位移到checkpoint狀態中。這就保證了Kafka中提交的位移與checkpoint狀態中的位移是一致的。用戶可以調用setCommitOffsetsCheckpoints(boolean)方法來禁用/開啟位移提交——默認是true,即開啟了位移提交。註意,這種情況下,Flink會忽略上一種情況中提及的Kafka參數
Kafka consumer時間戳提取/水位生成
通常,事件或記錄的時間戳信息是封裝在消息體中。至於水位,用戶可以選擇定期地發生水位,也可以基於某些特定的Kafka消息來生成水位——這分別就是AssignerWithPeriodicWatermaks以及AssignerWithPunctuatedWatermarks接口的使用場景。
用戶也能夠自定義時間戳提取器/水位生成器,具體方法參見這裏,然後按照下面的方式傳遞給consumer:
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties); myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); DataStream<String> stream = env .addSource(myConsumer) .print();
在內部,Flink會為每個Kafka分區都執行一個對應的assigner實例。一旦指定了這樣的assigner,對於每條Kafka中的消息,extractTimestamp(T element, long previousElementTimestamp)方法會被調用來給消息分配時間戳,而getCurrentWatermark()方法(定時生成水位)或checkAndGetNextWatermark(T lastElement, long extractedTimestamp)方法(基於特定條件)會被調用以確定是否發送新的水位值。
Kafka設計解析(二十)Apache Flink Kafka consumer