[Kafka Source Code Walkthrough] The Code Doesn't Error, Yet the Message Is Never Sent!
A few words about the moment we're in. At the end of 2019 everyone was full of hope for 2020, especially with the palindrome date 20200202 on the calendar. But the months-long Australian bushfires, the discovery of the novel coronavirus nCoV, Kobe Bryant's passing, and more have left people feeling rather helpless: life is fragile, and tomorrow is unknowable. Still, we should live in the present, face the epidemic bravely, and win this smokeless war together with one another and the government!
As a programmer, I certainly can't stop working or learning. I'm working from home now, fully cooperating with the isolation strategy being advocated, taking responsibility for myself and for society. Below I'll share some notes I wrote a while back and discuss a Kafka question with you: why does a Kafka message fail to send?
I. The Problem: The Message Fails to Send
Closing a resource once we're done with it is perfectly normal practice, yet in everyday code we often forget to call close(), and usually nothing errors out or goes visibly wrong. So why is it that with Kafka, omitting it causes a problem?
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}
// Omitting this line causes the message send to fail
// producer.close();
II. A Hypothesis
1. First, let's look at the close() method.
Its Javadoc says: this method blocks until all previously sent requests complete.
/**
 * Close this producer. This method blocks until all previously sent requests complete.
 * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
 * <p>
 * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
 * will be called instead. We do this because the sender thread would otherwise try to join itself and
 * block forever.</strong>
 * <p>
 *
 * @throws InterruptException If the thread is interrupted while blocked
 */
@Override
public void close() {
    close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
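Since KafkaProducer implements Closeable, the easiest way to guarantee close() runs is try-with-resources. A minimal sketch of that pattern, assuming a local broker at localhost:9092 and a placeholder topic name:

// try-with-resources calls close() implicitly, which blocks until all
// buffered records have been sent (broker address and topic are placeholders).
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("demo-topic", "hello, Kafka!"));
} // close() happens here, draining the accumulator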
2. Next, the class-level Javadoc of KafkaProducer.
The producer consists of a pool of buffer space holding records that haven't yet been transmitted to the server, plus a background I/O thread responsible for turning those records into requests and transmitting them to the cluster. Failing to close the producer after use will leak these resources.
/*
* <p>
* The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server
* as well as a background I/O thread that is responsible for turning these records into requests and transmitting them
* to the cluster. Failure to close the producer after use will leak these resources.
* <p>
*/
3. Summing up the hypothesis
At this point we can draw a preliminary conclusion: KafkaProducer does not send a message to Kafka immediately. Instead, the record is first stored in a buffer, and a background thread continually reads records from that buffer and sends them on to Kafka. That also suggests why omitting close() breaks things: our send happens in main(), and once main() finishes, the record may have only just reached the buffer, before the background thread has had a chance to read and send it.
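If this guess is right, then forcing the send to complete before main() returns should make the message go through even without close(). A quick experiment we can try (my sketch, not the original code, reusing the producer and record variables from the snippet above):

// Block until the broker acknowledges the record: send() returns a
// Future<RecordMetadata>, so get() forces the asynchronous send to finish.
// (get() throws InterruptedException / ExecutionException, hence the catch.)
try {
    producer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
// Alternatively, flush() blocks until every buffered record has been sent:
producer.flush();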
III. Verification
Now let's read the KafkaProducer source code to verify the summary above.
1. Constructing a KafkaProducer
The key things to notice: the constructor creates the queue that buffers records, and it starts a background thread whose job is to take records off that queue and send them to Kafka.
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
Metadata metadata,
KafkaClient kafkaClient) {
try {
// ..... many lines omitted here, mostly producer configuration: clientId, serializers, interceptors, and so on.
// This is the queue where records are buffered.
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time,
apiVersions,
transactionManager);
// .... more unrelated configuration omitted
// Sender implements Runnable; it is the background thread that sends produce requests to the Kafka cluster
this.sender = new Sender(logContext,
client,
this.metadata,
this.accumulator,
maxInflightRequests == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
retries,
metricsRegistry.senderMetrics,
Time.SYSTEM,
this.requestTimeoutMs,
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// the 'true' argument marks this as a daemon thread: it cannot keep the JVM alive by itself
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
// start the background I/O thread
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
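Note the `true` passed to the KafkaThread constructor: it makes the sender a daemon thread, which cannot keep the JVM alive on its own. A standalone sketch (plain JDK code, nothing Kafka-specific) of why that matters:

public class DaemonDemo {
    public static void main(String[] args) {
        // Simulates the producer's I/O thread: it needs a moment before "sending".
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(1000);
                System.out.println("sent!"); // never printed
            } catch (InterruptedException ignored) {
            }
        });
        worker.setDaemon(true); // the same flag KafkaThread passes as 'true'
        worker.start();
        // main() returns immediately; the JVM exits and the daemon thread dies,
        // just like the sender thread dies with our record still in the buffer.
    }
}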
2. Sending a message with KafkaProducer
Sending a record does not push it straight to Kafka; instead, it is stored in the accumulator queue mentioned above.
/**
 * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
 * See {@link #send(ProducerRecord, Callback)} for details.
 */
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// ... some code omitted: serialization, partitioning, transactions, callback setup, and so on
// Below, the record is appended to the accumulator created in the KafkaProducer constructor.
// Inside RecordAccumulator, the records live in: private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// exception handling follows
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
}
// .... remaining exception handling omitted
}
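Since doSend() only appends the record to the accumulator and returns a Future, any failure surfaces later, on the sender thread. A sketch of observing it with the callback overload (again reusing producer and record from the first snippet):

// The callback runs when the actual network send completes (or fails),
// which happens on the background I/O thread, not inside send() itself.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // the real send failed
    } else {
        System.out.printf("sent to %s-%d@%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});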
3. How Sender sends the messages
Next, let's look at how the background Sender thread takes records off the accumulator queue and ships them to Kafka.
/**
 * The main run loop for the sender thread
 */
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
// We have stopped accepting requests (close() was called), but the accumulator may still hold records,
// or requests may be awaiting acknowledgment, so keep running until they complete
while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// If the wait time is exhausted and a forced close is required, fail all incomplete batches
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures.
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
// close the network client
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
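The shape of run() is worth a second look: close() merely flips running to false, and it is the second loop that drains whatever is left in the accumulator. A generic sketch of the same "stop accepting, then drain" pattern, in plain Java rather than Kafka's actual classes:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DrainingWorker implements Runnable {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    void submit(String task) { queue.add(task); }
    void close()            { running = false; } // stop accepting, like Producer.close()

    @Override
    public void run() {
        // Keep working while running, then keep draining until the queue is empty,
        // mirroring Sender.run()'s second while loop.
        // (Busy-waits for brevity; the real Sender blocks on network I/O.)
        while (running || !queue.isEmpty()) {
            String task = queue.poll();
            if (task != null) {
                System.out.println("sending " + task);
            }
        }
    }
}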
4. The sendProducerData method
Preparation before the data is sent.
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
this.metadata.requestUpdate();
}
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// collect the batches to send, grouped by destination node
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
// ..... other handling omitted
// the method that actually sends the produce requests
sendProduceRequests(batches, now);
return pollTimeout;
}
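When accumulator.ready() considers a batch "ready" is driven mainly by two producer configs, batch.size and linger.ms. A hedged config sketch, adding to the properties from the first snippet (values are illustrative only, not recommendations):

// A batch becomes sendable when it fills up (batch.size) or has lingered
// long enough (linger.ms), among other triggers such as flush() and close().
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // bytes per batch (the default)
properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);      // wait up to 5 ms to fill a batch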
5. The sendProduceRequest method
In the end, the data is sent to Kafka through NetworkClient.
/**
* Create a produce request from the given record batches
*/
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
return;
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
// find the minimum magic version used when creating the record sets
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
// down convert if necessary to the minimum magic used. In general, there can be a delay between the time
// that the producer starts building the batch and the time that we send the request, and we may have
// chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
// the new message format, but found that the broker didn't support it, so we need to down-convert on the
// client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
// not all support the same message format version. For example, if a partition migrates from a broker
// which is supporting the new magic version to one which doesn't, then we will need to convert.
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
produceRecordsByPartition.put(tp, records);
recordsByPartition.put(tp, batch);
}
String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
requestTimeoutMs, callback);
// finally, the request is handed off to NetworkClient to send
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
IV. Conclusion

Omitting Producer.close() really can cause messages to go unsent, and the Javadoc explicitly reminds us to close the producer to avoid leaking resources. The root cause lies in how KafkaProducer implements asynchronous sending: records are first placed in the RecordAccumulator queue, and the background KafkaThread continually takes the records that are ready to send off the RecordAccumulator and ships them to Kafka. If our code never calls Producer.close(), there is no blocking wait for outstanding requests; the moment main() finishes, the JVM exits and the KafkaThread is destroyed before it has had a chance to take our record off the RecordAccumulator, so the message is never actually sent.
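To wrap up, here is a sketch of the ways to make the original snippet safe (any one suffices; the variables come from the first snippet):

// Option 1: close the producer; blocks until all buffered records are sent.
producer.close();

// Option 2: flush without closing, useful in a long-running service.
producer.flush();

// Option 3: let try-with-resources do the closing for us.
try (KafkaProducer<String, String> p = new KafkaProducer<>(properties)) {
    p.send(new ProducerRecord<>(topic, "hello, Kafka!"));
}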