Kafka分割槽機制介紹與示例
Kafka中可以將Topic從物理上劃分成一個或多個分割槽(Partition),每個分割槽在物理上對應一個資料夾,以”topicName_partitionIndex”的命名方式命名,該資料夾下儲存這個分割槽的所有訊息(.log)和索引檔案(.index),這使得Kafka的吞吐率可以水平擴充套件。
生產者在生產資料的時候,可以為每條訊息指定Key,這樣訊息被髮送到broker時,會根據分割槽規則選擇被儲存到哪一個分割槽中,如果分割槽規則設定的合理,那麼所有的訊息將會被均勻的分佈到不同的分割槽中,這樣就實現了負載均衡和水平擴充套件。另外,在消費者端,同一個消費組可以多執行緒併發的從多個分割槽中同時消費資料(後續將介紹這塊)。
上面所說的分割槽規則,是實現了kafka.producer.Partitioner介面的類,可以自定義。比如,下面的程式碼SimplePartitioner中,將訊息的key做了hashcode,然後和分割槽數(numPartitions)做模運算,使得每一個key都可以分佈到一個分割槽中:
- package com.lxw1234.kafka;
- import kafka.producer.Partitioner;
- import kafka.utils.VerifiableProperties;
- public class SimplePartitioner implements Partitioner {
- public SimplePartitioner (VerifiableProperties props) {
- }
- @Override
- public int partition(Object key, int numPartitions) {
- int partition = 0;
- String k = (String)key;
- partition = Math.abs(k.hashCode()) % numPartitions;
- return partition;
- }
- }
在建立Topic時候可以使用–partitions <numPartitions>指定分割槽數。也可以在server.properties配置檔案中配置引數num.partitions來指定預設的分割槽數。
但有一點需要注意,為Topic建立分割槽時,分割槽數最好是broker數量的整數倍,這樣才能是一個Topic的分割槽均勻的分佈在整個Kafka叢集中,假設我的Kafka叢集由4個broker組成,以下圖為例:
建立帶分割槽的Topic
現在建立一個topic “lxw1234”,為該topic指定4個分割槽,那麼這4個分割槽將會在每個broker上各分佈一個:
- ./kafka-topics.sh
- --create
- --zookeeper zk1:2181,zk2:2181,zk3:2181
- --replication-factor 1
- --partitions 4
- --topic lxw1234
這樣所有的分割槽就均勻分佈在叢集中,如果建立topic時候指定了3個分割槽,那麼就有一個broker上沒有該topic的分割槽。
帶分割槽規則的生產者
現在用一個生產者示例(PartitionerProducer),向Topic lxw1234中傳送訊息。該生產者使用的分割槽規則,就是上面的SimplePartitioner。從0-10一共11條訊息,每條訊息的key為”key”+index,訊息內容為”key”+index+”–value”+index。比如:key0–value0、key1–value1、、、key10–value10。
- package com.lxw1234.kafka;
- import java.util.Properties;
- import kafka.javaapi.producer.Producer;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
- public class PartitionerProducer {
- public static void main(String[] args) {
- Properties props = new Properties();
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("metadata.broker.list", "127.0.0.17:9091,127.0.0.17:9092,127.0.0.102:9091,127.0.0.102:9092");
- props.put("partitioner.class", "com.lxw1234.kafka.SimplePartitioner");
- Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
- String topic = "lxw1234";
- for(int i=0; i<=10; i++) {
- String k = "key" + i;
- String v = k + "--value" + i;
- producer.send(new KeyedMessage<String, String>(topic,k,v));
- }
- producer.close();
- }
- }
理論上來說,生產者在傳送訊息的時候,會按照SimplePartitioner的規則,將key0做hashcode,然後和分割槽數(4)做模運算,得到分割槽索引:
hashcode(”key0”) % 4 = 1
hashcode(”key1”) % 4 = 2
hashcode(”key2”) % 4 = 3
hashcode(”key3”) % 4 = 0
……
對應的訊息將會被髮送至相應的分割槽中。
統計各分割槽訊息的消費者
下面的消費者程式碼用來驗證,在消費資料時,打印出訊息所在的分割槽及訊息內容:
- package com.lxw1234.kafka;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
- import kafka.consumer.Consumer;
- import kafka.consumer.ConsumerConfig;
- import kafka.consumer.ConsumerIterator;
- import kafka.consumer.KafkaStream;
- import kafka.javaapi.consumer.ConsumerConnector;
- import kafka.message.MessageAndMetadata;
- public class MyConsumer {
- public static void main(String[] args) {
- String topic = "lxw1234";
- ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
- Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- topicCountMap.put(topic, new Integer(1));
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
- KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
- ConsumerIterator<byte[], byte[]> it = stream.iterator();
- while(it.hasNext()) {
- MessageAndMetadata<byte[], byte[]> mam = it.next();
- System.out.println("consume: Partition [" + mam.partition() + "] Message: [" + new String(mam.message()) + "] ..");
- }
- }
- private static ConsumerConfig createConsumerConfig() {
- Properties props = new Properties();
- props.put("group.id","group1");
- props.put("zookeeper.connect","127.0.0.132:2181,127.0.0.133:2182,127.0.0.134:2183");
- props.put("zookeeper.session.timeout.ms", "400");
- props.put("zookeeper.sync.time.ms", "200");
- props.put("auto.commit.interval.ms", "1000");
- props.put("auto.offset.reset", "smallest");
- return new ConsumerConfig(props);
- }
- }
執行程式驗證結果
先啟動消費者,再執行生產者。
之後在消費者的控制檯可以看到如下輸出:
結果和正常預期一致。