初始 Kafka Consumer 消費者
溫馨提示:整個 Kafka 專欄基於 kafka-2.2.1 版本。
1、KafkaConsumer 概述
根據 KafkaConsumer 類上的註釋上來看 KafkaConsumer 具有如下特徵:
- 在 Kafka 中 KafkaConsumer 是執行緒不安全的。
- 2.2.1 版本的KafkaConsumer 相容 kafka 0.10.0 和 0.11.0 等低版本。
- 訊息偏移量與消費偏移量(訊息消費進度)
Kafka 為分割槽中的每一條訊息維護一個偏移量,即訊息偏移量。這個偏移量充當該分割槽內記錄的唯一識別符號。消費偏移量(訊息消費進度)儲存的是消費組當前的處理進度。訊息消費進度的提交在 kafka 中可以定時自動提交也可以手動提交。手動提交可以呼叫 ommitSync() 或 commitAsync 方法。 - 消費組 與 訂閱關係
多個消費這可以同屬於一個消費組,消費組內的所有消費者共同消費主題下的所有訊息。一個消費組可以訂閱多個主題。 佇列負載機制
既然同一個消費組內的消費者共同承擔主題下所有佇列的消費,那他們如何進行分工呢?預設情況下采取平均分配,例如一個消費組有兩個消費者c1、c2,一個 topic 的分割槽數為6,那 c1 會負責3個分割槽的消費,同樣 c2 會負責另外3個分割槽的分配。那如果其中一個消費者宕機或新增一個消費者,那佇列能動態調整嗎?
答案是會重新再次平衡,例如如果新增一個消費者 c3,則c1,c2,c3都會負責2個分割槽的訊息消費,分割槽重平衡會在後續文章中重點介紹。消費者也可以通過 assign 方法手動指定分割槽,此時會禁用預設的自動分配機制。消費者故障檢測機制
當通過 subscribe 方法訂閱某些主題時,此時該消費者還未真正加入到訂閱組,只有當 consumeer#poll 方法被呼叫後,並且會向 broker 定時傳送心跳包,如果 broker 在 session.timeout.ms 時間內未收到心跳包,則 broker 會任務該消費者已宕機,會將其剔除,並觸發消費端的分割槽重平衡。消費者也有可能遇到“活體鎖”的情況,即它繼續傳送心跳,但沒有任何進展。在這種情況下,為了防止消費者無限期地佔用它的分割槽,可以使用max.poll.interval.ms 設定提供了一個活性檢測機制。基本上,如果您呼叫輪詢的頻率低於配置的最大間隔,那麼客戶機將主動離開組,以便另一個消費者可以接管它的分割槽。當這種情況發生時,您可能會看到一個偏移提交失敗(由呼叫{@link #commitSync()}丟擲的{@link CommitFailedException}表示)。
- kafka 對 poll loop 行為的控制引數
Kafka 提供瞭如下兩個引數來控制 poll 的行為:- max.poll.interval.ms
允許 兩次呼叫 poll 方法的最大間隔,即設定每一批任務最大的處理時間。 - max.poll.records
每一次 poll 最大拉取的訊息條數。
對於訊息處理時間不可預測的情況下上述兩個引數可能不夠用,那將如何是好呢?
通常的建議將訊息拉取與訊息消費分開,一個執行緒負責 poll 訊息,處理這些訊息使用另外的執行緒,這裡就需要手動提交消費進度。為了控制訊息拉起的過快,您可能會需要用到 Consumer#pause(Collection) 方法,暫時停止向該分割槽拉起訊息。RocketMQ 的推模式就是採用了這種策略。如果大家有興趣的話,可以從筆者所著的《RocketMQ技術內幕》一書中詳細瞭解。
- max.poll.interval.ms
2、KafkaConsume 使用示例
2.1 自動提交消費進度
public static void testConsumer1() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072");
props.setProperty("group.id", "C_ODS_ORDERCONSUME_01");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("TOPIC_ORDER"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("訊息消費中");
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
2.2 手動提交消費進度
public static void testConsumer2() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
// insertIntoDb(buffer);
// 省略處理邏輯
consumer.commitSync();
buffer.clear();
}
}
}
3、認識 Consumer 介面
要認識 Kafka 的消費者,個人認為最好的辦法就是從它的類圖著手,下面給出 Consumer 介面的類圖。
接下來對起重點方法進行一個初步的介紹,從下篇文章開始將對其進行詳細設計。
- Set< TopicPartition> assignment()
獲取該消費者的佇列分配列表。 - Set< String> subscription()
獲取該消費者的訂閱資訊。 - void subscribe(Collection< String> topics)
訂閱主題。 - void subscribe(Collection< String> topics, ConsumerRebalanceListener callback)
訂閱主題,並指定佇列重平衡的監聽器。 - void assign(Collection< TopicPartition> partitions)
取代 subscription,手動指定消費哪些佇列。 - void unsubscribe()
取消訂閱關係。 - ConsumerRecords<K, V> poll(Duration timeout)
拉取訊息,是 KafkaConsumer 的核心方法,將在下文詳細介紹。 - void commitSync()
同步提交消費進度,為本批次的消費提交,將在後續文章中詳細介紹。 - void commitSync(Duration timeout)
同步提交消費進度,可設定超時時間。 - void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
顯示同步提交消費進度, offsets 指明需要提交消費進度的資訊。 - void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout)
顯示同步提交消費進度,帶超時間。 - void seek(TopicPartition partition, long offset)
重置 consumer#poll 方法下一次拉訊息的偏移量。 - void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
seek 方法過載方法。 - void seekToBeginning(Collection< TopicPartition> partitions)
將 poll 方法下一次的拉取偏移量設定為佇列的初始偏移量。 - void seekToEnd(Collection< TopicPartition> partitions)
將 poll 方法下一次的拉取偏移量設定為佇列的最大偏移量。 - long position(TopicPartition partition)
獲取將被拉取的偏移量。 - long position(TopicPartition partition, final Duration timeout)
同上。 - OffsetAndMetadata committed(TopicPartition partition)
獲取指定分割槽已提交的偏移量。 - OffsetAndMetadata committed(TopicPartition partition, final Duration timeout)
同上。 - Map<MetricName, ? extends Metric> metrics()
統計指標。 - List< PartitionInfo> partitionsFor(String topic)
獲取主題的路由資訊。 - List< PartitionInfo> partitionsFor(String topic, Duration timeout)
同上。 - Map<String, List< PartitionInfo>> listTopics()
獲取所有 topic 的路由資訊。 - Map<String, List< PartitionInfo>> listTopics(Duration timeout)
同上。 - Set< TopicPartition> paused()
獲取已掛起的分割槽資訊。 - void pause(Collection< TopicPartition> partitions)
掛起分割槽,下一次 poll 方法將不會返回這些分割槽的訊息。 - void resume(Collection< TopicPartition> partitions)
恢復掛起的分割槽。 - Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
根據時間戳查詢最近的一條訊息的偏移量。 - Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)
同上。 - Map<TopicPartition, Long> beginningOffsets(Collection< TopicPartition> partitions)
查詢指定分割槽當前最小的偏移量。 - Map<TopicPartition, Long> beginningOffsets(Collection< TopicPartition> partitions, Duration timeout)
同上。 - Map<TopicPartition, Long> endOffsets(Collection< TopicPartition> partitions)
查詢指定分割槽當前最大的偏移量。 - Map<TopicPartition, Long> endOffsets(Collection< TopicPartition> partitions, Duration timeout)
同上。 - void close()
關閉消費者。 - void close(Duration timeout)
關閉消費者。 - void wakeup()
喚醒消費者。
4、初始 KafkaConsumer
接下來筆者根據其建構函式,對一一介紹其核心屬性的含義,為接下來講解其核心方法打下基礎。
- String groupId
消費組ID。同一個消費組內的多個消費者共同消費一個主題下的訊息。 - String clientId
發出請求時傳遞給伺服器的id字串。設定該值的目的是方便在伺服器端請求日誌中包含邏輯應用程式名稱,從而能夠跟蹤ip/埠之外的請求源。該值可以設定為應用名稱。 - ConsumerCoordinator coordinator
消費協調器,後續會詳細介紹。 - Deserializer< K> keyDeserializer
key 序列化器。 - Deserializer< V> valueDeserializer
值序列化器。 - ConsumerNetworkClient client
網路通訊客戶端。 - SubscriptionState subscriptions
用於管理訂閱狀態的類,用於跟蹤 topics, partitions, offsets 等資訊。後續會詳細介紹。 - ConsumerMetadata metadata
消費者元資料資訊,包含路由資訊。 - long retryBackoffMs
如果向 broker 傳送請求失敗後,發起重試之前需要等待的間隔時間,通過屬性 retry.backoff.ms 指定。 - long requestTimeoutMs
一次請求的超時時間。 - int defaultApiTimeoutMs
為所有可能阻塞的API設定一個預設的超時時間。 - List< PartitionAssignor> assignors
分割槽分配演算法(分割槽負載演算法)。
Kafka Consumer 消費者就介紹到這裡了,從下篇文章開始將開始詳細介紹 Kafka 關於訊息消費的方方面面。
作者介紹:
丁威,《RocketMQ技術內幕》作者,RocketMQ 社群佈道師,公眾號:中介軟體興趣圈 維護者,目前已陸續發表原始碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等原始碼專欄。歡迎加入我的知識星球,構建一個高質量的技術交流社群。