Kafka - 分割槽演算法
阿新 • • 發佈:2018-12-13
目錄
一 預設分割槽策略:
序列化key存在時,對其採用murmur2 hash演算法,將結果轉為非負數後再對總分割槽數取模,得到分割槽編號。
序列化key不存在時,採用輪詢(round robin)方式:
- 可用分割槽數大於0時,取該topic的計數器值(初始值為執行緒安全生成的隨機數,每次呼叫後遞增),轉為非負數後對 可用分割槽數 取模,在可用分割槽列表中,找到對應的分割槽編號。
- 可用分割槽數等於0時,取該topic的計數器值(初始值為執行緒安全生成的隨機數,每次呼叫後遞增),轉為非負數後對 總分割槽數 取模,得到分割槽編號。
/** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
/**
 * Returns the current counter value for {@code topic} and then increments the counter.
 *
 * <p>The first time a topic is seen, a counter seeded with a random int is registered in
 * {@code topicCounterMap}. If another thread registers one first, that thread's counter
 * is used instead, so every caller shares a single counter per topic.
 *
 * @param topic the topic whose counter should be advanced
 * @return the counter value before the increment (may be negative)
 */
private int nextValue(String topic) {
    final AtomicInteger known = topicCounterMap.get(topic);
    if (known != null) {
        return known.getAndIncrement();
    }

    // No counter yet: try to publish ours; putIfAbsent returns the winner if we lost the race.
    final AtomicInteger fresh = new AtomicInteger(ThreadLocalRandom.current().nextInt());
    final AtomicInteger winner = topicCounterMap.putIfAbsent(topic, fresh);
    final AtomicInteger shared = (winner == null) ? fresh : winner;
    return shared.getAndIncrement();
}
二 自定義分割槽策略:
/**
 * Example custom partitioner that partitions strictly by key.
 *
 * <ul>
 *   <li>Records without a serialized key are rejected.</li>
 *   <li>Records whose key equals the string {@code "1"} are pinned to partition 1, provided
 *       the topic actually has a partition 1.</li>
 *   <li>Every other record is placed by murmur2-hashing the serialized key modulo the
 *       topic's partition count.</li>
 * </ul>
 */
public class MyPartitioner implements Partitioner {

    public static void main(String[] args) {
        // Intentionally empty. For the built-in behaviour compare with
        // org.apache.kafka.clients.producer.internals.DefaultPartitioner
    }

    /**
     * No configuration is needed by this partitioner.
     *
     * @param configs configuration key/value pairs (ignored)
     */
    @Override
    public void configure(Map<String, ?> configs) {
    }

    /**
     * Computes the partition for the given record.
     *
     * @param topic      the topic name
     * @param key        the record key
     * @param keyBytes   the serialized key; must not be null
     * @param value      the record value (unused)
     * @param valueBytes the serialized value (unused)
     * @param cluster    the current cluster metadata
     * @return the partition number for the record
     * @throws InvalidRecordException if {@code keyBytes} is null
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        // Because we partition by key, a null key is not allowed here. In a real project,
        // records with a null key could instead all be sent to one fixed partition.
        if (keyBytes == null) {
            throw new InvalidRecordException("key cannot be null");
        }

        // "1".equals(key) is safe for null and non-String keys (no cast needed). The
        // numPartitions check avoids returning partition 1 for a single-partition topic,
        // where only partition 0 exists; such records fall through to the hash below.
        if ("1".equals(key) && numPartitions > 1) {
            return 1;
        }

        // Any other key: hash the serialized key and take it modulo the partition count.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }
}
轉載:https://blog.csdn.net/wuxintdrh/article/details/78971308
// Configure the KafkaProducer to use a custom partitioner.
// The value must be the fully qualified class name of the Partitioner implementation;
// the string below is a placeholder meaning "fully qualified class name of the custom
// partitioner implementation" and has to be replaced with the real class name.
kafkaProperties.put("partitioner.class", "自定義partitioner實現類的完全限定類名");
三 murmur2 hash演算法:
/**
 * Computes the 32-bit murmur2 hash of a byte array.
 *
 * <p>The input is consumed as little-endian 4-byte words; the 1-3 bytes left over at the
 * end (if any) are folded in separately before the final avalanche steps.
 *
 * @param data the bytes to hash
 * @return the 32-bit hash of {@code data} (may be negative)
 */
public static int murmur2(final byte[] data) {
    // Fixed seed and mixing constants of this murmur2 variant.
    final int seed = 0x9747b28c;
    final int m = 0x5bd1e995;
    final int r = 24;

    final int length = data.length;
    // Index of the first byte that does not belong to a complete 4-byte word.
    final int tailStart = length & ~3;

    int h = seed ^ length;

    // Body: mix one little-endian 32-bit word per step.
    for (int offset = 0; offset < tailStart; offset += 4) {
        int k = (data[offset] & 0xff)
                | ((data[offset + 1] & 0xff) << 8)
                | ((data[offset + 2] & 0xff) << 16)
                | ((data[offset + 3] & 0xff) << 24);
        k *= m;
        k ^= k >>> r;
        k *= m;

        h *= m;
        h ^= k;
    }

    // Tail: fold in the remaining bytes, highest index first, then mix once.
    final int remaining = length - tailStart;
    if (remaining > 0) {
        if (remaining == 3) {
            h ^= (data[tailStart + 2] & 0xff) << 16;
        }
        if (remaining >= 2) {
            h ^= (data[tailStart + 1] & 0xff) << 8;
        }
        h ^= data[tailStart] & 0xff;
        h *= m;
    }

    // Final avalanche so the last few bytes affect all output bits.
    h ^= h >>> 13;
    h *= m;
    h ^= h >>> 15;

    return h;
}