Kafka 分區分配計算(分區器 Partitions )
Producer<String,String> producer = new KafkaProducer<String,String>(properties); String message = "kafka producer demo"; ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message); try { producer.send(producerRecord).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
沒錯,ProducerRecord只是一個封裝了消息的對象而已,ProducerRecord一共有5個成員變量,即:
private final String topic;//所要發送的topic private final Integer partition;//指定的partition序號 private final Headers headers;//一組鍵值對,與RabbitMQ中的headers類似,kafka0.11.x版本才引入的一個屬性 private final K key;//消息的key private final V value;//消息的value,即消息體 private final Long timestamp;//消息的時間戳,可以分為Create_Time和LogAppend_Time之分,這個以後的文章中再表。123456
在KafkaProducer的源碼(1.0.0)中,計算分區時調用的是下面的partition()方法:
/** * computes partition for given record. * if the record has partition returns the value otherwise * calls configured partitioner class to compute the partition. */ private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
可以看出的確是先判斷有無指明ProducerRecord的partition字段,如果沒有指明,則再進一步計算分區。上面這段代碼中的partitioner在默認情況下是指Kafka默認實現的org.apache.kafka.clients.producer.DefaultPartitioner,其partition()方法實現如下:
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
由上源碼可以看出partition的計算方式:
- 如果key為null,則按照一種輪詢的方式來計算分區分配
- 如果key不為null則使用稱之為murmur的Hash算法(非加密型Hash函數,具備高運算性能及低碰撞率)來計算分區分配。
KafkaProducer中還支持自定義分區分配方式,與org.apache.kafka.clients.producer.internals.DefaultPartitioner一樣首先實現org.apache.kafka.clients.producer.Partitioner接口,然後在KafkaProducer的配置中指定partitioner.class為對應的自定義分區器(Partitioners)即可,即:
properties.put("partitioner.class","com.hidden.partitioner.DemoPartitioner");
自定義DemoPartitioner主要是實現Partitioner接口的public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)的方法。DemoPartitioner稍微修改了下DefaultPartitioner的計算方式,詳細參考如下:
public class DemoPartitioner implements Partitioner {
private final AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public void configure(Map<String, ?> configs) {}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (null == keyBytes || keyBytes.length<1) {
return atomicInteger.getAndIncrement() % numPartitions;
}
//借用String的hashCode的計算方式
int hash = 0;
for (byte b : keyBytes) {
hash = 31 * hash + b;
}
return hash % numPartitions;
}
@Override
public void close() {}
}
這個自定義分區器的實現比較簡單,讀者可以根據自身業務的需求來靈活實現分配分區的計算方式,比如:一般大型電商都有多個倉庫,可以將倉庫的名稱或者ID作為Key來靈活的記錄商品信息。
Kafka 分區分配計算(分區器 Partitions )