
Kafka Part 4: Consumer Explained

Summary

   This post covers the Kafka High Level Consumer, Consumer Groups, Consumer Rebalance, and the Low Level Consumer: how they work, which scenarios each fits, and how to implement them with the Java API.

How the High Level Consumer Works

   The High Level Consumer API is built around the logical concept of the Consumer Group. It hides per-topic, per-partition offset management from you (it automatically reads the consumer group's last offset from ZooKeeper).
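   As a rough sketch of what that looks like in code: the original High Level Consumer is the older ZooKeeper-based consumer that ships in the kafka core artifact (not kafka-clients), so the dependency, the ZooKeeper address (localhost:2181) and the topic name used here are assumptions of this example, not something from this post. The stream iterator blocks when there is no data, which matches the behaviour described below.

     Properties props = new Properties();
     // the old consumer tracks offsets in ZooKeeper, so it connects to ZooKeeper rather than to the brokers
     props.put("zookeeper.connect", "localhost:2181");
     props.put("group.id", "test-group");
     props.put("auto.commit.interval.ms", "1000");

     ConsumerConnector connector =
             kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

     // ask for one stream (one consuming thread) for topic "test"
     Map<String, List<KafkaStream<byte[], byte[]>>> streams =
             connector.createMessageStreams(Collections.singletonMap("test", 1));

     ConsumerIterator<byte[], byte[]> it = streams.get("test").get(0).iterator();
     while (it.hasNext()) {              // blocks while there is nothing to fetch
         System.out.println(new String(it.next().message()));
     }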

Notes on consumer configuration:

  • If there are more consumers than partitions, the extra consumers are wasted: Kafka does not allow concurrent consumption of a single partition, so the number of consumers should not exceed the number of partitions.
  • If there are fewer consumers than partitions, one consumer is assigned several partitions. Balance the consumer count against the partition count carefully, otherwise data will be pulled unevenly across partitions. Ideally the partition count is an integer multiple of the consumer count, which is why the partition count matters; 24 partitions, for example, make it easy to pick a matching consumer count.
  • When a consumer reads from several partitions, ordering across them is not guaranteed. Kafka only guarantees ordering within a single partition; across partitions the result depends on the order in which you read them.
  • Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to each consumer may change (a sketch of observing this with a ConsumerRebalanceListener follows this list).
  • With the High-level API, the call blocks when there is no data to fetch.
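   The new consumer API lets you register a ConsumerRebalanceListener when subscribing, which is a convenient way to see exactly which partitions a rebalance takes away and hands out. A minimal sketch, assuming a topic named test and the same connection properties as the examples further down; note that the two callbacks run inside poll():

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

     consumer.subscribe(Arrays.asList("test"), new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             // called before a rebalance takes these partitions away; a good place to commit their offsets
             System.out.println("Revoked: " + partitions);
         }
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             // called after the rebalance with the consumer's new assignment
             System.out.println("Assigned: " + partitions);
         }
     });

     while (true) {
         consumer.poll(100);   // the callbacks above are invoked from inside poll()
     }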

Detecting Consumer Failure

   After a consumer subscribes to topics, it is added to the consumer group automatically when poll(long) is called, and calling poll is also how the consumer shows it is still healthy. As long as poll keeps being called, the consumer remains an active member of the group, and during this time it periodically sends heartbeats to the server. If the consumer fails, stops sending heartbeats, or lets the gap between heartbeats exceed the configured session.timeout.ms, it is considered dead and the partitions it was consuming are reassigned to other consumers.
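   A short sketch of the two settings that control this liveness check; the values here are illustrative assumptions, not recommendations, and heartbeat.interval.ms should stay well below session.timeout.ms:

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     // the consumer is declared dead if the coordinator sees no heartbeat within this window
     props.put("session.timeout.ms", "10000");
     // how often the consumer sends heartbeats in the background
     props.put("heartbeat.interval.ms", "3000");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);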

When the consumer fetches and processes data, two settings can be tuned:

  1. max.poll.interval.ms: By increasing the interval between expected polls, you can give the consumer more time to handle a batch of records returned from poll(long). The drawback is that increasing this value may delay a group rebalance, since the consumer will only join the rebalance inside the call to poll. You can use this setting to bound the time to finish a rebalance, but you risk slower progress if the consumer cannot actually call poll often enough.

     In other words, raising this interval gives the consumer enough time to process the records it has already fetched; the downside is that a larger value can delay group rebalancing and reduce overall responsiveness.

  2. max.poll.records: Use this setting to limit the total records returned from a single call to poll. This can make it easier to predict the maximum that must be handled within each poll interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the impact of group rebalancing.

     In other words, max.poll.records caps how many records a single poll returns (a sketch combining both settings follows this list).
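   A minimal configuration sketch combining the two settings; the values are illustrative assumptions:

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     // allow up to 5 minutes between successive polls before the consumer is removed from the group
     props.put("max.poll.interval.ms", "300000");
     // return at most 100 records per poll so each batch stays small and predictable
     props.put("max.poll.records", "100");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);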

Consumer API

   1: Add the Maven dependency

     <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
         <version>0.10.1.0</version>
     </dependency>

       2.1: Kafka's consumer API relying on automatic offset committing (Kafka manages the offsets automatically)

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }
    The risk with automatic commit: once records are fetched, their offsets are committed to the broker automatically, so if the subsequent processing of those records fails, that data is effectively lost.
      2.2: Manual Offset Control (committing offsets manually)
     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     // disable automatic offset commits
     props.put("enable.auto.commit", "false");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             // commit the offsets manually; commitSync() blocks until the commit succeeds or fails
             consumer.commitSync();
             buffer.clear();
         }
     }
      The problem with manual commit: if the records have already been processed (for example after insertIntoDb(buffer)) but an exception occurs before consumer.commitSync() completes, the data has been processed yet the offset commit has failed, so the same records are fetched again on the next poll and you end up with duplicates.
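   One common way to avoid both the loss and the duplicate problems is to store the offsets in the same place as the processed results, so that a result and its offset are written atomically. The sketch below is only an outline of that idea: readOffsetFromDb and saveRecordAndOffsetInOneTransaction are hypothetical helpers standing in for your own database code.

     // on startup, take the partition manually and resume from the offset stored in the database
     TopicPartition partition = new TopicPartition("test", 0);
     consumer.assign(Arrays.asList(partition));
     consumer.seek(partition, readOffsetFromDb(partition));            // hypothetical helper

     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             // write the result and its offset in one database transaction,
             // so they either both succeed or both fail
             saveRecordAndOffsetInOneTransaction(record);              // hypothetical helper
         }
     }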

       2.3: Fine-grained manual offset control (committing per partition)

     try {
         // running is a flag that is set to false when the application shuts down
         while (running) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             // iterate over the partitions present in this batch of records
             for (TopicPartition partition : records.partitions()) {
                 // the records belonging to this one partition
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 // process the records of this partition
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 // the last offset processed in this partition
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                 // commit this partition separately; note the committed value is lastOffset + 1,
                 // i.e. the offset of the next record to be read
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
         consumer.close();
     }
      2.4: Consuming data from specific partitions
     String topic = "test";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));
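   For completeness, a minimal runnable sketch around that snippet; the broker address, group id, and topic name are placeholders. Note that with assign() the consumer manages its own partition assignment, so it does not take part in group rebalancing, and subscribe() and assign() must not be mixed on the same consumer instance:

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

     String topic = "test";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     // take partitions 0 and 1 of the topic directly instead of subscribing to the topic
     consumer.assign(Arrays.asList(partition0, partition1));

     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("partition = %d, offset = %d, value = %s%n",
                     record.partition(), record.offset(), record.value());
     }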