An Analysis of Asynchronous kafka producer send Latency Under High Concurrency
While recently developing a gateway service, I needed Kafka both to forward messages and to persist logs. During load testing, many threads were concurrently calling the asynchronous send of a shared kafka producer, and individual send calls would occasionally block for tens of milliseconds, which significantly hurt concurrent performance. Follow-up tests then showed that single-threaded sending was actually several times more efficient than multi-threaded sending. So I traced and analyzed the source code behind the Kafka send API, and this post is a summary of what I found.
Let's first look at how a kafka producer is used under Spring Boot.
In a config class, an instance of the DefaultKafkaProducerFactory producer factory is registered with the IoC container:
@Bean
public ProducerFactory<Object, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}
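producerConfigs() is not shown here; a minimal sketch of what it typically contains follows. All property values are assumptions to adjust for your environment, and the batching settings are called out because they matter for the latency discussion later:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // broker address is a placeholder; point it at your own cluster
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // batch.size and linger.ms control how records accumulate before the
    // background thread ships them; both influence perceived send latency
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    return props;
}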
Creating the producer:
this.producer = producerFactory.createProducer();
As everyone knows, instances managed by the Spring Boot IoC container are singletons by default, and DefaultKafkaProducerFactory is itself a singleton factory:
@Override
public Producer<K, V> createProducer() {
    if (this.transactionIdPrefix != null) {
        return createTransactionalProducer();
    }
    if (this.producer == null) {
        synchronized (this) {
            if (this.producer == null) {
                this.producer = new CloseSafeProducer<K, V>(createKafkaProducer());
            }
        }
    }
    return this.producer;
}
So the producer we create is a singleton as well.
Next comes the actual sending. Anyone who has used Kafka knows that producer.send is an asynchronous operation returning a Future<RecordMetadata>. Why, then, is there such a large efficiency gap between single-threaded and multi-threaded send? Let's look inside KafkaProducer at the implementation behind producer.send to find the answer.
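Before diving in, a quick reminder of what the caller sees; the topic name and payload below are invented for illustration:

// fire-and-forget: send() only enqueues the record and returns immediately
Future<RecordMetadata> future =
        producer.send(new ProducerRecord<>("demo-topic", "key", "value"));

// callback form: onCompletion is invoked later from the background I/O thread
producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
        (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            }
        });

// only get() blocks the caller, turning the async send into a sync one
RecordMetadata metadata = future.get();

Internally, send delegates to doSend: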
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            // serialize the key
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            // serialize the value
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        // work out the target partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // append the data to the queue container
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
Apart from the serialization work and validation done up front, the key step here is appending the data to the queue container:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
accumulator is an instance of RecordAccumulator, a queue-container class; internally it maintains a ConcurrentMap in which every TopicPartition has its own dedicated message queue.
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
Let's step inside accumulator.append to see the concrete implementation:
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null)
        headers = Record.EMPTY_HEADERS;
    try {
        // look up (or create) the batch queue for this TopicPartition
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        // synchronize on the queue to guarantee thread safety
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // put the serialized data into the queue and return the result
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // we don't have an in-progress record batch try to allocate a new batch
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;

            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
In getOrCreateDeque we fetch the queue for the given TopicPartition from the ConcurrentMap, initializing one if it does not exist yet:
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<ProducerBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}
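As a side note, on Java 8+ this get-or-create step is often written with ConcurrentMap.computeIfAbsent instead; the putIfAbsent form above, presumably kept for compatibility with older JDKs, at worst allocates one throwaway ArrayDeque when two threads race:

private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    // functionally equivalent: exactly one Deque per TopicPartition survives
    return this.batches.computeIfAbsent(tp, k -> new ArrayDeque<>());
}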
More importantly, to guarantee thread safety under concurrency, the call RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq) necessarily synchronizes on the Deque<ProducerBatch>:
synchronized (dq) {
    if (closed)
        throw new IllegalStateException("Cannot send after the producer is closed.");
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
    if (appendResult != null)
        return appendResult;
}
Here we can see that under high concurrency, dq comes under considerable contention. Although the operation is purely in-memory and each thread holds the lock for only a very short time, the multi-threaded case, unlike the single-threaded one, spawns many threads, so lock contention and CPU context switches become frequent, causing a measurable performance loss and the blocking latency observed.
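To get a feel for the cost of that synchronized block in isolation, here is a crude toy benchmark on a shared ArrayDeque, entirely independent of Kafka. The thread counts and payload size are invented, there is no JIT warmup, and the numbers only illustrate the trend rather than being a rigorous measurement:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;

public class DequeContentionDemo {

    public static void main(String[] args) throws InterruptedException {
        // both runs perform the same total number of appends
        System.out.printf("1 thread:  %d ms%n", run(1, 1_000_000));
        System.out.printf("8 threads: %d ms%n", run(8, 125_000));
    }

    static long run(int threads, int appendsPerThread) throws InterruptedException {
        Deque<byte[]> dq = new ArrayDeque<>();
        CountDownLatch done = new CountDownLatch(threads);
        long start = System.nanoTime();
        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                byte[] payload = new byte[64];
                for (int i = 0; i < appendsPerThread; i++) {
                    // same locking pattern as RecordAccumulator.append
                    synchronized (dq) {
                        dq.addLast(payload);
                        if (dq.size() > 1024) {
                            dq.clear(); // stand-in for the batch being drained
                        }
                    }
                }
                done.countDown();
            }).start();
        }
        done.await();
        return (System.nanoTime() - start) / 1_000_000;
    }
}

The multi-threaded run typically takes longer despite identical total work, which mirrors what the gateway pressure test showed.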
At this point it becomes clear that KafkaProducer's asynchronous send is really built on a producer-consumer pattern: send does not transmit anything directly, it merely places the data into an intermediate queue. And since producers keep putting data into that in-memory queue, there must be a dedicated thread responsible for actually sending it out. By monitoring the JVM's threads we can see that creating a KafkaProducer does indeed start a daemon thread for message sending.
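A throwaway snippet like the following, run after the producer has been created, is one way to spot it; the name prefix matches the NETWORK_THREAD_PREFIX used in the constructor code shown below:

// enumerate live JVM threads and pick out the producer's I/O daemon
Thread.getAllStackTraces().keySet().stream()
        .filter(t -> t.getName().startsWith("kafka-producer-network-thread"))
        .forEach(t -> System.out.printf("%s daemon=%b state=%s%n",
                t.getName(), t.isDaemon(), t.getState()));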
OK, back inside KafkaProducer we find the following two fields; Sender is the background thread that sends kafka data:
private final Sender sender;
private final Thread ioThread;
The Sender thread is started in KafkaProducer's constructor:
this.sender = new Sender(logContext,
        client,
        this.metadata,
        this.accumulator,
        maxInflightRequests == 1,
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        acks,
        retries,
        metricsRegistry.senderMetrics,
        Time.SYSTEM,
        this.requestTimeoutMs,
        config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
        this.transactionManager,
        apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
Inside Sender we can see that the job of this thread is simply to loop forever, polling and sending data:
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {
        // We need to fail all the incomplete batches and wake up the threads waiting on
        // the futures.
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

/**
 * Run a single iteration of sending
 *
 * @param now The current POSIX time in milliseconds
 */
void run(long now) {
    if (transactionManager != null) {
        try {
            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                // Check if the previous run expired batches which requires a reset of the producer state.
                transactionManager.resetProducerId();
            if (!transactionManager.isTransactional()) {
                // this is an idempotent producer, so make sure we have a producer id
                maybeWaitForProducerId();
            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                // as long as there are outstanding transactional requests, we simply wait for them to return
                client.poll(retryBackoffMs, now);
                return;
            }

            // do not continue sending if the transaction manager is in a failed state or if there
            // is no producer id (for the idempotent case).
            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, now);
                return;
            } else if (transactionManager.hasAbortableError()) {
                accumulator.abortUndrainedBatches(transactionManager.lastError());
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request: {}", e);
            transactionManager.authenticationFailed(e);
        }
    }

    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}
From the analysis above we can see that the producer.send operation itself is essentially an in-memory store whose cost is all but negligible. Under high concurrency, however, thread synchronization incurs a certain performance penalty; in ordinary workloads this penalty can likewise be ignored, but with large data volumes and many concurrent senders it becomes quite noticeable.
Based on the analysis above, here are some personal takeaways:
1. First, avoid having multiple threads operate the producer to send data. You can use a producer-consumer pattern to decouple producer.send from your multi-threaded code: maintain a queue of the messages you want to send and drain it from a single dedicated thread (see the sketch at the end of this post);
2. Some readers may ask whether creating several producer instances, or maintaining a producer pool, would work. That was my original idea too, but in testing the results were not great either. My guess is that too many producer instances bring a corresponding increase in thread count; with the business threads on top of Kafka's own threads, context switching becomes frequent and CPU pressure grows, so it ends up less efficient than single-threaded operation;
3. This issue really only concerns the API call itself: the send operation is not the actual data transmission, which is performed by the daemon thread. Following Kafka's own design philosophy, if this operation becomes your performance bottleneck, what you should be considering is cluster deployment and load balancing;
4. Lock-free is what true high performance looks like.
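To make recommendation 1 concrete, here is a minimal sketch of the decoupling, assuming a pre-built Producer<String, String>; the class name, thread name, and queue capacity are all invented for illustration:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SingleThreadKafkaDispatcher {

    private final BlockingQueue<ProducerRecord<String, String>> queue =
            new LinkedBlockingQueue<>(10_000);
    private final Producer<String, String> producer;

    public SingleThreadKafkaDispatcher(Producer<String, String> producer) {
        this.producer = producer;
        Thread t = new Thread(this::drain, "kafka-dispatcher");
        t.setDaemon(true);
        t.start();
    }

    // called from any number of business threads; never touches the producer,
    // so there is no contention on the accumulator's per-partition Deque
    public boolean enqueue(ProducerRecord<String, String> record) {
        return queue.offer(record);
    }

    private void drain() {
        try {
            while (true) {
                // send is still asynchronous: it only appends to the
                // accumulator, and the Sender daemon does the real I/O
                producer.send(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Throughput is not sacrificed here: the dispatcher thread does little more than the in-memory append, while batching and network I/O remain on Kafka's own Sender thread.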