1. 程式人生 > >kafka consumer 配置詳解

kafka consumer 配置詳解

1、Consumer Group 與 topic 訂閱

每個Consumer 程序都會劃歸到一個邏輯的Consumer Group中,邏輯的訂閱者是Consumer Group。所以一條message可以被多個訂閱message 所在的topic的每一個Consumer Group,也就好像是這條message被廣播到每個Consumer Group一樣。而每個Consumer Group中,類似於一個Queue(JMS中的Queue)的概念差不多,即一條訊息只會被Consumer Group中的一個Consumer消費。

1.1 Consumer 與 partition

    其實上面所說的訂閱關係還不夠明確,其實topic中的partition被分配到某個consumer上,也就是某個consumer訂閱了某個partition。 再重複一下:consumer訂閱的是partition,而不是message

。所以在同一時間點上,訂閱到同一個partition的consumer必然屬於不同的Consumer Group。

    在官方網站上,給出了這樣一張圖:

 

一個kafka cluster中的某個topic,有4個partition。有兩個consumer group (A and B)訂閱了該topic。 Consumer Group A有2個partition:p0、p1,Consumer Group B有4個partition:c3,c4,c5,c6。經過分割槽分配後,consumer與partition的訂閱關係如下:

複製程式碼
Topic 中的4partitionConsumer Group A中的分配情況如下:
C1 訂閱p0,p3
C2 訂閱p1,p2
Topic 中的4partitionConsumer Group B中的分配情況如下:
C3 訂閱p0
C4 訂閱p3
C5 訂閱p1
C6 訂閱p2
複製程式碼

 另外要知道的是,partition分配的工作其實是在consumer leader中完成的。

1.2 Consumer 與Consumer Group

Consumer Group與Consumer的關係是動態維護的:

當一個Consumer 程序掛掉 或者是卡住時,該consumer所訂閱的partition會被重新分配到該group內的其它的consumer上。當一個consumer加入到一個consumer group中時,同樣會從其它的consumer中分配出一個或者多個partition 到這個新加入的consumer。

    當啟動一個Consumer時,會指定它要加入的group,使用的是配置項:group.id。

為了維持Consumer 與 Consumer Group的關係,需要Consumer週期性的傳送heartbeat到coordinator(協調者,在早期版本,以zookeeper作為協調者。後期版本則以某個broker作為協調者)。當Consumer由於某種原因不能發Heartbeat到coordinator時,並且時間超過session.timeout.ms時,就會認為該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。而這個過程,被稱為rebalance。

那麼現在有這樣一個問題:如果一個consumer 程序一直在週期性的傳送heartbeat,但是它就是不消費訊息,這種狀態稱為livelock狀態。那麼在這種狀態下,它所訂閱的partition不訊息是否就一直不能被消費呢?

1.3 Coordinator

    Coordinator 協調者,協調consumer、broker。早期版本中Coordinator,使用zookeeper實現,但是這樣做,rebalance的負擔太重。為了解決scalable的問題,不再使用zookeeper,而是讓每個broker來負責一些group的管理,這樣consumer就完全不再依賴zookeeper了。

1.3.1 Consumer連線到coordinator

    從Consumer的實現來看,在執行poll或者是join group之前,都要保證已連線到Coordinator。連線到coordinator的過程是:

    1)連線到最後一次連線的broker(如果是剛啟動的consumer,則要根據配置中的borker)。它會響應一個包含coordinator資訊(host, port等)的response。

    2)連線到coordinator。

1.4 Consumer Group Management

    Consumer Group 管理中,也是需要coordinator的參與。一個Consumer要join到一個group中,或者一個consumer退出時,都要進行rebalance。進行rebalance的流程是:

1)會給一個coordinator發起Join請求(請求中要包括自己的一些元資料,例如自己感興趣的topics)

2)Coordinator 根據這些consumer的join請求,選擇出一個leader,並通知給各個consumer。這裡的leader是consumer group 內的leader,是由某個consumer擔任,不要與partition的leader混淆。

3)Consumer leader 根據這些consumer的metadata,重新為每個consumer member重新分配partition。分配完畢通過coordinator把最新分配情況同步給每個consumer。

4)Consumer拿到最新的分配後,繼續工作。

2、Consumer Fetch Message

在Kafka partition中,每個訊息有一個唯一標識,即partition內的offset。每個consumer group中的訂閱到某個partition的consumer在從partition中讀取資料時,是依次讀取的。

   

    上圖中,Consumer A、B分屬於不用的Consumer Group。Consumer B讀取到offset =11,Consumer A讀取到offset=9 。這個值表示Consumer Group中的某個Consumer 在下次讀取該partition時會從哪個offset的 message開始讀取,即 Consumer Group A 中的Consumer下次會從offset = 9 的message 讀取, Consumer Group B 中的Consumer下次會從offset = 11 的message 讀取。

    這裡並沒有說是Consumer A 下次會從offset = 9 的message讀取,原因是Consumer A可能會退出Group ,然後Group A 進行rebalance,即重新分配分割槽。

2.1 poll 方法

Consumer讀取partition中的資料是通過呼叫發起一個fetch請求來執行的。而從KafkaConsumer來看,它有一個poll方法。但是這個poll方法只是可能會發起fetch請求。原因是:Consumer每次發起fetch請求時,讀取到的資料是有限制的,通過配置項max.partition.fetch.bytes來限制的。而在執行poll方法時,會根據配置項個max.poll.records來限制一次最多pool多少個record。

那麼就可能出現這樣的情況: 在滿足max.partition.fetch.bytes限制的情況下,假如fetch到了100個record,放到本地快取後,由於max.poll.records限制每次只能poll出15個record。那麼KafkaConsumer就需要執行7次才能將這一次通過網路發起的fetch請求所fetch到的這100個record消費完畢。其中前6次是每次pool中15個record,最後一次是poll出10個record。

    在consumer中,還有另外一個配置項:max.poll.interval.ms ,它表示最大的poll資料間隔,如果超過這個間隔沒有發起pool請求,但heartbeat仍舊在發,就認為該consumer處於 livelock狀態。就會將該consumer退出consumer group。所以為了不使Consumer 自己被退出,Consumer 應該不停的發起poll(timeout)操作。而這個動作 KafkaConsumer Client是不會幫我們做的,這就需要自己在程式中不停的呼叫poll方法了。

2.2 commit offset

    當一個consumer因某種原因退出Group時,進行重新分配partition後,同一group中的另一個consumer在讀取該partition時,怎麼能夠知道上一個consumer該從哪個offset的message讀取呢?也是是如何保證同一個group內的consumer不重複消費訊息呢?上面說了一次走網路的fetch請求會拉取到一定量的資料,但是這些資料還沒有被訊息完畢,Consumer就掛掉了,下一次進行資料fetch時,是否會從上次讀到的資料開始讀取,而導致Consumer消費的資料丟失嗎?

    為了做到這一點,當使用完poll從本地快取拉取到資料之後,需要client呼叫commitSync方法(或者commitAsync方法)去commit 下一次該去讀取 哪一個offset的message。

    而這個commit方法會通過走網路的commit請求將offset在coordinator中保留,這樣就能夠保證下一次讀取(不論進行了rebalance)時,既不會重複消費訊息,也不會遺漏訊息。

    對於offset的commit,Kafka Consumer Java Client支援兩種模式:由KafkaConsumer自動提交,或者是使用者通過呼叫commitSync、commitAsync方法的方式完成offset的提交。

自動提交的例子:

複製程式碼
   Properties props = new Properties();

     props.put("bootstrap.servers", "localhost:9092");

     props.put("group.id", "test");

     props.put("enable.auto.commit", "true");

     props.put("auto.commit.interval.ms", "1000");

     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

     consumer.subscribe(Arrays.asList("foo", "bar"));

     while (true) {

         ConsumerRecords<String, String> records = consumer.poll(100);

         for (ConsumerRecord<String, String> record : records)

             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

     }
複製程式碼

手動提交的例子: 

複製程式碼
Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }
複製程式碼

在手動提交時,需要注意的一點是:要提交的是下一次要讀取的offset,例如: 

複製程式碼
try {
         while(running) {
            // 取得訊息
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            // 根據分割槽來遍歷資料:
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 // 資料處理
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 // 取得當前讀取到的最後一條記錄的offset
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // 提交offset,記得要 + 1
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }
複製程式碼

3、Consumer的執行緒安全性

KafkaProducer是執行緒安全的,上一節已經瞭解到。但Consumer卻沒有設計成執行緒安全的。當用戶想要在在多執行緒環境下使用kafkaConsumer時,需要自己來保證synchronized。如果沒有這樣的保證,就會丟擲ConcurrentModificatinException的。

當你想要關閉Consumer或者為也其它的目的想要中斷Consumer的處理時,可以呼叫consumer的wakeup方法。這個方法會丟擲WakeupException。

例如:

複製程式碼
public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;
 
     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
                 ConsumerRecords records = consumer.poll(10000);
                 // Handle new records
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }
 
     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
 }