11. kafka重試機制解讀
前面對kafka的學習中已經瞭解到KafkaProducer通過設定引數retries
,如果傳送訊息到broker時丟擲異常,且是允許重試的異常,那麼就會最大重試retries引數指定的次數。
本片文章主要分析幾個問題:
- 哪些異常可以重試
- 如何實現重試
接下來通過分析一一解開這些問題的答案。
1.哪些異常可以重試
org.apache.kafka.clients.producer.internals.Sender類中有如下方法:
private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
return batch.attempts() < this.retries &&
((response.error.exception() instanceof RetriableException) ||
(transactionManager != null && transactionManager.canRetry(response, batch)));
}
通過方法名可知,其作用是判斷是否能重試,由方法體內的實現可知,允許重試需要滿足兩個條件:
1. 重試次數少於引數retries
指定的值;
2. 異常是RetriableException
transactionManager.canRetry()後面會分析;先看看哪些異常是RetriableException型別異常。
- RetriableException型別異常
kafka對RetriableException異常註釋是:短暫性的通過重試可以成功的異常;通過RetriableException類關係圖可知,可重試異常有圖中RetriableException的子類那些異常(可以通過異常是否繼承自RetriableException判斷是否可重試異常):
- TransactionManager允許重試
如果異常不屬於RetriableException型別,但是隻要滿足(transactionManager != null && transactionManager.canRetry(response, batch))
private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
TransactionManager transactionManager = null;
boolean userConfiguredIdempotence = false;
// 使用者設定的Properties引數中是否有'enable.idempotence',如果有的話, 就用使用者配置的
if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
userConfiguredIdempotence = true;
}
// 使用者設定的Properties引數中是否有'transactional.id',如果有的話, 就用使用者配置的
boolean userConfiguredTransactions = false;
if (config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
userConfiguredTransactions = true;
}
// 得到引數'enable.idempotence'的值
boolean idempotenceEnabled = config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
// 如果使用者顯示配置enable.idempotence為false,並且又配置了transactional.id,就會丟擲這個異常
if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions) {
throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
}
// 如果使用者配置了transactional.id,那麼idempotenceEnabled就認為是true(與)
if (userConfiguredTransactions) {
idempotenceEnabled = true;
}
// 只有使用者配置了transactional.id,且enable.idempotence沒有設定為false,這裡才為true,就會構造一個有效的TransactionManager;從這裡可知,如果使用者沒有配置transactional.id,那麼TransactionManager為null
if (idempotenceEnabled) {
// 構造TransactionManager的幾個重要引數
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs);
... ...
}
return transactionManager;
}
根據上面原始碼分析可知,只要使用者配置了transactional.id,且沒有顯示配置enable.idempotence為false,那麼TransactionManager就不會為null;
接下來還要滿足transactionManager.canRetry(response, batch)
才允許重試,主要包括下面幾種情況:
- 碰到OutOfOrderSequenceException異常
- broker的響應報文中沒有logStartOffset(正常的響應資訊:”T0-0” -> “{error: NONE,offset: 0,logAppendTime: -1, logStartOffset: 0}”)
2.如何實現重試
上面說明了什麼情況下允許重試,接下來分析kafka是如何實現重試的。
2.1原理圖
本打算把原理圖放在最後,但是最後還是決定放在前面。對重試機制有一定的瞭解後,再看後面的分析就容易很多。kafka傳送&重試機制如下圖所示:
說明:
1. new KafkaProducer()後建立一個後臺執行緒KafkaThread掃描RecordAccumulator中是否有訊息;
2. 呼叫KafkaProducer.send()傳送訊息,實際上只是把訊息儲存到RecordAccumulator中;
3. 後臺執行緒KafkaThread掃描到RecordAccumulator中有訊息後,將訊息傳送到kafka叢集;
4. 如果傳送成功,那麼返回成功;
5. 如果傳送失敗,那麼判斷是否允許重試。如果不允許重試,那麼返回失敗的結果;如果允許重試,把訊息再儲存到RecordAccumulator中,等待後臺執行緒KafkaThread掃描再次傳送;
初步瞭解整個傳送&重試過程後,再根據原始碼進行更深入的分析。
2.2後臺執行緒
分析kafka如何實現重試之前,先看一下發送訊息到broker前做的主要事情:
- 構造KafkaProducer時,構造Send並啟動一個非同步執行緒:
this.sender = new Sender(... ...);
String ioThreadName = "kafka-producer-network-thread" + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
且從這段程式碼可知,每個KafkaProducer會啟動一個執行緒處理訊息,這個執行緒命名為:kafka-producer-network-thread | ${clientId}。
筆者某個例項檢視KafkaProducer啟動的執行緒結果如下:
[afei@kafka ~]$ jstack -l 23715 | grep "kafka-producer-network-thread"
"kafka-producer-network-thread | producer-2" #109 daemon prio=5 os_prio=0 tid=0x00007fe081921000 nid=0x5dcb runnable [0x00007fdfeb92b000]
"kafka-producer-network-thread | producer-1" #46 daemon prio=5 os_prio=0 tid=0x00007fe081f5a800 nid=0x5d66 runnable [0x00007fe024d20000]
- 呼叫KafkaProducer的send()方法時,先把傳送的訊息儲存在accumulator中:
RecordAccumulator.RecordAppendResult result = accumulator.append(
tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
2.3**RecordAccumulator**
RecordAccumulator是儲存需要傳送的訊息或者重試訊息的核心。傳送訊息之前先把訊息存放在這裡,非同步執行緒KafkaThread啟動後從這裡取訊息然後傳送到broker。當傳送出錯且允許重試時,又會把這些需要重試的訊息儲存到這裡再進行重試。
當呼叫KafkaProducer的send()方法傳送訊息時,會呼叫append()方法將訊息暫時存放,核心原始碼如下:
// 獲得deque或者建立deque。因為核心資料結構是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,所以生產者批次訊息是按照分割槽區分的。如果根據分割槽拿不到deque的話,就建立一個deque。
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
// 把需要傳送的訊息放入佇列中,
dq.addLast(batch);
當傳送出錯且允許重試時,會呼叫reenqueue()方法將訊息暫時存放,核心原始碼如下:
public void reenqueue(ProducerBatch batch, long now) {
Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
synchronized (deque) {
// 把需要重試的訊息放入佇列中,等到重試
deque.addFirst(batch);
}
}
RecordAccumulator簡單總結:通過這兩段程式碼的分析可知,儲存需要傳送的(重試)訊息的核心資料結構是Deque。且建立佇列時是new ArrayDeque()
,沒有指定初始容量。這裡不打算深入分析Deque,只是簡單介紹一下,Deque是Double ended queue (雙端佇列) 的縮寫。首尾都可寫入可讀取。
2.3傳送&重試
下面分析kafka是如何傳送並如何重試的。(TransactionManager相關程式碼被省略,其的作用後面有機會單獨一篇文章分析);傳送訊息核心程式碼在Sender.java中, Sender.java實現了Runnable介面, 所以是後臺執行緒非同步傳送訊息到kafka叢集:
public class Sender implements Runnable {
public void run() {
// KafkaProducer傳送訊息的執行緒啟動後,一直執行,直到KafkaProducer.close()將running置為false
while (running) {
run(time.milliseconds());
}
// 根據日誌可知,接下來是KafkaProducer關閉後的邏輯
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// 當非強制關閉時,可能依然有請求堆積在accumulator中, 我們需要將這些剩餘的請求處理完成
while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
run(time.milliseconds());
}
if (forceClose) {
// 如果強制關閉,且有未處理完的訊息,那麼讓這些訊息的傳送失敗,並丟擲異常new IllegalStateException("Producer is closed forcefully.").
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
... ...
}
}
KafkaProducer關閉有方式有兩種:
close();
和close(long timeout, TimeUnit timeUnit)
,第一種是友好的關閉且設定timeout為Long.MAX_VALUE
,第二種如果設定timeout為0,就是強制關閉,即forceClose=true。
備註:drained: 流乾,耗盡,undrained則表示未耗盡。
準備傳送訊息前需要嘗試去accumulator中獲取訊息:
// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
accumulator.drain()本質就是:
Deque<ProducerBatch> deque = getDeque(tp);ProducerBatch batch = deque.pollFirst();
,即根據分割槽資訊得到Deque,然後不斷獲取ProducerBatch,即封裝後的要傳送的訊息。
run(long)方法中往broker傳送訊息的部分核心程式碼(位於Sender.java中)如下:
private void sendProduceRequest(... ...){
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
// 這裡是處理響應訊息的地方
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
// 省略傳送訊息到broker的程式碼
... ...
}
handleProduceResponse()中收到的響應,如何是網路斷開,那麼構造響應:new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION)
。如果有版本不匹配問題,那麼構造響應:new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION)
。還有一種特殊情況,如果指定了acks=0
,那麼構造響應new ProduceResponse.PartitionResponse(Errors.NONE)
,因為這種情況下只需要傳送即可,不需要響應結果。接下來呼叫下面的方法–完成或者重試請求:
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now) {
Errors error = response.error;
if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
(batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}", ...);
// 如果是'MESSAGE_TOO_LARGE'的錯誤,且是批量訊息(recordCount>1),那麼切割訊息後再發送
this.accumulator.splitAndReenqueue(batch);
this.accumulator.deallocate(batch);
this.sensors.recordBatchSplit();
} else if (error != Errors.NONE) {
// 如果響應有錯誤,判斷是否允許重試
if (canRetry(batch, response)) {
// 如果允許重試,會輸出warn日誌
log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}"... ...);
if (transactionManager == null) {
// 重新把訊息放到佇列中
reenqueueBatch(batch, now);
}
... ...
} else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
// 接收到這種錯誤,就認為返回成功。
completeBatch(batch, response);
} else {
... ...
}
// 到這裡如果是UnknownTopicOrPartitionException異常,說明producer快取的元資料資訊可能已經過期,所以需要請求更新,程式碼省略
} else {
completeBatch(batch, response);
}
... ...
}
如果需要重試,重新入佇列的原始碼如下:
// ProducerBatch就是傳送的訊息
private void reenqueueBatch(ProducerBatch batch, long currentTimeMs) {
// accumulator的reenqueue前面已經分析了,本質就是呼叫Deque的addFirst()
this.accumulator.reenqueue(batch, currentTimeMs);
this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
}