1. 程式人生 > 其它 >Kafka使用詳解-Consumer API(手動提交offset)-手動維護offset(事務維護)

Kafka使用詳解-Consumer API(手動提交offset)-手動維護offset(事務維護)

技術標籤:kafka消費者kafka

手動維護offset

需要依賴

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
        <version>5.0.8.RELEASE</version>
    </dependency>

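通過事務的方式,保證「處理消費到的資料」與「儲存該資料對應的offset」的一致性:兩者要麼同時成功,要麼同時回滾,從而避免漏消費或重複消費。注意:@Transactional 只有在由 Spring 管理的 bean 上、並且通過代理物件呼叫時才會生效;直接呼叫 private static 方法不會開啟事務,實際使用時需要把消費邏輯放到 Spring 管理的 bean 中。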

package com.ln.kafka.custom;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

/**
 * Demo consumer that maintains partition offsets itself instead of relying on
 * Kafka's automatic offset commit.
 *
 * <p>Auto-commit is disabled. On every partition assignment the consumer seeks to
 * the offset returned by {@link #getPartitionOffset(TopicPartition)}, and after
 * each record it hands the next offset to
 * {@link #commitOffset(TopicPartition, long)}. Both helpers are stubs that are
 * meant to be backed by an external store such as MySQL or Redis.
 *
 * <p>Project: kafka, package: com.ln.kafka.custom, author: linianest,
 * created: 2021/1/8 11:22, version: 1.0.
 */
public class CustomOffsetCustomer {

    /**
     * Subscribes to the topic {@code first} and processes records forever.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1234");
        // Turn off automatic offset commits; offsets are maintained by this class.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // The auto-commit interval defaults to 5 seconds; irrelevant while auto-commit is off.
        // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            // Subscribe with a rebalance listener so offsets can be restored from our own store.
            consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {

                // TODO: before partitions are taken away, persist the offsets of the
                // partitions this consumer currently owns.
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    System.out.println("=======回收的分割槽============");
                    for (TopicPartition partition : collection) {
                        System.out.println("partition=" + partition);
                    }
                }

                // After a rebalance, position each newly assigned partition at the stored offset.
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("=======重新得到的分割槽============");
                    for (TopicPartition partition : partitions) {
                        System.out.println("partition=" + partition);
                        // TODO: read the value from wherever offsets are stored (Redis or MySQL).
                        Long offset = getPartitionOffset(partition);
                        // A null result means nothing is stored for this partition yet.
                        // Skip the seek so the consumer keeps its default starting position
                        // instead of failing with a NullPointerException while unboxing.
                        if (offset != null) {
                            consumer.seek(partition, offset);
                        }
                    }
                }
            });

            while (true) {
                // Fetch the next batch of records.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // TODO: to avoid lost or duplicated consumption, process the record and
                    // store its offset inside one transaction.
                    consumeMessage(record);
                }
            }
        } finally {
            // Release the consumer's network resources when the loop ends with an exception.
            consumer.close();
        }
    }

    /**
     * Processes one record and stores the offset of the record that follows it.
     *
     * <p>NOTE(review): {@code @Transactional} only takes effect when the method is
     * invoked through a Spring transactional proxy. This method is private, static
     * and called directly from {@code main}, so no transaction is started here; move
     * the logic into a Spring-managed bean to get real transactional behaviour.
     *
     * @param record the record to process
     * @throws IllegalStateException if storing the offset fails and there is no
     *     active transaction that could be marked for rollback
     */
    @Transactional(rollbackFor = Exception.class)
    private static void consumeMessage(ConsumerRecord<String, String> record) {
        // The topic is a String and the offset a long, so they need %s and %d respectively.
        System.out.printf("topic=%s,offset=%d,key=%s,value=%s%n",
                record.topic(), record.offset(), record.key(), record.value());

        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        try {
            // Store offset + 1: the position of the next record to be consumed.
            commitOffset(topicPartition, record.offset() + 1);
        } catch (Exception e) {
            try {
                // Mark the surrounding transaction for rollback.
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            } catch (RuntimeException noTransaction) {
                // currentTransactionStatus() throws when no transaction is active.
                // Surface the original failure as the cause instead of masking it.
                IllegalStateException failure = new IllegalStateException(
                        "Failed to store offset for " + topicPartition
                                + " and no active transaction could be rolled back", e);
                failure.addSuppressed(noTransaction);
                throw failure;
            }
        }
    }

    /**
     * Stores the offset for a partition. TODO: persist it to an external store such
     * as MySQL or Redis; the current implementation does nothing.
     *
     * @param topicPartition the partition the offset belongs to
     * @param l the offset to store
     */
    private static void commitOffset(TopicPartition topicPartition, long l) {
    }

    /**
     * Looks up the stored offset of a partition. TODO: read it from the external
     * store; the current implementation always returns {@code null}.
     *
     * @param partition the partition to look up
     * @return the stored offset, or {@code null} when none is stored
     */
    private static Long getPartitionOffset(TopicPartition partition) {
        return null;
    }
}