Kafka分區與消費者的關系
我們知道,生產者發送消息到主題,消費者訂閱主題(以消費者組的名義訂閱),而主題下是分區,消息是存儲在分區中的,所以事實上生產者發送消息到分區,消費者則從分區讀取消息,那麽,這裏問題來了,生產者將消息投遞到哪個分區?消費者組中的消費者實例之間是怎麽分配分區的呢?接下來,就圍繞著這兩個問題一探究竟。
2.? 主題的分區數設置
在server.properties配置文件中可以指定一個全局的分區數設置,這是對每個主題下的分區數的默認設置,默認是1。
當然每個主題也可以自己設置分區數量,如果創建主題的時候沒有指定分區數量,則會使用server.properties中的設置。
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1
[root@localhost kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic abc
Topic:abc PartitionCount:2 ReplicationFactor:1 Configs:
Topic: abc Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: abc Partition: 1 Leader: 0 Replicas: 0 Isr: 0
首先提出一個問題:生產者將消息投遞到分區有沒有規律?如果有,那麽它是如何決定一條消息該投遞到哪個分區的呢?
3.1.? 默認的分區策略
The default partitioning strategy:
If a partition is specified in the record, use it
If no partition is specified but a key is present choose a partition based on a hash of the key
If no partition or key is present choose a partition in a round-robin fashion
默認的分區策略是:
如果在發消息的時候指定了分區,則消息投遞到指定的分區
如果沒有指定分區,但是消息的key不為空,則基於key的哈希值來選擇一個分區
如果既沒有指定分區,且消息的key也是空,則用輪詢的方式選擇一個分區
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
通過源代碼可以更加作證這一點
4.? 分區與消費者
消費者以組的名義訂閱主題,主題有多個分區,消費者組中有多個消費者實例,那麽消費者實例和分區之前的對應關系是怎樣的呢?
換句話說,就是組中的每一個消費者負責那些分區,這個分配關系是如何確定的?
同一時刻,一條消息只能被組中的一個消費者實例消費
消費者組訂閱這個主題,意味著主題下的所有分區都會被組中的消費者消費到,如果按照從屬關系來說的話就是,主題下的每個分區只從屬於組中的一個消費者,不可能出現組中的兩個消費者負責同一個分區。
那麽,問題來了。如果分區數大於或者等於組中的消費者實例數,那自然沒有什麽問題,無非一個消費者會負責多個分區,(PS:當然,最理想的情況是二者數量相等,這樣就相當於一個消費者負責一個分區);但是,如果消費者實例的數量大於分區數,那麽按照默認的策略(PS:之所以強調默認策略是因為你也可以自定義策略),有一些消費者是多余的,一直接不到消息而處於空閑狀態。
話又說回來,假設多個消費者負責同一個分區,那麽會有什麽問題呢?
我們知道,Kafka它在設計的時候就是要保證分區下消息的順序,也就是說消息在一個分區中的順序是怎樣的,那麽消費者在消費的時候看到的就是什麽樣的順序,那麽要做到這一點就首先要保證消息是由消費者主動拉取的(pull),其次還要保證一個分區只能由一個消費者負責。倘若,兩個消費者負責同一個分區,那麽就意味著兩個消費者同時讀取分區的消息,由於消費者自己可以控制讀取消息的offset,就有可能C1才讀到2,而C1讀到1,C1還沒處理完,C2已經讀到3了,則會造成很多浪費,因為這就相當於多線程讀取同一個消息,會造成消息處理的重復,且不能保證消息的順序,這就跟主動推送(push)無異。
4.1.? 消費者分區分配策略
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
如果是自定義分配策略的話可以繼承AbstractPartitionAssignor這個類,它默認有3個實現
4.1.1.??range
range策略對應的實現類是org.apache.kafka.clients.consumer.RangeAssignor
這是默認的分配策略
可以通過消費者配置中partition.assignment.strategy參數來指定分配策略,它的值是類的全路徑,是一個數組
/**
* The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
* and the consumers in lexicographic order. We then divide the number of partitions by the total number of
* consumers to determine the number of partitions to assign to each consumer. If it does not evenly
* divide, then the first few consumers will have one extra partition.
*
* For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
* resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
*
* The assignment will be:
* C0: [t0p0, t0p1, t1p0, t1p1]
* C1: [t0p2, t1p2]
*/
range策略是基於每個主題的
對於每個主題,我們以數字順序排列可用分區,以字典順序排列消費者。然後,將分區數量除以消費者總數,以確定分配給每個消費者的分區數量。如果沒有平均劃分(PS:除不盡),那麽最初的幾個消費者將有一個額外的分區。
簡而言之,就是,
1、range分配策略針對的是主題(PS:也就是說,這裏所說的分區指的某個主題的分區,消費者值的是訂閱這個主題的消費者組中的消費者實例)
2、首先,將分區按數字順序排行序,消費者按消費者名稱的字典序排好序
3、然後,用分區總數除以消費者總數。如果能夠除盡,則皆大歡喜,平均分配;若除不盡,則位於排序前面的消費者將多負責一個分區
例如,假設有兩個消費者C0和C1,兩個主題t0和t1,並且每個主題有3個分區,分區的情況是這樣的:t0p0,t0p1,t0p2,t1p0,t1p1,t1p2
那麽,基於以上信息,最終消費者分配分區的情況是這樣的:
C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]
為什麽是這樣的結果呢?
因為,對於主題t0,分配的結果是C0負責P0和P1,C1負責P2;對於主題t2,也是如此,綜合起來就是這個結果
上面的過程用圖形表示的話大概是這樣的:
閱讀代碼,更有助於理解:
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
// 主題與消費者的映射
Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList<TopicPartition>());
for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey(); // 主題
List<String> consumersForTopic = topicEntry.getValue(); // 消費者列表
// partitionsPerTopic表示主題和分區數的映射
// 獲取主題下有多少個分區
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
// 消費者按字典序排序
Collections.sort(consumersForTopic);
// 分區數量除以消費者數量
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
// 取模,余數就是額外的分區
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
// 分配分區
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
4.1.2. roundrobin(輪詢)
roundronbin分配策略的具體實現是org.apache.kafka.clients.consumer.RoundRobinAssignor
/**
* The round robin assignor lays out all the available partitions and all the available consumers. It
* then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer
* instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
* will be within a delta of exactly one across all consumers.)
*
* For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
* resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
*
* The assignment will be:
* C0: [t0p0, t0p2, t1p1]
* C1: [t0p1, t1p0, t1p2]
*
* When subscriptions differ across consumer instances, the assignment process still considers each
* consumer instance in round robin fashion but skips over an instance if it is not subscribed to
* the topic. Unlike the case when subscriptions are identical, this can result in imbalanced
* assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2,
* with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0,
* t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
*
* Tha assignment will be:
* C0: [t0p0]
* C1: [t1p0]
* C2: [t1p1, t2p0, t2p1, t2p2]
*/
輪詢分配策略是基於所有可用的消費者和所有可用的分區的
與前面的range策略最大的不同就是它不再局限於某個主題
如果所有的消費者實例的訂閱都是相同的,那麽這樣最好了,可用統一分配,均衡分配
例如,假設有兩個消費者C0和C1,兩個主題t0和t1,每個主題有3個分區,分別是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2
那麽,最終分配的結果是這樣的:
C0: [t0p0, t0p2, t1p1]
C1: [t0p1, t1p0, t1p2]
用圖形表示大概是這樣的:
假設,組中每個消費者訂閱的主題不一樣,分配過程仍然以輪詢的方式考慮每個消費者實例,但是如果沒有訂閱主題,則跳過實例。當然,這樣的話分配肯定不均衡。
什麽意思呢?也就是說,消費者組是一個邏輯概念,同組意味著同一時刻分區只能被一個消費者實例消費,換句話說,同組意味著一個分區只能分配給組中的一個消費者。事實上,同組也可以不同訂閱,這就是說雖然屬於同一個組,但是它們訂閱的主題可以是不一樣的。
例如,假設有3個主題t0,t1,t2;其中,t0有1個分區p0,t1有2個分區p0和p1,t2有3個分區p0,p1和p2;有3個消費者C0,C1和C2;C0訂閱t0,C1訂閱t0和t1,C2訂閱t0,t1和t2。那麽,按照輪詢分配的話,C0應該負責
首先,肯定是輪詢的方式,其次,比如說有主題t0,t1,t2,它們分別有1,2,3個分區,也就是t0有1個分區,t1有2個分區,t2有3個分區;有3個消費者分別從屬於3個組,C0訂閱t0,C1訂閱t0和t1,C2訂閱t0,t1,t2;那麽,按照輪詢分配的話,C0應該負責t0p0,C1應該負責t1p0,其余均由C2負責。
上述過程用圖形表示大概是這樣的:
為什麽最後的結果是
C0: [t0p0]
C1: [t1p0]
C2: [t1p1, t2p0, t2p1, t2p2]
這是因為,按照輪詢t0p1由C0負責,t1p0由C1負責,由於同組,C2只能負責t1p1,由於只有C2訂閱了t2,所以t2所有分區由C2負責,綜合起來就是這個結果
細想一下可以發現,這種情況下跟range分配的結果是一樣的
5.? 測試代碼
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cjs.example</groupId>
<artifactId>kafka-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-demo</name>
<description></description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
?
package com.cjs.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class HelloProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.133:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("abc", Integer.toString(i), Integer.toString(i)), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (null != e) {
e.printStackTrace();
}else {
System.out.println("callback: " + recordMetadata.topic() + " " + recordMetadata.partition() + " " + recordMetadata.offset());
}
}
});
}
producer.close();
}
}
package com.cjs.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class HelloConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.133:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
// props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("foo", "bar", "abc"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
}
}
}
}
Kafka分區與消費者的關系