阿新 • • 發佈:2019-09-06
當我們使用kafka向指定Topic傳送訊息時,如果該Topic具有多個partition,無論消費者有多少,最終都會保證一個partition內的訊息只會被一個Consumer group中的一個Consumer消費,也就是說同一Consumer group中的多個Consumer自動會起到負載均衡的效果。
下面我們就針對呼叫kafka API傳送訊息到Topic時partition的分配策略,分析下其內部具體的原始碼實現。
首先看下kafka API中訊息體ProducerRecord類的建構函式,可以看到構造訊息時可指定該訊息要傳送的Topic、partition、key、value等關鍵資訊。
/**
 * Creates a record to be sent to a specified topic and partition, with headers attached.
 *
 * <p>Delegates to the six-argument constructor, passing {@code null} as its third argument
 * (presumably the record timestamp; confirm against that constructor, which is not shown here).
 *
 * @param topic The topic the record will be appended to
 * @param partition The partition to which the record should be sent
 * @param key The key that will be included in the record
 * @param value The record contents
 * @param headers The headers that will be included in the record
 */
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
    this(topic, partition, null, key, value, headers);
}

/**
 * Creates a record to be sent to a specified topic and partition.
 *
 * <p>Delegates to the six-argument constructor, passing {@code null} both for its third
 * argument and for the headers.
 *
 * @param topic The topic the record will be appended to
 * @param partition The partition to which the record should be sent
 * @param key The key that will be included in the record
 * @param value The record contents
 */
public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, null, key, value, null);
}

/**
 * Creates a record to be sent to Kafka without naming a partition.
 *
 * <p>Delegates to the six-argument constructor with {@code null} for the partition, for the
 * third argument and for the headers, so only topic, key and value are set by the caller.
 *
 * @param topic The topic the record will be appended to
 * @param key The key that will be included in the record
 * @param value The record contents
 */
public ProducerRecord(String topic, K key, V value) {
    this(topic, null, null, key, value, null);
}
// Send a record built with the (topic, key, value) constructor; no partition is named by the caller.
producer.send(new ProducerRecord<Object, Object>(topic, key, data));
public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>(); public void configure(Map<String, ?> configs) {} /** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //獲取該topic的分割槽列表 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); //如果key值為null if (keyBytes == null) { //維護一個key為topic的ConcurrentHashMap,並通過CAS操作的方式對value值執行遞增+1操作 int nextValue = nextValue(topic); //獲取該topic的可用分割槽列表 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) {//如果可用分割槽大於0 //執行求餘操作,保證訊息落在可用分割槽上 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // 沒有可用分割槽的話,就給出一個不可用分割槽 return Utils.toPositive(nextValue) % numPartitions; } } else { // 通過計算key的hash,確定訊息分割槽 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { //獲取一個AtomicInteger物件 AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) {//如果為空 //生成一個隨機數 counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); //維護到topicCounterMap中 AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } //返回值並執行遞增 return counter.getAndIncrement(); } public void close() {} }
/** * 自定義實現Partitioner介面 * */ public class KeyPartitioner implements Partitioner { /** * 實現具體分發策略 */ @Override public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//拉取可用的partition if (key == null||key.equals("")) { int random = (int) (Math.random() * 10); int part = random % availablePartitions.size(); return availablePartitions.get(part).partition(); } return Math.abs(key.toString().hashCode() % 6); } @Override public void configure(Map<String, ?> configs) { // TODO Auto-generated method stub } @Override public void close() { // TODO Auto-generated method stub } }
// Register the custom partitioner in the producer configuration, then build the producer from it.
Properties properties = new Properties();
properties.put(
        ProducerConfig.PARTITIONER_CLASS_CONFIG,
        KeyPartitioner.class);
producer = new KafkaProducer<Object, Object>(properties);