1. 程式人生 > >kafka2.0-冪等傳送(the idempotent producer)_09

kafka2.0-冪等傳送(the idempotent producer)_09

從kafka 0.11版本開始,生成者就支援了兩種額外的傳送模式 - 冪等傳送(the idempotent producer)和事物傳送(the transactional producer),可以說這是kafka在支援EOS(exactly-once semantics)上的重要功能。事物傳送在後面講。

什麼是冪等?(如果已經瞭解,可跳過) 舉個例子:比如使用者對訂單付款之後,生成了一個付款成功的訊息,傳送給了訂單系統,訂單系統接收到訊息之後,將訂單狀態為已付款,後來,訂單系統又收到了一個發貨成功的訊息,再將訂單狀態更新為已發貨,但是由於網路或者是系統的原因,訂單系統再次收到了之前的付款成功

的訊息,也就是訊息重複了,這個在現象在實際應用中也經常出現。訂單系統的處理是,查詢資料庫,發現這個訂單狀態為已發貨,然後不再更改訂單狀態。這時候,我們可以說訂單處理訊息的介面是冪等的,如果訂單再次將狀態更新為已付款,介面就是非冪等的。

kafka的訊息重複傳送問題 在以前的kafka的老版本中,是支援訊息的同步傳送的,但是現在,kafka全部改成了非同步傳送。其具體過程是 kafkaProducer.send()方法將訊息傳送到緩衝區中,然後後臺的一個IO執行緒讀取緩衝區中的資料,將訊息傳送到對應的broker上。

我們在傳送訊息的時候,如果設定了retries的次數大於0,就可能一個訊息被重複的傳送到了broker上,並且broker也儲存了多次,具體產生過程如下: kafka的訊息重複傳送問題

具體的情況是,由於網路原因第三步ack訊息回傳的時候,客戶端沒有接收到傳送成功確認訊息,客戶端會重發。所以這就產生了訊息的生產。

如果我們設定retries等於0,那麼假如在第一步訊息就傳送失敗了,那麼訊息將無法正確的傳送到kafka叢集。

冪等傳送 如果想傳送訊息不重複,可以使用kafka的冪等傳送,這個功能早在0.11版本中就存在了。 使用冪等傳送只需要這樣設定props.put("enable.idempotence", true);,預設情況下enable.idempotencefalse,如果設定了它為trueretries的預設值將為 Integer.MAX_VALUE

acks預設為all。 開啟冪等傳送之後,其傳送過程將會如下: 這裡寫圖片描述

為了實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number

PID:當每個新的Producer在初始化的時候,會分配一個唯一的PID,這個PID對使用者是不可見的。 Sequence Numbler:(對於每個PID,該Producer傳送資料的每個<Topic, Partition>都對應一個從0開始單調遞增的Sequence Number。 Broker端在快取中儲存了這Sequence Numbler,對於接收的每條訊息,如果其序號比Broker快取中序號大於1則接受它,否則將其丟棄。這樣就可以實現了訊息重複提交了。但是,只能保證單個Producer對於同一個<Topic, Partition>的EOS。不能保證同一個Producer一個topic不同的partition冪等。

總而言之,冪等的producer只能保證在同一個session和同一個partition中支援EOS。

原始碼解讀

KafkaProducer的構造方法中初始化化了一個IO執行緒,用來發送producer放在快取中的訊息,如下:

this.sender = new Sender(logContext,
        client,
        this.metadata,
        this.accumulator,
        maxInflightRequests == 1,
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        acks,
        retries,
        metricsRegistry.senderMetrics,
        Time.SYSTEM,
        this.requestTimeoutMs,
        config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
        this.transactionManager,
        apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

Sender實現了Runable介面,是IO執行緒主體所在,從kafka0.11版本開始,它實現了冪等和事物,所以主要實現看Sender.run方法。

    /** sender執行緒的主體 */
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            log.debug("Aborting incomplete batches due to forced shutdown");
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

    /**
     * Run a single iteration of sending
     *
     * @param now The current POSIX time in milliseconds
     */
    void run(long now) {
        if (transactionManager != null) {
            try {
                if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                    // Check if the previous run expired batches which requires a reset of the producer state.
                    transactionManager.resetProducerId();

                if (!transactionManager.isTransactional()) {
                    // this is an idempotent producer, so make sure we have a producer id
                    maybeWaitForProducerId();
                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                    transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                            "some previously sent messages and can no longer retry them. It isn't safe to continue."));
                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                    // as long as there are outstanding transactional requests, we simply wait for them to return
                    client.poll(retryBackoffMs, now);
                    return;
                }

                // do not continue sending if the transaction manager is in a failed state or if there
                // is no producer id (for the idempotent case).
                if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                    RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                        maybeAbortBatches(lastError);
                    client.poll(retryBackoffMs, now);
                    return;
                } else if (transactionManager.hasAbortableError()) {
                    accumulator.abortUndrainedBatches(transactionManager.lastError());
                }
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request: {}", e);
                transactionManager.authenticationFailed(e);
            }
        }

        long pollTimeout = sendProducerData(now);
        client.poll(pollTimeout, now);
    }

如果是冪等傳送,就要求有一個producderID,主要看這個方法maybeWaitForProducerId();

private void maybeWaitForProducerId() {
    //如果沒有produceId並且,transactionManager沒有error那就一直自旋。
    while (!transactionManager.hasProducerId() && !transactionManager.hasError()) {
        try {
            Node node = awaitLeastLoadedNodeReady(requestTimeoutMs);
            if (node != null) {
                ClientResponse response = sendAndAwaitInitProducerIdRequest(node);
                InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
                Errors error = initProducerIdResponse.error();
                if (error == Errors.NONE) {
                    ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
                            initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
                    transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
                    return;
                } else if (error.exception() instanceof RetriableException) {
                    log.debug("Retriable error from InitProducerId response", error.message());
                } else {
                    transactionManager.transitionToFatalError(error.exception());
                    break;
                }
            } else {
                log.debug("Could not find an available broker to send InitProducerIdRequest to. " +
                        "We will back off and try again.");
            }
        } catch (UnsupportedVersionException e) {
            transactionManager.transitionToFatalError(e);
            break;
        } catch (IOException e) {
            log.debug("Broker {} disconnected while awaiting InitProducerId response", e);
        }
        log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
        time.sleep(retryBackoffMs);
        metadata.requestUpdate();
    }
}
  • awaitLeastLoadedNodeReady方法 這個方法是隨機尋找一個負載最低的broker,也就是說,獲取producerID可由任意的broker完成處理。

Kafka在zk中新引入了一個節點:/latest_producer_id_block,broker啟動時提前預分配一段PID,當前是0~999,即提前分配出1000個PID來,當PID超過了999,則目前會按照1000的步長重新分配,依次遞增,如下圖所示:

這裡寫圖片描述

broker在記憶體中還儲存了下一個待分配的PID。這樣,當broker端接收到初始化PID的請求後,它會比較下一個PID是否在當前預分配的PID範圍:若是則直接返回;否則再次預分配下一批的PID。現在我們來討論下為什麼這個請求所有broker都能響應——原因就在於叢集中所有broker啟動時都會啟動一個叫TransactionCoordinator的元件,該元件能夠執行預分配PID塊和分配PID的工作,而所有broker都使用/latest_producer_id_block節點來儲存PID塊,因此任意一個broker都能響應這個請求。

  • sendAndAwaitInitProducerIdRequest方法   這個就是傳送初始化PID請求的方法,注意當前是同步等待返回結果,即Sender執行緒會無限阻塞直到broker端返回response(當然依然會受制於request.timeout.ms引數的影響)。

      得到PID之後,Sender執行緒會呼叫RecordAccumulator.drain()提取當前可傳送的訊息,在該方法中會將PID,Seq number等資訊封裝進訊息batch中,具體程式碼參見:RecordAccumulator.drain()。一旦獲取到訊息batch後,Sender執行緒開始構建ProduceRequest請求然後傳送給broker端。至此producer端的工作就算告一段落了。

  • broker端是如何響應producer請求   實際上,broker最重要的事情就是要區別某個PID的同一個訊息batch是否重複傳送了。因此在訊息被寫入到leader底層日誌之前必須要先做一次判斷,即producer請求中的訊息batch是否已然被處理過。如果請求中包含的訊息batch與最近一次成功寫入的batch相同(即PID相同,batch起始seq numberbatch結束seq number都相同),那麼該方法便丟擲異常,然後由上層方法捕獲到該異常封裝進ProduceResponse返回。如果batch不相同,則允許此次寫入,並在寫入完成後更新這些producer資訊。

      最後再說一點:以上所說的冪等producer一直強調的是“精確處理一次”的語義,實際上冪等producer還有“不亂序”的強語義保證,只不過在0.11版本中這種不亂序主要是通過設定enable.idempotence=true時強行將max.in.flight.requests.per.connection設定成1來實現的。這種實現雖然保證了訊息不亂序,但也在某種程度上降低了producerTPS。據我所知,這個問題將在1.0.0版本中已然得到解決。