
Kafka Consumer Group Partition Assignment Strategies

WeChat official account: 蘇言論
Connecting theory with practice, talking freely about technology and life.

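Consumer groups and consumers are central concepts in Kafka. Understanding how they work helps when tuning Kafka performance and when dealing with consumer lag (a backlog of unconsumed messages). A Kafka topic consists of multiple partitions, which are distributed across the nodes of the cluster. The example below is topic01, with 10 partitions and a replication factor of 2: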

Topic:topic01  PartitionCount:10       ReplicationFactor:2     Configs:
        Topic: topic01 Partition: 0    Leader: 1       Replicas: 1,4   Isr: 1,4
        Topic: topic01 Partition: 1    Leader: 2       Replicas: 2,5   Isr: 2,5
        Topic: topic01 Partition: 2    Leader: 3       Replicas: 3,1   Isr: 3,1
        Topic: topic01 Partition: 3    Leader: 4       Replicas: 4,2   Isr: 4,2
        Topic: topic01 Partition: 4    Leader: 5       Replicas: 5,3   Isr: 5,3
        Topic: topic01 Partition: 5    Leader: 1       Replicas: 1,3   Isr: 1,3
        Topic: topic01 Partition: 6    Leader: 2       Replicas: 2,4   Isr: 2,4
        Topic: topic01 Partition: 7    Leader: 3       Replicas: 3,5   Isr: 3,5
        Topic: topic01 Partition: 8    Leader: 4       Replicas: 4,1   Isr: 4,1
        Topic: topic01 Partition: 9    Leader: 5       Replicas: 5,2   Isr: 5,2

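When an application consumes data from a topic, Kafka organizes it as a consumer group (ConsumerGroup) containing one or more consumers (Consumer). Within a group, each partition is consumed by only one consumer. The number of useful consumers is therefore at most the total number of partitions; any extra consumers sit idle instead of adding throughput.

Whenever a consumer in the group terminates or a new consumer joins, Kafka rebalances the group (Rebalance). It then reassigns the partitions according to the current consumers and the assignment strategy. Kafka 0.10.x provides two assignment strategies, RangeAssignor and RoundRobinAssignor. Version 0.11.x adds a third, StickyAssignor. They work as follows.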

1 RangeAssignor strategy

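RangeAssignor works one topic at a time. For each topic it does the following:

  • lays out the available partitions in numeric order and the consumers in lexicographic order;
  • divides the number of partitions by the number of consumers to get each consumer's share;
  • if the partitions do not divide evenly, gives the first few consumers one extra partition each.

The implementation: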

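// Excerpt based on RangeAssignor.assign() in the Kafka client source;
// consumersPerTopic() is a helper defined in RangeAssignor.
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());

    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();
        List<String> consumersForTopic = topicEntry.getValue();

        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;

        // Consumers in lexicographic order
        Collections.sort(consumersForTopic);

        // Number of partitions of this topic divided by the number of consumers
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        // Remainder: this many consumers (the first ones) get one extra partition
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

        // Partitions in numeric order: partition 0, 1, 2, ...
        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            // Consumer i receives the contiguous range [start, start + length)
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}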
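The start/length arithmetic in the loop above can be tried on its own. Below is a minimal stdlib-only sketch that repeats just those two formulas for a single topic. The names RangeMath and ranges are hypothetical and not part of Kafka. Consumer index i stands for the i-th consumer in sorted order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: only the start/length arithmetic of the RangeAssignor
// excerpt, for a single topic, without any Kafka classes.
class RangeMath {
    // For each consumer index i (sorted order), returns {start, end}:
    // the half-open partition range [start, end) that consumer receives.
    static List<int[]> ranges(int numPartitions, int numConsumers) {
        int perConsumer = numPartitions / numConsumers; // base share
        int withExtra = numPartitions % numConsumers;   // how many get one more
        List<int[]> result = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i + 1 > withExtra ? 0 : 1);
            result.add(new int[] {start, start + length});
        }
        return result;
    }

    public static void main(String[] args) {
        // topic01 from the beginning of the article: 10 partitions, 3 consumers
        for (int[] r : ranges(10, 3)) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
    }
}
```

For 10 partitions and 3 consumers it prints [0, 4), [4, 7) and [7, 10). The remainder 10 % 3 = 1 gives only the first consumer an extra partition. With 3 partitions and 4 consumers, ranges(3, 4) gives the fourth consumer the empty range [3, 3). This is why consumers beyond the partition count stay idle.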

For example, suppose there are two topics (topic1, topic2), each with three partitions:

  • topic1, partitions: topic1p0, topic1p1, topic1p2
  • topic2, partitions: topic2p0, topic2p1, topic2p2

Take a consumer group consumer_group1 with two consumers (consumer1, consumer2). RangeAssignor yields the following assignment:

  • consumer1: [topic1p0,topic1p1,topic2p0,topic2p1]
  • consumer2: [topic1p2,topic2p2]

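If a new consumer, consumer3, then joins consumer_group1, the group rebalances and RangeAssignor yields:

  • consumer1: [topic1p0,topic2p0]
  • consumer2: [topic1p1,topic2p1]
  • consumer3: [topic1p2,topic2p2]

With three consumers, 3 / 3 = 1 with no remainder. Consumer i (in sorted order) therefore gets exactly partition i of each topic.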
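The multi-topic range assignment can be computed without a running cluster. Below is a minimal sketch under simplifying assumptions:

  • consumers are identified directly by the names consumer1, consumer2, consumer3 (real member IDs are assigned by the group coordinator);
  • partitions use the topic1p0 notation;
  • RangeSketch is a hypothetical class, not Kafka's RangeAssignor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stdlib-only re-implementation of range assignment over several topics.
class RangeSketch {
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : subscriptions.keySet()) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Group consumers by subscribed topic (topics iterated in sorted order)
        Map<String, List<String>> consumersPerTopic = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : subscriptions.entrySet()) {
            for (String topic : e.getValue()) {
                consumersPerTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(e.getKey());
            }
        }
        for (Map.Entry<String, List<String>> e : consumersPerTopic.entrySet()) {
            String topic = e.getKey();
            List<String> consumers = e.getValue();
            Integer numPartitions = partitionsPerTopic.get(topic);
            if (numPartitions == null) continue;
            Collections.sort(consumers); // lexicographic order
            int perConsumer = numPartitions / consumers.size();
            int withExtra = numPartitions % consumers.size();
            for (int i = 0; i < consumers.size(); i++) {
                int start = perConsumer * i + Math.min(i, withExtra);
                int length = perConsumer + (i < withExtra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    assignment.get(consumers.get(i)).add(topic + "p" + p);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new TreeMap<>();
        partitions.put("topic1", 3);
        partitions.put("topic2", 3);
        List<String> both = Arrays.asList("topic1", "topic2");

        Map<String, List<String>> group = new TreeMap<>();
        group.put("consumer1", both);
        group.put("consumer2", both);
        System.out.println(assign(partitions, group));

        group.put("consumer3", both); // a third consumer joins
        System.out.println(assign(partitions, group));
    }
}
```

With two consumers it gives consumer1 [topic1p0, topic1p1, topic2p0, topic2p1] and consumer2 [topic1p2, topic2p2]. With three consumers each consumer receives one partition of each topic: consumer1 gets partition 0, consumer2 partition 1 and consumer3 partition 2.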

2 RoundRobinAssignor strategy

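RoundRobinAssignor lays out all partitions of all subscribed topics together with all consumers, then assigns the partitions to the consumers in round-robin order. When every consumer subscribes to the same topics, the partitions end up more evenly balanced than with RangeAssignor. Note that RoundRobinAssignor is not the default strategy: in these versions the default partition.assignment.strategy is RangeAssignor. The implementation: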

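// Excerpt based on RoundRobinAssignor.assign() in the Kafka client source
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                Map<String, Subscription> subscriptions) {
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());

    // Cycles endlessly over the consumers in sorted order
    CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));
    // All subscribed partitions, ordered by topic name and then partition number
    for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
        final String topic = partition.topic();
        // Skip consumers that are not subscribed to this partition's topic
        while (!subscriptions.get(assigner.peek()).topics().contains(topic))
            assigner.next();
        // Give the partition to the current consumer and advance to the next one
        assignment.get(assigner.next()).add(partition);
    }
    return assignment;
}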

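Using the same topics and a consumer group with two consumers, RoundRobinAssignor yields:

  • consumer1: [topic1p0,topic1p2,topic2p1]
  • consumer2: [topic1p1,topic2p0,topic2p2]

The partitions are dealt out one at a time in the order topic1p0, topic1p1, topic1p2, topic2p0, topic2p1, topic2p2. Each consumer therefore gets three partitions, instead of the four-and-two split that RangeAssignor produces.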
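The round-robin loop can likewise be sketched with plain collections. This is a minimal illustration that assumes consumers are identified by name and partitions use the topic1p0 notation. RoundRobinSketch is hypothetical. An index that wraps around the sorted consumer list stands in for Kafka's CircularIterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stdlib-only sketch of round-robin assignment (not Kafka's class).
class RoundRobinSketch {
    static Map<String, List<String>> assign(Map<String, Integer> partitionsPerTopic,
                                            Map<String, List<String>> subscriptions) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : subscriptions.keySet()) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Every subscribed topic, in sorted order
        SortedSet<String> topics = new TreeSet<>();
        for (List<String> subscribed : subscriptions.values()) {
            topics.addAll(subscribed);
        }
        // Consumers in sorted order; "next" wraps around like a circular iterator
        List<String> consumers = new ArrayList<>(new TreeSet<>(subscriptions.keySet()));
        int next = 0;
        for (String topic : topics) {
            Integer numPartitions = partitionsPerTopic.get(topic);
            if (numPartitions == null) continue;
            for (int p = 0; p < numPartitions; p++) {
                // Skip consumers that do not subscribe to this topic
                while (!subscriptions.get(consumers.get(next)).contains(topic)) {
                    next = (next + 1) % consumers.size();
                }
                assignment.get(consumers.get(next)).add(topic + "p" + p);
                next = (next + 1) % consumers.size();
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new TreeMap<>();
        partitions.put("topic1", 3);
        partitions.put("topic2", 3);
        List<String> both = Arrays.asList("topic1", "topic2");
        Map<String, List<String>> group = new TreeMap<>();
        group.put("consumer1", both);
        group.put("consumer2", both);
        System.out.println(assign(partitions, group));
    }
}
```

For two consumers it prints {consumer1=[topic1p0, topic1p2, topic2p1], consumer2=[topic1p1, topic2p0, topic2p2]}, three partitions each.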

3 StickyAssignor strategy

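StickyAssignor, added in 0.11.x, is the most complex of the three strategies. Its two main goals are:

  • Distribute topic partitions across the consumers as evenly as possible.
  • When a consumer terminates and triggers a reassignment, preserve as much of the existing assignment as possible. The departed consumer's partitions are moved to the remaining consumers instead of reshuffling all partitions, which saves overhead.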

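Since this strategy appeared in 0.11.x, various bugs in it have been found and fixed in later releases, so use it with caution on older versions.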
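To make the difference concrete, the sketch below counts how many partitions change owner when one of three consumers of a 10-partition topic leaves. This is a deliberately simplified illustration of the sticky idea, not Kafka's StickyAssignor algorithm, which also moves partitions to restore balance and handles differing subscriptions. StickySketch and its methods are hypothetical. The consumer names follow the client-N ids used in the example program later in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch for illustration only: NOT Kafka's StickyAssignor.
class StickySketch {
    // Range-style assignment of one topic, recomputed from scratch.
    static Map<String, List<Integer>> range(int numPartitions, List<String> consumers) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // lexicographic order
        Map<String, List<Integer>> result = new TreeMap<>();
        int perConsumer = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size();
        for (int i = 0; i < sorted.size(); i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + length; p++) owned.add(p);
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    // Sticky idea: survivors keep their partitions; only orphaned partitions
    // are handed out, each to the currently least-loaded survivor.
    static Map<String, List<Integer>> stickyReassign(Map<String, List<Integer>> previous,
                                                     Set<String> alive) {
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String c : alive) result.put(c, new ArrayList<>());
        List<Integer> orphans = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> e : previous.entrySet()) {
            if (alive.contains(e.getKey())) result.get(e.getKey()).addAll(e.getValue());
            else orphans.addAll(e.getValue());
        }
        for (int p : orphans) {
            String target = null;
            for (Map.Entry<String, List<Integer>> e : result.entrySet()) {
                if (target == null || e.getValue().size() < result.get(target).size()) target = e.getKey();
            }
            result.get(target).add(p);
        }
        return result;
    }

    // Number of partitions whose owner differs between two assignments.
    static int moved(Map<String, List<Integer>> before, Map<String, List<Integer>> after) {
        Map<Integer, String> owner = new HashMap<>();
        before.forEach((c, ps) -> ps.forEach(p -> owner.put(p, c)));
        int count = 0;
        for (Map.Entry<String, List<Integer>> e : after.entrySet())
            for (int p : e.getValue())
                if (!e.getKey().equals(owner.get(p))) count++;
        return count;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> before = range(10, Arrays.asList("client-0", "client-1", "client-2"));
        Set<String> alive = new TreeSet<>(Arrays.asList("client-1", "client-2")); // client-0 left
        Map<String, List<Integer>> rangeAfter = range(10, new ArrayList<>(alive));
        Map<String, List<Integer>> stickyAfter = stickyReassign(before, alive);
        System.out.println("range:  moved " + moved(before, rangeAfter) + " -> " + rangeAfter);
        System.out.println("sticky: moved " + moved(before, stickyAfter) + " -> " + stickyAfter);
    }
}
```

Recomputing the range assignment moves 6 of the 10 partitions. Partitions 5 and 6 move from client-1 to client-2 even though client-1 is still alive. The sticky reassignment moves only the 4 partitions that belonged to client-0. Both end with 5 partitions per consumer.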

4 Multithreaded Java consumer example

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaTopicConsumer {
    // KafkaConsumer is not thread-safe, so every thread creates its own instance
    private final KafkaConsumer<String, String> consumer;
    private final int consumerId;        // consumer instance id
    private final long timeOut = 10000;  // poll timeout in milliseconds

    public KafkaTopicConsumer(int consumerId) {
        this.consumerId = consumerId;
        Properties props = new Properties();
        props.put("client.id", "client-" + consumerId);
        props.put("bootstrap.servers", "192.168.1.10:9092,192.168.1.11:9092");
        props.put("group.id", "test-group03");
        // The old consumer's zookeeper.* settings are not used by KafkaConsumer, so they are omitted
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Partition assignment strategy
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic1", "topic2"));
    }

    public void consume() {
        while (true) {
            // poll(long) is the 0.10.x/0.11.x API; newer clients use poll(Duration)
            ConsumerRecords<String, String> records = consumer.poll(timeOut);
            System.out.println("records count:" + records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format(
                        "client-id = %d , topic = %s, partition = %d , offset = %d, key = %s, value = %s",
                        this.consumerId, record.topic(), record.partition(), record.offset(),
                        record.key(), record.value()));
            }
            // Auto-commit is disabled, so commit this batch's offsets manually
            consumer.commitSync();
        }
    }

    public static void main(String[] args) {
        int threadSize = Integer.parseInt(args[0]); // number of consumer threads
        for (int i = 0; i < threadSize; i++) {
            int id = i;
            // Each thread constructs and runs its own consumer
            new Thread(() -> new KafkaTopicConsumer(id).consume(), "consumer-" + id).start();
        }
    }
}
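Only partition.assignment.strategy differs between the strategies. One way to compare them with the program above is to pass the assignor class name in as a parameter. Below is a minimal sketch using only java.util.Properties. The ConsumerProps class, its build() method and the group id "test-group04" are hypothetical. StickyAssignor requires a 0.11.x or later client.

```java
import java.util.Properties;

// Hypothetical helper (not part of the original program): the consumer settings
// of KafkaTopicConsumer, with the assignor class passed in as a parameter.
class ConsumerProps {
    static final String RANGE = "org.apache.kafka.clients.consumer.RangeAssignor";
    static final String ROUND_ROBIN = "org.apache.kafka.clients.consumer.RoundRobinAssignor";
    static final String STICKY = "org.apache.kafka.clients.consumer.StickyAssignor"; // 0.11.x+ clients

    static Properties build(int consumerId, String groupId, String assignorClass) {
        Properties props = new Properties();
        props.put("client.id", "client-" + consumerId);
        props.put("bootstrap.servers", "192.168.1.10:9092,192.168.1.11:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("partition.assignment.strategy", assignorClass);
        return props;
    }

    public static void main(String[] args) {
        // "test-group04" is a hypothetical group id used only for this comparison
        Properties props = build(0, "test-group04", STICKY);
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

The returned Properties can be passed to new KafkaConsumer<>(props) in place of the hard-coded settings. When switching the strategy of an existing group, keep in mind that the group coordinator only accepts members that share at least one common strategy. Using a fresh group.id for each experiment sidesteps that.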

Starting the program with three consumer threads produces the following partition assignment:

GROUP          TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      OWNER
test-group03   topic2    0          0               3333            3333     client-0_/192.168.1.13
test-group03   topic1    0          500             3333            2833     client-0_/192.168.1.13
test-group03   topic2    2          0               3333            3333     client-2_/192.168.1.13
test-group03   topic1    2          500             3333            2833     client-2_/192.168.1.13
test-group03   topic2    1          500             3334            2834     client-1_/192.168.1.13
test-group03   topic1    1          0               3334            3334     client-1_/192.168.1.13

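Consume a large topic on its own rather than together with other topics. This keeps a backlog on one topic from slowing down the processing of the others. For the 10-partition topic01 shown at the beginning, choose the number of consumers based on the available hardware and the assignment strategy. Under heavy data volume the optimal number of consumers equals the total number of partitions, so that each consumer owns exactly one partition: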

GROUP          TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      OWNER
test-group02   topic01   6          373460          1026328         652868   client-6_/192.168.1.13
test-group02   topic01   2          375660          1048756         673096   client-2_/192.168.1.13
test-group02   topic01   5          374625          1013157         638532   client-5_/192.168.1.13
test-group02   topic01   3          347001          1066967         719966   client-3_/192.168.1.13
test-group02   topic01   0          375570          1013261         637691   client-0_/192.168.1.13
test-group02   topic01   9          376545          1094088         717543   client-9_/192.168.1.13
test-group02   topic01   8          347082          1066948         719866   client-8_/192.168.1.13
test-group02   topic01   7          375100          1048827         673727   client-7_/192.168.1.13
test-group02   topic01   1          372447          1026467         654020   client-1_/192.168.1.13
test-group02   topic01   4          377052          1093926         716874   client-4_/192.168.1.13
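In these outputs the LAG column is LOG-END-OFFSET minus CURRENT-OFFSET, i.e. the number of messages in that partition still waiting to be consumed. As a quick cross-check, here is a small sketch that recomputes the per-partition lag for the topic01 rows above and sums it. LagSketch is a hypothetical name; the offsets are copied from the output.

```java
// Hypothetical helper: recomputes LAG = LOG-END-OFFSET - CURRENT-OFFSET
// for the test-group02 / topic01 rows shown above.
class LagSketch {
    // {partition, CURRENT-OFFSET, LOG-END-OFFSET}, copied from the output above
    static final long[][] ROWS = {
        {6, 373460, 1026328}, {2, 375660, 1048756}, {5, 374625, 1013157},
        {3, 347001, 1066967}, {0, 375570, 1013261}, {9, 376545, 1094088},
        {8, 347082, 1066948}, {7, 375100, 1048827}, {1, 372447, 1026467},
        {4, 377052, 1093926}
    };

    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    static long totalLag(long[][] rows) {
        long total = 0;
        for (long[] row : rows) total += lag(row[1], row[2]);
        return total;
    }

    public static void main(String[] args) {
        for (long[] row : ROWS) {
            System.out.println("partition " + row[0] + ": lag " + lag(row[1], row[2]));
        }
        System.out.println("total lag: " + totalLag(ROWS));
    }
}
```

The total comes to 6,804,183 unconsumed messages. Per-partition lag ranges from 637,691 (partition 0) to 719,966 (partition 3).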

5 Summary

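Kafka provides three assignment strategies: RangeAssignor, RoundRobinAssignor, and StickyAssignor (added in 0.11.x). They differ as follows:

  • RangeAssignor works per topic. It lays out the partitions in numeric order and the consumers in lexicographic order, then gives each consumer a contiguous range.
  • RoundRobinAssignor deals all partitions out to all consumers in turn, which spreads them more evenly.
  • StickyAssignor also rebalances, but it preserves as much of the existing assignment as possible. The departed consumer's partitions go to the remaining consumers instead of all partitions being reshuffled, which saves overhead.

With the first two strategies, a rebalance triggered by a consumer leaving or joining recomputes the assignment from scratch. Partitions can therefore move even between consumers that stayed in the group. For topics with many partitions and large data volumes, StickyAssignor has a clear advantage.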
