1. 程式人生 > 其它 >kafka冪等性原理總結

kafka冪等性原理總結

1.概述
最近和一些同學交流的時候反饋說,在面試Kafka時,被問到Kafka元件組成部分、API使用、Consumer和Producer原理及作用等問題都能詳細作答。但是,問到一個平時不注意的問題,就是Kafka的冪等性,被卡主了。那麼,今天筆者就為大家來剖析一下Kafka的冪等性原理及實現。
2.內容
2.1 Kafka為啥需要冪等性?
Producer在生產傳送訊息時,難免會重複傳送訊息。Producer進行retry時會產生重試機制,發生訊息重複傳送。而引入冪等性後,重複傳送只會生成一條有效的訊息。Kafka作為分散式訊息系統,它的使用場景常見與分散式系統中,比如訊息推送系統、業務平臺系統(如物流平臺、銀行結算平臺等)。以銀行結算平臺來說,業務方作為上游把資料上報到銀行結算平臺,如果一份資料被計算、處理多次,那麼產生的影響會很嚴重。


2.2 影響Kafka冪等性的因素有哪些?
在使用Kafka時,需要確保Exactly-Once語義。分散式系統中,一些不可控因素有很多,比如網路、OOM、FullGC等。在Kafka Broker確認Ack時,出現網路異常、FullGC、OOM等問題時導致Ack超時,Producer會進行重複傳送。可能出現的情況如下:

2.3 Kafka的冪等性是如何實現的?
Kafka為了實現冪等性,它在底層設計架構中引入了ProducerID和SequenceNumber。那這兩個概念的用途是什麼呢?
ProducerID:在每個新的Producer初始化時,會被分配一個唯一的ProducerID,這個ProducerID對客戶端使用者是不可見的。


SequenceNumber:對於每個ProducerID,Producer傳送資料的每個Topic和Partition都對應一個從0開始單調遞增的SequenceNumber值。
2.3.1 冪等性引入之前的問題?
Kafka在引入冪等性之前,Producer向Broker傳送訊息,然後Broker將訊息追加到訊息流中後給Producer返回Ack訊號值。實現流程如下:

上圖的實現流程是一種理想狀態下的訊息傳送情況,但是實際情況中,會出現各種不確定的因素,比如在Producer在傳送給Broker的時候出現網路異常。比如以下這種異常情況的出現:

上圖這種情況,當Producer第一次傳送訊息給Broker時,Broker將訊息(x2,y2)追加到了訊息流中,但是在返回Ack訊號給Producer時失敗了(比如網路異常) 。此時,Producer端觸發重試機制,將訊息(x2,y2)重新發送給Broker,Broker接收到訊息後,再次將該訊息追加到訊息流中,然後成功返回Ack訊號給Producer。這樣下來,訊息流中就被重複追加了兩條相同的(x2,y2)的訊息。


2.3.2 冪等性引入之後解決了什麼問題?
面對這樣的問題,Kafka引入了冪等性。那麼冪等性是如何解決這類重複傳送訊息的問題的呢?下面我們可以先來看看流程圖:

同樣,這是一種理想狀態下的傳送流程。實際情況下,會有很多不確定的因素,比如Broker在傳送Ack訊號給Producer時出現網路異常,導致傳送失敗。異常情況如下圖所示:

當Producer傳送訊息(x2,y2)給Broker時,Broker接收到訊息並將其追加到訊息流中。此時,Broker返回Ack訊號給Producer時,發生異常導致Producer接收Ack訊號失敗。對於Producer來說,會觸發重試機制,將訊息(x2,y2)再次傳送,但是,由於引入了冪等性,在每條訊息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber傳送給Broker,而之前Broker快取過之前傳送的相同的訊息,那麼在訊息流中的訊息就只有一條(x2,y2),不會出現重複傳送的情況。
2.3.3 ProducerID是如何生成的?
客戶端在生成Producer時,會例項化如下程式碼:

// 例項化一個Producer物件
Producer<String, String> producer = new KafkaProducer<>(props);

在org.apache.kafka.clients.producer.internals.Sender類中,在run()中有一個maybeWaitForPid()方法,用來生成一個ProducerID,實現程式碼如下:

private void maybeWaitForPid() {
        if (transactionState == null)
            return;

        while (!transactionState.hasPid()) {
            try {
                Node node = awaitLeastLoadedNodeReady(requestTimeout);
                if (node != null) {
                    ClientResponse response = sendAndAwaitInitPidRequest(node);
                    if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                        InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
                        transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                    } else {
                        log.error("Received an unexpected response type for an InitPidRequest from {}. " +
                                "We will back off and try again.", node);
                    }
                } else {
                    log.debug("Could not find an available broker to send InitPidRequest to. " +
                            "We will back off and try again.");
                }
            } catch (Exception e) {
                log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
            }
            log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
            time.sleep(retryBackoffMs);
            metadata.requestUpdate();
        }
    }

總結:為了實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。PID:每個新的Producer在初始化的時候會被分配一個唯一的PID,對使用者是不可見的。Sequence Numbler:對於每個PID,該Producer傳送資料的每個<Topic,Partition>都對應一個從0開始單調遞增的Sequence Number,Broker端在快取中儲存了這seq number,對於接收的每條訊息,如果其序號比Broker快取中序號大於1則接受它,否則將其丟棄。這樣就可以實現了訊息重複提交了。但是隻能保證單個Producer對於同一個<Topic,Partition>的Exactly Once語義。

 

參考部落格:
https://blog.csdn.net/querydata_boke/article/details/105419223
https://www.cnblogs.com/smartloli/p/11922639.html