1. 程式人生 > >【Kafka源碼】KafkaConsumer

【Kafka源碼】KafkaConsumer

als aslist 獲取 主題 簡單 失敗 所有 流量 方式

[TOC]


KafkaConsumer是從kafka集群消費消息的客戶端。這是kafka的高級消費者,而SimpleConsumer是kafka的低級消費者。何為高級?何為低級?

我們所謂的高級,就是可以自動處理kafka集群的失敗信息,也可以適應kafka集群中消息的分區遷移。這個客戶端也可以與服務端進行交互,使用消費者分組負載平衡消費,下面我們具體會講解。

消費者與對應的broker保持TCP連接,來獲取數據。使用完成後關閉消費者連接失敗,會泄露連接。這個消費者不是線程安全的。

一、偏移量和消費者位置Offsets And Consumer position

Kafka在分區中為每條記錄維護了一個數字形式的偏移量。這個偏移量是數據在分區中的唯一值,也可以表示為消費者在分區中的偏移量。例如,一個消費者的偏移量為5,表示偏移量為0到4的消息已經被消費過。關於消費者使用的偏移量,有兩個比較重要的概念。

1.1 TopicPartition

消費者的偏移量表示消費者下一個需要消費的消息的偏移量。這個值會比當前消費者在那個分區剛剛消費的消息偏移量大一。這個值在下面情況下會自動增長:消費者調用了poll(long)並且獲取到了消息。

1.2 committed position提交偏移量

這個committed position表示最新的被安全保存的偏移量。如果當前過程中失敗然後重啟了,這個是重啟後消費的偏移量起點。消費者有三種方式來提交偏移量:

  • 自動定時提交
  • 同步提交。手動提交偏移量,使用的方法是commitSync(),這個方法會一直被阻塞,直到偏移量被成功提交了,或者在提交過程中發生了嚴重的錯誤。
  • 異步提交。使用的方法是commitAsync(OffsetCommitCallback),在成功或者發生嚴重錯誤後,會觸發OffsetCommitCallback方法。

二、消費分組和主題訂閱Consumer Groups and Topic Subscriptions

Kafka使用消費分組的概念,允許一個處理池來將消費和處理過程分開。這些處理可以在同一臺機器上運行,也可以分布在多臺機器上,來提供擴展性和容錯。

每個Kafka消費者都可以配置自己所屬的消費分組,並且可以通過接口subscribe(Collection, ConsumerRebalanceListener)動態設置訂閱的主題。Kafka會把每條消息傳遞到分組中的某個運行過程。這是通過平衡消費分組中的每個消費者對應的分區來實現的,最終實現的是每個分組正好被分配到分組的某個消費者。所以如果一個主題有4個分區,一個消費分組有兩個消費者process,每個process會消費兩個分區。

消費分組中的成員是動態的:如果某個process掛了,分配給他的分區會被分到組中其他的process。類似的,如果一個新的消費者加入了分組,分區會遷移到新的分組上。這被稱為分組平衡。需要註意的是,當訂閱的主題中新的分區出現的時候,相同的情況也會出現:分組不斷地檢測新的分區,平衡分組,最終每個分區都被分配到某個組成員上。

概念上,你可以把消費分組想象成一個單獨的邏輯上的消費者,恰巧有多個消費進程。作為一個多訂閱的系統,kafka天然支持某個主題有多個消費分組,而數據不會重復。

這些功能對於一個消息系統來說很普通。和傳統的消息隊列不同,你可以同時有很多的分組。在傳統消息系統中,每個消費者都會有自己的消費分組,所以每個消費者會訂閱主題下的所有記錄,也就是會收到所有的消息。

而且,當分組重新分配自動出現時,會通過ConsumerRebalanceListener通知消費者,然後消費者自身處理一些應用級的邏輯,比如狀態清除,手動提交offset等等。

對消費者來說,還可以手動分配分區,使用的方法是assign(類似於SimpleConsumer)。在這種情況下,動態分區調整和消費分組協調功能會被禁用。

三、檢測消費者失敗Detecting Consumer Failures

訂閱一批主題之後,消費者在調用poll的時候,會自動加入分組。poll是用於確保消費者的存活。只要消費者不停地調用poll,那麽他就會一直存在於分組中,並且不斷地收到對應分區推送給他的消息。另外,poll方法也會定時發送心跳給服務端,當你停止調用poll時,心跳也會停止。如果server超過session.timeout時間沒有收到心跳,消費者會被踢出分組,分區也會重新分配。這是為了防止消費者掛掉之後,還占用分區的情況發生(這種情況下分組中的其他消費者無法消費到消息)。為了繼續存在於分組中,消費者必須調用poll方法證明還活著。

這個設計的目的還在於,一個poll循環中的消息處理過程的時間必須是有界的,那樣心跳才能在session.timeout之前發出去。消費者提供兩個配置來控制這種行為:

  • session.timeout.ms:通過增加這個值,消費者可以有更多的時間來處理poll返回的一批消息。唯一的缺點就是服務端要耗費更多的時間來檢測消費者是否存活,這可能會導致分組平衡的延遲。然後,這不會影響close方法的調用,因為一旦調用了這個方法,消費者會發送一個明確的消息給服務端,離開分組,分組平衡會被立即觸發。
  • max.poll.records:一個poll循環的處理時間應該和消息的數量成正比。所以應該設置一次最多處理多少條數據。默認情況下,這個值沒有限制消息的數量。

三、舉例

當前消費客戶端提供了多種方法來消費,下面是幾個例子。

3.1 自動提交Automatic Offset Committing

這個方法說明了kafka消費客戶端的簡單使用,依賴於自動提交offset。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

配置enable.auto.commit表示是自動提交offset,並且提交的頻率為auto.commit.interval.ms。服務器可以通過bootstrap.servers來配置,可以不用配置全部的服務器,因為會自動發現集群中所有的服務器。當然不建議只配置一個,因為如果這個掛掉了,就找不到其他的機器了。

反序列化配置表示如何把二進制消息轉換為消息對象。例如,使用string反序列化,表示消息的key和value都是字符串。

3.2 手動提交偏移量Manual Offset Control

與自動提交不同,我們可以通過配置來控制消息什麽時候消費完成,並提交偏移量。在一條消息需要多個任務處理,所有任務完成後才能提交偏移量的場景下,需要手動提交。在下面的例子中,我們會一次消費一批數據,然後把他們放到內存中,當消息達到一定的數量時,我們會把他們插入數據庫中。如果這種情況下,我們配置為自動提交,那麽就會出現消息被消費,但是實際上並沒有插入到數據庫的情況。為了避免這種情況,我們必須在消息插入數據庫之後,手動提交偏移量。這也會出現另一種情況,就是消息插入數據庫成功,但是在提交偏移量的過程中失敗。這種情況下,其他的消費者會繼續讀取偏移量,然後重新執行批量插入數據庫的操作。這麽使用的話,kafka提供的是“至少一次”的消息保證,也就是消息至少會被傳遞一次,但是消費失敗的情況下會重復消費。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}

上面的例子使用的是同步提交commitSync()方法。在某些情況下,你可能需要對消息消費有更加精確的控制,下面的例子中,我們按照分區提交偏移量。

try {
    while(running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + &quot;: &quot; + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
  consumer.close();
}

註意:提交的偏移量應該是下一次消費的消息的偏移量,所以commitSync(offsets)這個方法中的內容,應該是當前消息偏移量加上一。

3.3 手動分配分區Manual Partition Assignment

前面的例子中,消費者訂閱主題,然後服務端動態分配了分區給消費者。在某些情況下,我們需要精確控制消費的分區,例如:

消費者維護了與分區相關的本地狀態(例如本地磁盤鍵值存儲),那麽他應該只讀取特定分區的數據。如果消費者本身是高可用的,在掛掉之後會自動重啟(可能正在使用集群管理框架,比如YARN,Mesos或者AWS,或者作為流式處理框架)。這種情況下,kafka沒必要檢測消費者的存活,重新分配分區,因為消費進程會在同樣的機器上重啟。

為了使用這種模式,我們不能使用subscribe(Collection),而是應該使用assign(Collection)方法,來指定一批分區。

String topic = "foo";                                    
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));  

這種情況下,不會使用到分組協調器,所以消費者掛掉的情況,也不會重新分配分區。所以每個消費者都是獨立的,為了避免偏移量提交沖突,每個消費者的分組信息應該是唯一的。

3.4 在kafka外存儲偏移量Storing Offsets Outside Kafka

消費者應用可能不想把offset存到kafka中,所以kafka也提供了把offset存儲到其他地方的接口。這種情況下,就能自己實現消息只消費一次的場景,比至少一次強很多。那麽我們如何使用呢?

  • 首先需要配置enable.auto.commit=false
  • 使用ConsumerRecord提供的偏移量來存儲你的offset
  • 重啟時使用seek(TopicPartition, long)來發現重啟前的offset

這種場景在手動分配分區的情況下很簡單。如果分區分配是自動的,我們需要特殊處理分區分配改變的情況。這個可以通過提供ConsumerRebalanceListener實例,在調用subscribe(Collection, ConsumerRebalanceListener)和subscribe(Pattern, ConsumerRebalanceListener)時實現。

3.5 控制消費者位點Controlling The Consumer‘s Position

大多數情況下,消費者會簡單的從頭到尾消費消息,定時提交位點(自動或手動)。然後,kafka允許消費者手動控制位點,可以設置位點的位置。這意味著消費者可以消費已經消費過的消息,也可以跳過最新的消息。

kafka可以通過seek(TopicPartition, long)方法來指定消費起點,尋找早的或者新的位點,也可以通過seekToBeginning(Collection)和seekToEnd(Collection)來指定。

3.6 消費流量控制

如果一個消費者被分配到了多個分區,他會嘗試同時消費所有的分區,所有的分區的權重一樣。然而,在某些情況下,消費者需要首先全速消費某些特定的分區,當這個分區沒有消息後再消費其他的分區。

例如,流式處理。消費者同時從兩個主題消費,然後把消息合並。當某個主題落後於另一個主題很多時,消費者應該停止消費快的那個主題,等慢的那個趕上來。再比如,有個主題有很多歷史數據需要被消費,這種情況下,消費者應該優先消費那些有最新消息的主題。

kafka支持動態控制消費流量,通過pause(Collection)和resume(Collection)方法。

四、多線程消費Multi-threaded Processing

kafka消費者不是線程安全的。所有的網絡IO操作都在發起調用的一個線程中執行。他需要保證多線程時的線程安全。不同的操作會引起ConcurrentModificationException。

我們在外部線程中可以調用wakeup()方法來停止當前的操作。這種情況下,可能會從阻塞操作的線程拋出org.apache.kafka.common.errors.WakeupException異常。這可以用於在另一個線程中停止當前的消費者。

public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(10000);
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

在另一個線程中,可以通過closed標識來關閉或者啟動消費者。

【Kafka源碼】KafkaConsumer