KafKa生產者-分割槽
生產者(producer)採用推(push)模式將訊息釋出到broker,每條訊息都被追加(append)到分割槽(patition)中,屬於順序寫磁碟(順序寫磁碟效率比隨機寫記憶體要高,保障kafka吞吐率)。訊息傳送時都被髮送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分割槽日誌)組成,其組織結構如下圖所示:
可以看出,每個Partition中的訊息都是有序的,生產的訊息被不斷追加到Partition log上,其中的每一個訊息都被賦予了一個唯一的offset值。每個分割槽內部也獨立地維護了一個從0開始的offset值,offset只保證區內有序,即生產順序和消費順序一致。
1、分割槽的原因
(1)方便在叢集中擴充套件,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個叢集就可以適應任意大小的資料了;
(2)可以提高併發度,因為可以以Partition為單位讀寫了。
2、分割槽的方式
生產者向broker傳送資料,需要將要傳送的資料封裝成一個ProducerRecord物件,ProducerRecord這個類含有多個過載的構造方法,每一種構造方法都有不同的引數,也就代表了不同的分割槽方法。
通過觀察原始碼發現,這幾種構造方法最終執行的都是上圖的第一個構造方法,只不過在各自方法內部呼叫時,自身沒有包含的引數在呼叫第一個構造方法的時候都設定為了null,這些過載的含有不同引數的構造方法呼叫第一個方法,第一個方法會根據傳進來的引數值為成員變數賦值,沒有值的則賦值為null,我們來看一下第一個構造方法的原始碼:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null."); if (timestamp != null && timestamp < 0) thrownew IllegalArgumentException( String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp)); if (partition != null && partition < 0) throw new IllegalArgumentException( String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition)); this.topic = topic; this.partition = partition; this.key = key; this.value = value; this.timestamp = timestamp; this.headers = new RecordHeaders(headers); }
由程式碼可知,該方法對ProducerRecord物件做一個初始化的處理,將各個引數賦值給對應的成員變數,其中有的成員變數可能會被賦值為null。這些資訊既有訊息的的元資料,也有實際要釋出的資料。會通過一個生產者物件,呼叫send方法傳送。下面是KafkaProducer的send方法的原始碼:
send方法是通過非同步的方式將record傳送到主題的,其最終呼叫的也是第二個send方法
/** * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. * See {@link #send(ProducerRecord, Callback)} for details. */ @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) { return send(record, null); } …… @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
觀察第二個send方法發現,它會先判斷是否有攔截器物件,如果沒有就直接返回doSend方法的返回值,如果有攔截器物件則會去執行攔截器物件的onSend方法。這裡繼續觀察探索doSend方法,這個方法是非同步傳送訊息的主題的具體實現:
可看出該方法會依次將key和value進行序列化操作,然後計算一下分割槽,然後根據record的主題和partition函式返回的分割槽構建一個TopicPartition物件,最終這個物件會和時間戳資訊、序列化之後的key、value等資訊一起追加到accumulator這個執行緒共享變數中,等待sender執行緒將accumulator中的訊息傳送給broker
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { …… byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer"); } byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer"); } int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); setReadOnly(record.headers()); Header[] headers = record.headers().toArray(); int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); ensureValidRecordSize(serializedSize); long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); // producer callback will make sure to call both 'callback' and interceptor callback Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp); if (transactionManager != null && transactionManager.isTransactional()) transactionManager.maybeAddPartitionToTransaction(tp); RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future; // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly } catch(){ …… } }
進一步分析partition函式,這個函式用於計算給定的record的分割槽,如果record的分割槽給定了,那麼就直接返回給定的分割槽值,如果沒有則會呼叫已經配置好的分割槽類去計算分割槽。
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
直接進一步分析分割槽類是如何計算分割槽的,原始碼如下:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
可以看出,計算分割槽的規則如下:
(1)給定分割槽值的時候,直接將指明的分割槽值作為partition值;
(2)沒有指明partition值,但是有key,則會將key的hash值與主題的分割槽數進行取模運算,得到partition值;
(3)既沒有partition值,也沒有key,則在第一次呼叫時隨機生成一個整數(後面每次呼叫在這個整數上自增),將這個值與 topic 可用的 partition 總數取餘得到 partition 值,也就是常說的 round-robin 演算法。
需要說明的是,Kafka的Producer傳送訊息採用的是非同步傳送的方式。在訊息傳送的過程中,涉及到了兩個執行緒 main執行緒和Sender執行緒 ,以及一個執行緒共享變數 RecordAccumulator。main執行緒將訊息傳送給 RecordAccumulator,Sender執行緒不斷從RecordAccumulator中拉取訊息傳送到 Kafka broker。
這也就對應了上面為何會將主題、分割槽、頭資訊、時間戳新增到RecordAccumulator變數。