1. 程式人生 > 實用技巧 >KafKa生產者-分割槽

KafKa生產者-分割槽

  生產者(producer)採用推(push)模式將訊息釋出到broker,每條訊息都被追加(append)到分割槽(patition)中,屬於順序寫磁碟(順序寫磁碟效率比隨機寫記憶體要高,保障kafka吞吐率)。訊息傳送時都被髮送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分割槽日誌)組成,其組織結構如下圖所示:

  

  可以看出,每個Partition中的訊息都是有序的,生產的訊息被不斷追加到Partition log上,其中的每一個訊息都被賦予了一個唯一的offset值。每個分割槽內部也獨立地維護了一個從0開始的offset值,offset只保證區內有序,即生產順序和消費順序一致。

  1、分割槽的原因

    (1)方便在叢集中擴充套件,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個叢集就可以適應任意大小的資料了;

    (2)可以提高併發度,因為可以以Partition為單位讀寫了。

  2、分割槽的方式

    生產者向broker傳送資料,需要將要傳送的資料封裝成一個ProducerRecord物件,ProducerRecord這個類含有多個過載的構造方法,每一種構造方法都有不同的引數,也就代表了不同的分割槽方法。

    通過觀察原始碼發現,這幾種構造方法最終執行的都是上圖的第一個構造方法,只不過在各自方法內部呼叫時,自身沒有包含的引數在呼叫第一個構造方法的時候都設定為了null,這些過載的含有不同引數的構造方法呼叫第一個方法,第一個方法會根據傳進來的引數值為成員變數賦值,沒有值的則賦值為null,我們來看一下第一個構造方法的原始碼:

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw
new IllegalArgumentException( String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp)); if (partition != null && partition < 0) throw new IllegalArgumentException( String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition)); this.topic = topic; this.partition = partition; this.key = key; this.value = value; this.timestamp = timestamp; this.headers = new RecordHeaders(headers); }

  由程式碼可知,該方法對ProducerRecord物件做一個初始化的處理,將各個引數賦值給對應的成員變數,其中有的成員變數可能會被賦值為null。這些資訊既有訊息的的元資料,也有實際要釋出的資料。會通過一個生產者物件,呼叫send方法傳送。下面是KafkaProducer的send方法的原始碼:

  send方法是通過非同步的方式將record傳送到主題的,其最終呼叫的也是第二個send方法

    /**
     * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
     * See {@link #send(ProducerRecord, Callback)} for details.
     */
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return send(record, null);
    }

    ……

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

  觀察第二個send方法發現,它會先判斷是否有攔截器物件,如果沒有就直接返回doSend方法的返回值,如果有攔截器物件則會去執行攔截器物件的onSend方法。這裡繼續觀察探索doSend方法,這個方法是非同步傳送訊息的主題的具體實現:

  可看出該方法會依次將key和value進行序列化操作,然後計算一下分割槽,然後根據record的主題和partition函式返回的分割槽構建一個TopicPartition物件,最終這個物件會和時間戳資訊、序列化之後的key、value等資訊一起追加到accumulator這個執行緒共享變數中,等待sender執行緒將accumulator中的訊息傳送給broker

 private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
         ……
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch(){
         ……
    }
    }        

  進一步分析partition函式,這個函式用於計算給定的record的分割槽,如果record的分割槽給定了,那麼就直接返回給定的分割槽值,如果沒有則會呼叫已經配置好的分割槽類去計算分割槽。

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

  直接進一步分析分割槽類是如何計算分割槽的,原始碼如下:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

  可以看出,計算分割槽的規則如下:

  (1)給定分割槽值的時候,直接將指明的分割槽值作為partition值;

  (2)沒有指明partition值,但是有key,則會將key的hash值與主題的分割槽數進行取模運算,得到partition值;

  (3)既沒有partition值,也沒有key,則在第一次呼叫時隨機生成一個整數(後面每次呼叫在這個整數上自增),將這個值與 topic 可用的 partition 總數取餘得到 partition 值,也就是常說的 round-robin 演算法。

  需要說明的是,Kafka的Producer傳送訊息採用的是非同步傳送的方式。在訊息傳送的過程中,涉及到了兩個執行緒 main執行緒和Sender執行緒 ,以及一個執行緒共享變數 RecordAccumulator。main執行緒將訊息傳送給 RecordAccumulator,Sender執行緒不斷從RecordAccumulator中拉取訊息傳送到 Kafka broker。

  

  這也就對應了上面為何會將主題、分割槽、頭資訊、時間戳新增到RecordAccumulator變數。