1. 程式人生 > >Kafka生產者事務和冪等

Kafka生產者事務和冪等

1 生產者冪等性

1.1 引入

冪等性引入目的:

  • 生產者重複生產訊息。生產者進行retry會產生重試時,會重複產生訊息。有了冪等性之後,在進行retry重試時,只會生成一個訊息。

1.2 冪等性實現

1.2.1 PID 和 Sequence Number

為了實現Producer的冪等性,Kafka引入了Producer ID(即PID)和Sequence Number。

  • PID。每個新的Producer在初始化的時候會被分配一個唯一的PID,這個PID對使用者是不可見的。
  • Sequence Numbler。(對於每個PID,該Producer傳送資料的每個<Topic, Partition>都對應一個從0開始單調遞增的Sequence Number。

Broker端在快取中儲存了這seq number,對於接收的每條訊息,如果其序號比Broker快取中序號大於1則接受它,否則將其丟棄。這樣就可以實現了訊息重複提交了。但是,只能保證單個Producer對於同一個<Topic, Partition>的Exactly Once語義。不能保證同一個Producer一個topic不同的partion冪等。

1

實現冪等之後

2

1.2.2 生成PID的流程

在執行建立事務時,如下

1

  Producer<String, String> producer = new KafkaProducer<String, String>(props);

會建立一個Sender,並啟動執行緒,執行如下run方法,在maybeWaitForProducerId()中生成一個producerId,如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

====================================

類名:Sender

====================================

void run(long now) {

        if (transactionManager != null) {

            try {

                 ........

                if (!transactionManager.isTransactional()) {

                    // 為idempotent producer生成一個producer id

                    maybeWaitForProducerId();

                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {

                   ........

1.3 冪等性的應用例項

1、配置屬性

需要設定:

  • enable.idempotence,需要設定為ture,此時就會預設把acks設定為all,所以不需要再設定acks屬性了。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

private Producer buildIdempotProducer(){

        // create instance for properties to access producer configs

        Properties props = new Properties();

        // bootstrap.servers是Kafka叢集的IP地址。多個時,使用逗號隔開

        props.put("bootstrap.servers", "localhost:9092");

        props.put("enable.idempotence",true);

        //If the request fails, the producer can automatically retry,

        props.put("retries", 3);

        //Reduce the no of requests less than 0

        props.put("linger.ms", 1);

        //The buffer.memory controls the total amount of memory available to the producer for buffering.

        props.put("buffer.memory", 33554432);

        // Kafka訊息是以鍵值對的形式傳送,需要設定key和value型別序列化器

        props.put("key.serializer",

                "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer",

                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        return producer;

    }

2、傳送訊息

跟一般生成者一樣,如下

1

2

3

4

5

6

7

public void produceIdempotMessage(String topic, String message) {

        // 建立Producer

        Producer producer = buildIdempotProducer();

        // 傳送訊息

        producer.send(new ProducerRecord<String, String>(topic, message));

        producer.flush();

    }

此時,因為我們並沒有配置transaction.id屬性,所以不能使用事務相關API,如下

1

producer.initTransactions();

否則會出現如下錯誤:

Exception in thread “main” java.lang.IllegalStateException: Transactional method invoked on a non-transactional producer.

    at org.apache.kafka.clients.producer.internals.TransactionManager.ensureTransactional(TransactionManager.java:777)

    at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:202)

    at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:544)

2 事務屬性

2.1 事務屬性理解

事務屬性是2017年Kafka 0.11.0.0引入的新特性。類似於資料庫事務,只是這裡的資料來源是Kafka,kafka事務屬性是指一系列的生產者生產訊息和消費者提交偏移量的操作在一個事務,或者說是是一個原子操作),同時成功或者失敗

注意:在理解訊息的事務時,一直處於一個錯誤理解就是如下程式碼中,把操作db的業務邏輯跟操作訊息當成是一個事務。其實這個是有問題的,操作DB資料庫的資料來源是DB,訊息資料來源是kfaka,這是完全不同兩個資料,一種資料來源(如mysql,kafka)對應一個事務,所以它們是兩個獨立的事務:kafka事務指kafka一系列 生產、消費訊息等操作組成一個原子操作;db事務是指操作資料庫的一系列增刪改操作組成一個原子操作。

1

2

3

4

5

6

7

8

void  kakfa_in_tranction(){

  // 1.kafa的操作:讀取訊息或者生產訊息

kafkaOperation();

   // 2.db操作

  dbOperation()

}

2.2 引入事務目的

在事務屬性之前先引入了生產者冪等性,它的作用為:

  • 生產者多次傳送訊息可以封裝成一個原子操作,要麼都成功,要麼失敗
  • consumer-transform-producer模式下,因為消費者提交偏移量出現問題,導致在重複消費訊息時,生產者重複生產訊息。需要將這個模式下消費者提交偏移量操作和生成者一系列生成訊息的操作封裝成一個原子操作。

消費者提交偏移量導致重複消費訊息的場景:消費者在消費訊息完成提交便宜量o2之前掛掉了(假設它最近提交的偏移量是o1),此時執行再均衡時,其它消費者會重複消費訊息(o1到o2之間的訊息)。

2.3 事務操作的API

producer提供了initTransactions, beginTransaction, sendOffsets, commitTransaction, abortTransaction 五個事務方法。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

/**

     * 初始化事務。需要注意的有:

     * 1、前提

     * 需要保證transation.id屬性被配置。

     * 2、這個方法執行邏輯是:

     *   (1)Ensures any transactions initiated by previous instances of the producer with the same

     *      transactional.id are completed. If the previous instance had failed with a transaction in

     *      progress, it will be aborted. If the last transaction had begun completion,

     *      but not yet finished, this method awaits its completion.

     *    (2)Gets the internal producer id and epoch, used in all future transactional

     *      messages issued by the producer.

     *

     */

    public void initTransactions();

    /**

     * 開啟事務

     */

    public void beginTransaction() throws ProducerFencedException ;

    /**

     * 為消費者提供的在事務內提交偏移量的操作

     */

    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,

                                         String consumerGroupId) throws ProducerFencedException ;

    /**

     * 提交事務

     */

    public void commitTransaction() throws ProducerFencedException;

    /**

     * 放棄事務,類似回滾事務的操作

     */

    public void abortTransaction() throws ProducerFencedException ;

3 事務屬性的應用例項

在一個原子操作中,根據包含的操作型別,可以分為三種情況,前兩種情況是事務引入的場景,最後一種情況沒有使用價值。

只有Producer生產訊息;

消費訊息和生產訊息並存,這個是事務場景中最常用的情況,就是我們常說的“consume-transform-produce ”模式

只有consumer消費訊息,這種操作其實沒有什麼意義,跟使用手動提交效果一樣,而且也不是事務屬性引入的目的,所以一般不會使用這種情況

3.1 相關屬性配置

使用kafka的事務api時的一些注意事項:

  • 需要消費者的自動模式設定為false,並且不能子再手動的進行執行consumer#commitSync或者consumer#commitAsyc
  • 生產者配置transaction.id屬性
  • 生產者不需要再配置enable.idempotence,因為如果配置了transaction.id,則此時enable.idempotence會被設定為true
  • 消費者需要配置Isolation.level。在consume-trnasform-produce模式下使用事務時,必須設定為READ_COMMITTED。

3.2 只有寫

建立一個事務,在這個事務操作中,只有生成訊息操作。程式碼如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

/**

     * 在一個事務只有生產訊息操作

     */

    public void onlyProduceInTransaction() {

        Producer producer = buildProducer();

        // 1.初始化事務

        producer.initTransactions();

        // 2.開啟事務

        producer.beginTransaction();

        try {

            // 3.kafka寫操作集合

            // 3.1 do業務邏輯

            // 3.2 傳送訊息

            producer.send(new ProducerRecord<String, String>("test", "transaction-data-1"));

            producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));

            // 3.3 do其他業務邏輯,還可以傳送其他topic的訊息。

            // 4.事務提交

            producer.commitTransaction();

        } catch (Exception e) {

            // 5.放棄事務

            producer.abortTransaction();

        }

    }

建立生成者,程式碼如下,需要:

  • 配置transactional.id屬性
  • 配置enable.idempotence屬性

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

/**

     * 需要:

     * 1、設定transactional.id

     * 2、設定enable.idempotence

     * @return

     */

    private Producer buildProducer() {

        // create instance for properties to access producer configs

        Properties props = new Properties();

        // bootstrap.servers是Kafka叢集的IP地址。多個時,使用逗號隔開

        props.put("bootstrap.servers", "localhost:9092");

        // 設定事務id

        props.put("transactional.id", "first-transactional");

        // 設定冪等性

        props.put("enable.idempotence",true);

        //Set acknowledgements for producer requests.

        props.put("acks", "all");

        //If the request fails, the producer can automatically retry,

        props.put("retries", 1);

        //Specify buffer size in config,這裡不進行設定這個屬性,如果設定了,還需要執行producer.flush()來把快取中訊息傳送出去

        //props.put("batch.size", 16384);

        //Reduce the no of requests less than 0

        props.put("linger.ms", 1);

        //The buffer.memory controls the total amount of memory available to the producer for buffering.

        props.put("buffer.memory", 33554432);

        // Kafka訊息是以鍵值對的形式傳送,需要設定key和value型別序列化器

        props.put("key.serializer",

                "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer",

                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        return producer;

    }

3.3 消費-生產並存(consume-transform-produce)

在一個事務中,既有生產訊息操作又有消費訊息操作,即常說的Consume-tansform-produce模式。如下例項程式碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

/**

     * 在一個事務內,即有生產訊息又有消費訊息

     */

    public void consumeTransferProduce() {

        // 1.構建上產者

        Producer producer = buildProducer();

        // 2.初始化事務(生成productId),對於一個生產者,只能執行一次初始化事務操作

        producer.initTransactions();

        // 3.構建消費者和訂閱主題

        Consumer consumer = buildConsumer();

        consumer.subscribe(Arrays.asList("test"));

        while (true) {

            // 4.開啟事務

            producer.beginTransaction();

            // 5.1 接受訊息

            ConsumerRecords<String, String> records = consumer.poll(500);

            try {

                // 5.2 do業務邏輯;

                System.out.println("customer Message---");

                Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();

                for (ConsumerRecord<String, String> record : records) {

                    // 5.2.1 讀取訊息,並處理訊息。print the offset,key and value for the consumer records.

                    System.out.printf("offset = %d, key = %s, value = %s\n",

                            record.offset(), record.key(), record.value());

                    // 5.2.2 記錄提交的偏移量

                    commits.put(new TopicPartition(record.topic(), record.partition()),

                            new OffsetAndMetadata(record.offset()));

                    // 6.生產新的訊息。比如外賣訂單狀態的訊息,如果訂單成功,則需要傳送跟商家結轉訊息或者派送員的提成訊息

                    producer.send(new ProducerRecord<String, String>("test", "data2"));

                }

                // 7.提交偏移量

                producer.sendOffsetsToTransaction(commits, "group0323");

                // 8.事務提交

                producer.commitTransaction();

            } catch (Exception e) {

                // 7.放棄事務

                producer.abortTransaction();

            }

        }

    }

建立消費者程式碼,需要:

  • 將配置中的自動提交屬性(auto.commit)進行關閉
  • 而且在程式碼裡面也不能使用手動提交commitSync( )或者commitAsync( )
  • 設定isolation.level

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

/**

     * 需要:

     * 1、關閉自動提交 enable.auto.commit

     * 2、isolation.level為

     * @return

     */

    public Consumer buildConsumer() {

        Properties props = new Properties();

        // bootstrap.servers是Kafka叢集的IP地址。多個時,使用逗號隔開

        props.put("bootstrap.servers", "localhost:9092");

        // 消費者群組

        props.put("group.id", "group0323");

        // 設定隔離級別

        props.put("isolation.level","read_committed");

        // 關閉自動提交

        props.put("enable.auto.commit", "false");

        props.put("session.timeout.ms", "30000");

        props.put("key.deserializer",

                "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer",

                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer

                <String, String>(props);

        return consumer;

    }

3.4 只有讀

建立一個事務,在這個事務操作中,只有生成訊息操作,如下程式碼。這種操作其實沒有什麼意義,跟使用手動提交效果一樣,無法保證消費訊息操作和提交偏移量操作在一個事務。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

**

     * 在一個事務只有訊息操作

     */

    public void onlyConsumeInTransaction() {

        Producer producer = buildProducer();

        // 1.初始化事務

        producer.initTransactions();

        // 2.開啟事務

        producer.beginTransaction();

        // 3.kafka讀訊息的操作集合

        Consumer consumer = buildConsumer();

        while (true) {

            // 3.1 接受訊息

            ConsumerRecords<String, String> records = consumer.poll(500);

            try {

                // 3.2 do業務邏輯;

                System.out.println("customer Message---");

                Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();

                for (ConsumerRecord<String, String> record : records) {

                    // 3.2.1 處理訊息 print the offset,key and value for the consumer records.

                    System.out.printf("offset = %d, key = %s, value = %s\n",

                            record.offset(), record.key(), record.value());

                    // 3.2.2 記錄提交偏移量

                    commits.put(new TopicPartition(record.topic(), record.partition()),

                            new OffsetAndMetadata(record.offset()));

                }

                // 4.提交偏移量

                producer.sendOffsetsToTransaction(commits, "group0323");

                // 5.事務提交

                producer.commitTransaction();

            } catch (Exception e) {

                // 6.放棄事務

                producer.abortTransaction();

            }

        }

    }

4 生產者事務的實現

4.1 相關配置

4.1.1 Broker configs

ransactional.id.timeout.ms The maximum amount of time in ms that the transaction coordinator will wait before proactively expire a producer TransactionalId without receiving any transaction status updates from it.

Default is 604800000 (7 days). This allows periodic weekly producer jobs to maintain their ids.

max.transaction.timeout.ms The maximum allowed timeout for transactions. If a client’s requested transaction time exceed this, then the broker will return a InvalidTransactionTimeout error in InitPidRequest. This prevents a client from too large of a timeout, which can stall consumers reading from topics included in the transaction.

Default is 900000 (15 min). This is a conservative upper bound on the period of time a transaction of messages will need to be sent.

transaction.state.log.replication.factor The number of replicas for the transaction state topic.

Default: 3

transaction.state.log.num.partitions The number of partitions for the transaction state topic.

Default: 50

transaction.state.log.min.isr The minimum number of insync replicas the each partition of the transaction state topic needs to have to be considered online.

Default: 2

transaction.state.log.segment.bytes The segment size for the transaction state topic.

Default: 104857600 bytes.

4.1.2 Producer configs

enable.idempotence

Whether or not idempotence is enabled (false by default). If disabled, the producer will not set the PID field in produce requests and the current producer delivery semantics will be in effect. Note that idempotence must be enabled in order to use transactions.

When idempotence is enabled, we enforce that acks=all, retries > 1, and max.inflight.requests.per.connection=1. Without these values for these configurations, we cannot guarantee idempotence. If these settings are not explicitly overidden by the application, the producer will set acks=all, retries=Integer.MAX_VALUE, and max.inflight.requests.per.connection=1 when idempotence is enabled.

transaction.timeout.ms The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.

This config value will be sent to the transaction coordinator along with the InitPidRequest. If this value is larger than the max.transaction.timeout.ms setting in the broker, the request will fail with a InvalidTransactionTimeout error.

Default is 60000. This makes a transaction to not block downstream consumption more than a minute, which is generally allowable in real-time apps.

transactional.id The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery.

如果配置了transactional.id屬性,則enable.idempotence 會被設定為true.

4.1.3 Consumer configs

  • read_uncommitted,類似沒有設定事務屬性的consumer,即就是我們平常使用的consumer,只要訊息寫入到檔案中就可以進行讀取。
isolation.level Here are the possible values (default is read_uncommitted):

read_uncommitted: consume both committed and uncommitted messages in offset ordering.

read_committed: only consume non-transactional messages or committed transactional messages in offset order. In order to maintain offset ordering, this setting means that we will have to buffer messages in the consumer until we see all messages in a given transaction.

4.2  冪等性和事務性的關係

4.2.1 兩者關係

事務屬性實現前提是冪等性,即在配置事務屬性transaction id時,必須還得配置冪等性;但是冪等性是可以獨立使用的,不需要依賴事務屬性。

  • 冪等性引入了Porducer ID
  • 事務屬性引入了Transaction Id屬性。、

設定

  • enable.idempotence = true,transactional.id不設定:只支援冪等性。
  • enable.idempotence = true,transactional.id設定:支援事務屬性和冪等性
  • enable.idempotence = false,transactional.id不設定:沒有事務屬性和冪等性的kafka
  • enable.idempotence = false,transactional.id設定:無法獲取到PID,此時會報錯

4.2.2 tranaction id 、productid 和 epoch

一個app有一個tid,同一個應用的不同例項PID是一樣的,只是epoch的值不同。如:

3

同一份程式碼執行兩個例項,分步執行如下:在例項1沒有進行提交事務前,開始執行例項2的初始化事務

4

step1  例項1-初始化事務。的打印出對應productId和epoch,資訊如下:

[2018-04-21 20:56:23,106] INFO [TransactionCoordinator id=0] Initialized transactionalId first-transactional with producerId 8000 and producer epoch 123 on partition __transaction_state-12 (kafka.coordinator.transaction.TransactionCoordinator)

step2 例項1-傳送訊息。

step3 例項2-初始化事務。初始化事務時的打印出對應productId和epoch,資訊如下:

18-04-21 20:56:48,373] INFO [TransactionCoordinator id=0] Initialized transactionalId first-transactional with producerId 8000 and producer epoch 124 on partition __transaction_state-12 (kafka.coordinator.transaction.TransactionCoordinator)

step4  例項1-提交事務,此時報錯

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.

step5 例項2-提交事務

為了避免這種錯誤,同一個事務ID,只有保證如下順序epch小producer執行init-transaction和committransaction,然後epoch較大的procuder才能開始執行init-transaction和commit-transaction,如下順序:

80061024

有了transactionId後,Kafka可保證:

  • 跨Session的資料冪等傳送。當具有相同Transaction ID的新的Producer例項被建立且工作時,舊的且擁有相同Transaction ID的Producer將不再工作【上面的例項可以驗證】。kafka保證了關聯同一個事務的所有producer(一個應用有多個例項)必須按照順序初始化事務、和提交事務,否則就會有問題,這保證了同一事務ID中訊息是有序的(不同例項得按順序建立事務和提交事務)。

4.3 事務最佳實踐-單例項的事務性

通過上面例項中可以看到kafka是跨Session的資料冪等傳送,即如果應用部署多個例項時常會遇到上面的問題“org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.”,必須保證這些例項生成者的提交事務順序和建立順序保持一致才可以,否則就無法成功。其實,在實踐中,我們更多的是如何實現對應用單例項的事務性。可以通過spring-kafaka實現思路來學習,即每次建立生成者都設定一個不同的transactionId的值,如下程式碼:

在spring-kafka中,對於一個執行緒建立一個producer,事務提交之後,還會關閉這個producer並清除,後續同一個執行緒或者新的執行緒重新執行事務時,此時就會重新建立producer。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

====================================

類名:ProducerFactoryUtils

====================================

/**

     * Obtain a Producer that is synchronized with the current transaction, if any.

     * @param producerFactory the ConnectionFactory to obtain a Channel for

     * @param <K> the key type.

     * @param <V> the value type.

     * @return the resource holder.

     */

    public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(

            final ProducerFactory<K, V> producerFactory) {

        Assert.notNull(producerFactory, "ProducerFactory must not be null");

        // 1.對於每一個執行緒會生成一個唯一key,然後根據key去查詢resourceHolder

        @SuppressWarnings("unchecked")

        KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager

                .getResource(producerFactory);

        if (resourceHolder == null) {

            // 2.建立一個消費者

            Producer<K, V> producer = producerFactory.createProducer();

            // 3.開啟事務

            producer.beginTransaction();

            resourceHolder = new KafkaResourceHolder<K, V>(producer);

            bindResourceToTransaction(resourceHolder, producerFactory);

        }

        return resourceHolder;

    }

建立消費者程式碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

====================================

類名:DefaultKafkaProducerFactory

====================================

protected Producer<K, V> createTransactionalProducer() {

Producer<K, V> producer = this.cache.poll();

if (producer == null) {

Map<String, Object> configs = new HashMap<>(this.configs);

            // 對於每一次生成producer時,都設定一個不同的transactionId

configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,

this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());

producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);

            // 1.初始化話事務。

producer.initTransactions();

return new CloseSafeProducer<K, V>(producer, this.cache);

}

else {

return producer;

}

}

4.4  Consume-transform-Produce 的流程

Snip20180504_56

流程1 :查詢Tranaction Corordinator。

Producer向任意一個brokers傳送 FindCoordinatorRequest請求來獲取Transaction Coordinator的地址。

流程2:初始化事務 initTransaction

Producer傳送InitpidRequest給事務協調器,獲取一個Pid。InitpidRequest的處理過程是同步阻塞的,一旦該呼叫正確返回,Producer就可以開始新的事務。TranactionalId通過InitpidRequest傳送給Tranciton Corordinator,然後在Tranaciton Log中記錄這<TranacionalId,pid>的對映關係。除了返回PID之外,還具有如下功能:

  • 對PID對應的epoch進行遞增,這樣可以保證同一個app的不同例項對應的PID是一樣的,但是epoch是不同的。
  • 回滾之前的Producer未完成的事務(如果有)。

流程3: 開始事務beginTransaction

執行Producer的beginTransacion(),它的作用是Producer在本地記錄下這個transaction的狀態為開始狀態。

注意:這個操作並沒有通知Transaction Coordinator。

流程4: Consume-transform-produce loop

流程4.0: 通過Consumtor消費訊息,處理業務邏輯

流程4.1: producer向TransactionCordinantro傳送AddPartitionsToTxnRequest

在producer執行send操作時,如果是第一次給<topic,partion>傳送資料,此時會向Trasaction Corrdinator傳送一個AddPartitionsToTxnRequest請求,Transaction Corrdinator會在transaction log中記錄下tranasactionId和<topic,partion>一個對映關係,並將狀態改為begin。AddPartionsToTxnRequest的資料結構如下:

1

2

3

4

5

6

AddPartitionsToTxnRequest => TransactionalId PID Epoch [Topic [Partition]]

TransactionalId => string

PID => int64

Epoch => int16

Topic => string

Partition => int32

流程4.2:  producer#send傳送 ProduceRequst

生產者傳送資料,雖然沒有還沒有執行commit或者absrot,但是此時訊息已經儲存到kafka上,可以參考如下圖斷點位置處,此時已經可以檢視到訊息了,而且即使後面執行abort,訊息也不會刪除,只是更改狀態欄位標識訊息為abort狀態。

62059279

流程4.3: AddOffsetCommitsToTxnRequest

Producer通過KafkaProducer.sendOffsetsToTransaction 向事務協調器器傳送一個AddOffesetCommitsToTxnRequests:

1

2

3

4

5

AddOffsetsToTxnRequest => TransactionalId PID Epoch ConsumerGroupID

TransactionalId => string

PID => int64

Epoch => int16

ConsumerGroupID => string

在執行事務提交時,可以根據ConsumerGroupID來推斷_customer_offsets主題中相應的TopicPartions資訊。這樣在

流程4.4: TxnOffsetCommitRequest

Producer通過KafkaProducer.sendOffsetsToTransaction還會向消費者協調器Cosumer Corrdinator傳送一個TxnOffsetCommitRequest,在主題_consumer_offsets中儲存消費者的偏移量資訊。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

TxnOffsetCommitRequest   => ConsumerGroupID

                            PID

                            Epoch

                            RetentionTime

                            OffsetAndMetadata

  ConsumerGroupID => string

  PID => int64

  Epoch => int32

  RetentionTime => int64

  OffsetAndMetadata => [TopicName [Partition Offset Metadata]]

    TopicName => string

    Partition => int32

    Offset => int64

    Metadata => string

流程5: 事務提交和事務終結(放棄事務)

通過生產者的commitTransaction或abortTransaction方法來提交事務和終結事務,這兩個操作都會發送一個EndTxnRequest給Transaction Coordinator。

流程5.1:EndTxnRequest。Producer傳送一個EndTxnRequest給Transaction Coordinator,然後執行如下操作:

  • Transaction Coordinator會把PREPARE_COMMIT or PREPARE_ABORT 訊息寫入到transaction log中記錄
  • 執行流程5.2
  • 執行流程5.3

流程5.2:WriteTxnMarkerRequest

1

2

3

4

5

6

7

WriteTxnMarkersRequest => [CoorinadorEpoch PID Epoch Marker [Topic [Partition]]]

CoordinatorEpoch => int32

PID => int64

Epoch => int16

Marker => boolean (false(0) means ABORT, true(1) means COMMIT)

Topic => string

Partition => int32

  • 對於Producer生產的訊息。Tranaction Coordinator會發送WriteTxnMarkerRequest給當前事務涉及到每個<topic,partion>的leader,leader收到請求後,會寫入一個COMMIT(PID) 或者 ABORT(PID)的控制資訊到data log中
  • 對於消費者偏移量資訊,如果在這個事務裡面包含_consumer-offsets主題。Tranaction Coordinator會發送WriteTxnMarkerRequest給Transaction Coordinartor,Transaction Coordinartor收到請求後,會寫入一個COMMIT(PID) 或者 ABORT(PID)的控制資訊到 data log中。

流程5.3:Transaction Coordinator會將最終的COMPLETE_COMMIT或COMPLETE_ABORT訊息寫入Transaction Log中以標明該事務結束。

  • 只會保留這個事務對應的PID和timstamp。然後把當前事務其他相關訊息刪除掉,包括PID和tranactionId的對映關係。

4.4.1 檔案型別和檢視命令

kafka檔案主要包括broker的data(主題:test)、事務協調器對應的transaction_log(主題:__tranaction_state)、偏移量資訊(主題:_consumer_offsets)三種類型。如下圖

1

這三種檔案型別其實都是topic的分割槽,所以對於每一個目錄都包含*.log、*.index、*.timeindex、*.txnindex檔案(僅這個檔案是為了實現事務屬性引入的)。segment和segmengt對應index、timeindex、txnindex檔案命名中序號表示的是第幾個訊息。如下圖中,00000000000000368769.index和00000000000000568769.log中“368969”就是表示檔案中儲存的第一個訊息是468969個訊息。

對於索引文案包含兩部分:

  • baseOffset:索引對應segment檔案中的第幾條message。
  • position:在segment中的絕對位置。

67930538

檢視檔案內容:

bin/kafka-run-class.sh   kafka.tools.DumpLogSegments   –files /Users/wuzhonghu/data/kafka-logs/firtstopic-0/00000000000000000002.log   –print-data-log

4.4.2 ControlMessage和Transaction markers

Trasaction markers就是kafka為了實現事務定義的Controll Message。這個訊息和資料訊息都存放在log中,在Consumer讀取事務訊息時有用,可以參考下面章節-4.5.1 老版本-讀取事務訊息順序。

4.4.3 Transaction Coordinator 和 Transaction Log

Transaction Log如下放置在“_tranaction_state”主題下面,預設是50個分割槽,每一個分割槽中檔案格式和broker儲存訊息是一樣的,都有log/index/timeindex檔案,如下:

57646045

4.5 消費讀取事務訊息(READ_COMMITED)

Consumer為了實現事務,新增了一個isolation.level配置,有兩個值如下,

  • READ_UNCOMMITTED,類似於沒有事務屬性的消費者。
  • READ_COMMITED,只獲取執行了事務提交的訊息。

在本小節中我們主要講READ_COMMITED模式下讀取訊息的流程的兩種版本的演化

4.5.1 老版本-讀取事務訊息順序

如下圖中,按順序儲存到broker中訊息有:事務1訊息T1-M1、對於事務2的訊息有T2-M1、事務1訊息T1-M2、非事務訊息M1,最終到達client端的循序是M1-> T2-M1  -> T1-M1 -> T1-M2。

84999567

具體步驟如下:

  • step1 Consumer接受到事務訊息T1-M1、T2-M2、T1-M2和非事務訊息M1,因為沒有收到事務T1和T2的控制訊息,所以此時把事務相關訊息T1-M1、T2-M2、T1-M2 儲存到記憶體,然後只把非事務訊息M1返回給client。
  • step2  Consumer接受到事務2的控制訊息T2-C,此時就把事務訊息T2-M1傳送給Clinet。
  • step3   C onsumer接受到事務1的控制訊息T1-C,此時就把事務訊息T1-M1和T1-M2傳送給Client

4.5.2 新版本-讀取事務訊息順序

第一種方式,需要在consumer客戶端快取訊息,當存在耗時比較長的事務時,佔用客戶端大量的記憶體資源。為了解決這個問題,通過LSO和Abort Index 檔案來解決這個問題,參考:

(1) LSO,Last stable offset。Broker在快取中維護了所有處於執行狀態的事務對應的initial offsets,LSO的值就是這些offsets中最小值-1。這樣在LSO之前資料都是已經commit或者abort的資料,只有這些資料才對Consumer可見,即consumer讀取資料只能讀取到LSO的位置。

  • LSO並沒有持久化某一個位置,而是實時計算出來的,並儲存在快取中。

(2)Absort Index檔案

Conusmer傳送FetchRequest中,新增了Isolation欄位,表示是那種模式

1

2

3

4

5

6

7

8

9

10

ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]

  ReplicaId => int32

  MaxWaitTime => int32

  MinBytes => int32

  TopicName => string

  Partition => int32

  FetchOffset => int64

  MaxBytes => int32

  Isolation => READ_COMMITTED | READ_UNCOMMITTED

返回資料型別為FetchResponse的格式為:

ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset AbortedTransactions MessageSetSize MessageSet]]

對應各個給欄位型別為

1

2

3

4

5

6

7

8

9

ThrottleTime => int32

  TopicName => string

  Partition => int32

  ErrorCode => int16

  HighwaterMarkOffset => int64

  AbortedTransactions => [PID FirstOffset]

    PID => int64

    FirstOffset => int64

  MessageSetSize => int32

  • 設定成 READ_UNCOMMITTED 模式時, the AbortedTransactions array is null.
  • 設定為READ_COMMITTED時,the Last Stable Offset(LSO),當事務提交之後,LSO向前移動offset

資料如下:

  • 存放資料的log

1

  • 存放Absort Index的內容如下:

3

執行讀取資料流程如下:

step1: 假設consumer讀取資料的fetched offsets的區間是0到4。

  • 首先,broker讀取data log中資料

 11

  • 然後,broker依次讀取abort index的內容,發現LSO大於等於 4 就停止。如上可以獲取到P2對應的offset從2到5的訊息都是被丟棄的:

    12        

  • 最後,broker將上面data log和abort index中滿足條件的資料返回給consumer。

step2 :在consumer端根據absrot index中返回的內容,過濾丟棄的訊息,最終給使用者訊息為

13

4.5.3 Absorted Transaction Index

在broker中資料中新增一個索引檔案,儲存aborted tranasation對應的offsets,只有事務執行abort時,才會往這個檔案新增一個記錄,初始這個檔案是不存在的,只有第一條abort 時,才會建立這個檔案。

2

這個索引檔案結構的每一行結構是TransactionEntry:

1

2

3

4

5

Version => int16

PID => int64

FirstOffset => int64

LastOffset => int64

LastStableOffset => int64

當broker接受到控制訊息(producer執行commitTransaction()或者abortTransaction())時, 執行如下操作:

(1)計算LSO。

Broker在快取中維護了所有處於執行狀態的事務對應的initial offsets,LSO的值就是這些offsets中最小值-1。

舉例說明下LSO的計算,對於一個data log中內如如下

31

對應的abort index檔案中內如如下:LSO是遞增的

32

(2)第二步   如果事務是提交狀態,則在索引檔案中新增TransactionEntry。

(3)第三步   從active的tranaction set中移除這個transaton,然後更新LSO。

4.5.3  問題

1、問題1:producer通過事務提交訊息時拋異常了, 對於使用非事務的消費者,是否可以獲取此訊息?

對於事務訊息,必須是執行commit或者abstort之後,訊息才對消費者可見,即使是非事務的消費者。只是非事務消費者相比事務消費者區別,在於可以讀取執行了absort的訊息。

5 其他思考

1、如何保證訊息不丟。

(1)在消費端可以建立一個日誌表,和業務處理在一個事務

定時掃描沒有表傳送沒有被處理的訊息

(2)消費端,消費訊息之後,修改訊息表的中訊息狀態為已處理成功。

2、如何保證訊息提交和業務處理在同一個事務內完成

在消費端可以建立一個日誌表,和業務處理在一個事務

3、消費者角度,如何保證訊息不被重複消費。

(1)通過seek操作

(2)通過kafka事務操作。

4、生產者角度,如何保證訊息不重複生產

(1)kakfka冪等性