1. 程式人生 > >11. kafka重試機制解讀

11. kafka重試機制解讀

前面對kafka的學習中已經瞭解到KafkaProducer通過設定引數retries,如果傳送訊息到broker時丟擲異常,且是允許重試的異常,那麼就會最大重試retries引數指定的次數。

本片文章主要分析幾個問題:
- 哪些異常可以重試
- 如何實現重試

接下來通過分析一一解開這些問題的答案。

1.哪些異常可以重試

org.apache.kafka.clients.producer.internals.Sender類中有如下方法:

private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    return
batch.attempts() < this.retries && ((response.error.exception() instanceof RetriableException) || (transactionManager != null && transactionManager.canRetry(response, batch))); }

通過方法名可知,其作用是判斷是否能重試,由方法體內的實現可知,允許重試需要滿足兩個條件:
1. 重試次數少於引數retries指定的值;
2. 異常是RetriableException

型別或者TransactionManager允許重試;

transactionManager.canRetry()後面會分析;先看看哪些異常是RetriableException型別異常。

  • RetriableException型別異常

kafka對RetriableException異常註釋是:短暫性的通過重試可以成功的異常;通過RetriableException類關係圖可知,可重試異常有圖中RetriableException的子類那些異常(可以通過異常是否繼承自RetriableException判斷是否可重試異常):

retries exception list

  • TransactionManager允許重試

如果異常不屬於RetriableException型別,但是隻要滿足(transactionManager != null && transactionManager.canRetry(response, batch))

就允許重試,所以,首先需要滿足transactionManager不為null。transactionManager是在KafkaProducer中構造Sender傳入的。構造TransactionManager的核心原始碼如下:

private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {
    TransactionManager transactionManager = null;
    boolean userConfiguredIdempotence = false;
    // 使用者設定的Properties引數中是否有'enable.idempotence',如果有的話, 就用使用者配置的
    if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
        userConfiguredIdempotence = true;
    }
    // 使用者設定的Properties引數中是否有'transactional.id',如果有的話, 就用使用者配置的
    boolean userConfiguredTransactions = false;
    if (config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
        userConfiguredTransactions = true;
    }
    // 得到引數'enable.idempotence'的值
    boolean idempotenceEnabled = config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
    // 如果使用者顯示配置enable.idempotence為false,並且又配置了transactional.id,就會丟擲這個異常
    if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions) {
        throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
    } 
    // 如果使用者配置了transactional.id,那麼idempotenceEnabled就認為是true(與)
    if (userConfiguredTransactions) {
        idempotenceEnabled = true;
    }

    // 只有使用者配置了transactional.id,且enable.idempotence沒有設定為false,這裡才為true,就會構造一個有效的TransactionManager;從這裡可知,如果使用者沒有配置transactional.id,那麼TransactionManager為null
    if (idempotenceEnabled) {
        // 構造TransactionManager的幾個重要引數
        String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
        int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs);
        ... ...
    }

    return transactionManager;
}

根據上面原始碼分析可知,只要使用者配置了transactional.id,且沒有顯示配置enable.idempotence為false,那麼TransactionManager就不會為null;

接下來還要滿足transactionManager.canRetry(response, batch)才允許重試,主要包括下面幾種情況:
- 碰到OutOfOrderSequenceException異常
- broker的響應報文中沒有logStartOffset(正常的響應資訊:”T0-0” -> “{error: NONE,offset: 0,logAppendTime: -1, logStartOffset: 0}”)

2.如何實現重試

上面說明了什麼情況下允許重試,接下來分析kafka是如何實現重試的。

2.1原理圖

本打算把原理圖放在最後,但是最後還是決定放在前面。對重試機制有一定的瞭解後,再看後面的分析就容易很多。kafka傳送&重試機制如下圖所示:
訊息傳送&重試機制.png
說明:
1. new KafkaProducer()後建立一個後臺執行緒KafkaThread掃描RecordAccumulator中是否有訊息;
2. 呼叫KafkaProducer.send()傳送訊息,實際上只是把訊息儲存到RecordAccumulator中;
3. 後臺執行緒KafkaThread掃描到RecordAccumulator中有訊息後,將訊息傳送到kafka叢集;
4. 如果傳送成功,那麼返回成功;
5. 如果傳送失敗,那麼判斷是否允許重試。如果不允許重試,那麼返回失敗的結果;如果允許重試,把訊息再儲存到RecordAccumulator中,等待後臺執行緒KafkaThread掃描再次傳送;

初步瞭解整個傳送&重試過程後,再根據原始碼進行更深入的分析。

2.2後臺執行緒

分析kafka如何實現重試之前,先看一下發送訊息到broker前做的主要事情:

  1. 構造KafkaProducer時,構造Send並啟動一個非同步執行緒:
this.sender = new Sender(... ...);
String ioThreadName = "kafka-producer-network-thread" + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

且從這段程式碼可知,每個KafkaProducer會啟動一個執行緒處理訊息,這個執行緒命名為:kafka-producer-network-thread | ${clientId}。

筆者某個例項檢視KafkaProducer啟動的執行緒結果如下:

[afei@kafka ~]$ jstack -l 23715 | grep "kafka-producer-network-thread"
"kafka-producer-network-thread | producer-2" #109 daemon prio=5 os_prio=0 tid=0x00007fe081921000 nid=0x5dcb runnable [0x00007fdfeb92b000]
"kafka-producer-network-thread | producer-1" #46 daemon prio=5 os_prio=0 tid=0x00007fe081f5a800 nid=0x5d66 runnable [0x00007fe024d20000]
  1. 呼叫KafkaProducer的send()方法時,先把傳送的訊息儲存在accumulator中:
RecordAccumulator.RecordAppendResult result = accumulator.append(
tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);

2.3**RecordAccumulator**

RecordAccumulator是儲存需要傳送的訊息或者重試訊息的核心。傳送訊息之前先把訊息存放在這裡,非同步執行緒KafkaThread啟動後從這裡取訊息然後傳送到broker。當傳送出錯且允許重試時,又會把這些需要重試的訊息儲存到這裡再進行重試。

當呼叫KafkaProducer的send()方法傳送訊息時,會呼叫append()方法將訊息暫時存放,核心原始碼如下:

 // 獲得deque或者建立deque。因為核心資料結構是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,所以生產者批次訊息是按照分割槽區分的。如果根據分割槽拿不到deque的話,就建立一個deque。
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
// 把需要傳送的訊息放入佇列中,
dq.addLast(batch);

當傳送出錯且允許重試時,會呼叫reenqueue()方法將訊息暫時存放,核心原始碼如下:

public void reenqueue(ProducerBatch batch, long now) {
    Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
    synchronized (deque) {
        // 把需要重試的訊息放入佇列中,等到重試
        deque.addFirst(batch);

    }
}

RecordAccumulator簡單總結:通過這兩段程式碼的分析可知,儲存需要傳送的(重試)訊息的核心資料結構是Deque。且建立佇列時是new ArrayDeque(),沒有指定初始容量。這裡不打算深入分析Deque,只是簡單介紹一下,Deque是Double ended queue (雙端佇列) 的縮寫。首尾都可寫入可讀取。

2.3傳送&重試

下面分析kafka是如何傳送並如何重試的。(TransactionManager相關程式碼被省略,其的作用後面有機會單獨一篇文章分析);傳送訊息核心程式碼在Sender.java中, Sender.java實現了Runnable介面, 所以是後臺執行緒非同步傳送訊息到kafka叢集:

public class Sender implements Runnable {

    public void run() {
        // KafkaProducer傳送訊息的執行緒啟動後,一直執行,直到KafkaProducer.close()將running置為false
        while (running) {
            run(time.milliseconds());
        }

        // 根據日誌可知,接下來是KafkaProducer關閉後的邏輯
        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // 當非強制關閉時,可能依然有請求堆積在accumulator中, 我們需要將這些剩餘的請求處理完成
        while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
            run(time.milliseconds());
        }

        if (forceClose) {
            // 如果強制關閉,且有未處理完的訊息,那麼讓這些訊息的傳送失敗,並丟擲異常new IllegalStateException("Producer is closed forcefully.").
            log.debug("Aborting incomplete batches due to forced shutdown");
            this.accumulator.abortIncompleteBatches();
        }
        ... ...
    }
}

KafkaProducer關閉有方式有兩種:close();close(long timeout, TimeUnit timeUnit),第一種是友好的關閉且設定timeout為Long.MAX_VALUE,第二種如果設定timeout為0,就是強制關閉,即forceClose=true。
備註:drained: 流乾,耗盡,undrained則表示未耗盡。

準備傳送訊息前需要嘗試去accumulator中獲取訊息:

// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
        this.maxRequestSize, now);

accumulator.drain()本質就是:Deque<ProducerBatch> deque = getDeque(tp);ProducerBatch batch = deque.pollFirst();,即根據分割槽資訊得到Deque,然後不斷獲取ProducerBatch,即封裝後的要傳送的訊息。

run(long)方法中往broker傳送訊息的部分核心程式碼(位於Sender.java中)如下:

private void sendProduceRequest(... ...){
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            // 這裡是處理響應訊息的地方
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    // 省略傳送訊息到broker的程式碼
    ... ...
}   

handleProduceResponse()中收到的響應,如何是網路斷開,那麼構造響應:new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION)。如果有版本不匹配問題,那麼構造響應:new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION)。還有一種特殊情況,如果指定了acks=0,那麼構造響應new ProduceResponse.PartitionResponse(Errors.NONE),因為這種情況下只需要傳送即可,不需要響應結果。接下來呼叫下面的方法–完成或者重試請求:

private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now) {
    Errors error = response.error;
    if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
            (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
        log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}", ...);
        // 如果是'MESSAGE_TOO_LARGE'的錯誤,且是批量訊息(recordCount>1),那麼切割訊息後再發送
        this.accumulator.splitAndReenqueue(batch);
        this.accumulator.deallocate(batch);
        this.sensors.recordBatchSplit();
    } else if (error != Errors.NONE) {
        // 如果響應有錯誤,判斷是否允許重試
        if (canRetry(batch, response)) {
            // 如果允許重試,會輸出warn日誌
            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}"... ...);
            if (transactionManager == null) {
                // 重新把訊息放到佇列中
                reenqueueBatch(batch, now);
            } 
            ... ...
        } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
            // 接收到這種錯誤,就認為返回成功。
            completeBatch(batch, response);
        } else {
            ... ...
        }
        // 到這裡如果是UnknownTopicOrPartitionException異常,說明producer快取的元資料資訊可能已經過期,所以需要請求更新,程式碼省略

    } else {
        completeBatch(batch, response);
    }
    ... ...
}

如果需要重試,重新入佇列的原始碼如下:

// ProducerBatch就是傳送的訊息
private void reenqueueBatch(ProducerBatch batch, long currentTimeMs) {
    // accumulator的reenqueue前面已經分析了,本質就是呼叫Deque的addFirst()
    this.accumulator.reenqueue(batch, currentTimeMs);
    this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
}