Kafka使用詳解-Consumer API(手動提交offset)-手動維護offset(事務維護)
阿新 • • 發佈:2021-01-09
手動維護offset
需要依賴
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>5.0.8.RELEASE</version>
</dependency>
通過事務的方式,保證消費者「處理資料」與「儲存(提交)offset」這兩個操作的一致性:兩者要麼同時成功,要麼同時回滾,從而避免漏消費或重複消費。
package com.ln.kafka.custom;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor. TransactionAspectSupport;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
/**
 * Kafka consumer demo that maintains consumer offsets manually in an external store
 * instead of committing them to Kafka.
 *
 * <p>Auto-commit is disabled. A {@link ConsumerRebalanceListener} seeks every newly
 * assigned partition to the offset returned by {@code getPartitionOffset}, and for each
 * consumed record the next offset to read is handed to {@code commitOffset}. Both helpers
 * are still placeholders (TODO) that are meant to be backed by e.g. MySQL or Redis.
 *
 * <p>Project: kafka, package: com.ln.kafka.custom. Created 2021/1/8 11:22.
 *
 * @author linianest
 * @version 1.0
 */
public class CustomOffsetCustomer {

    /**
     * Configures a String/String consumer in group "1234", subscribes to topic "first"
     * with a rebalance listener that restores externally stored offsets, and then polls
     * forever, passing every record to {@code consumeMessage}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1234");
        // Disable automatic offset commits: offsets are maintained by this class instead.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // The auto-commit interval only matters when auto-commit is enabled (default 5s):
        // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            // Subscribe with a listener so offsets can be restored from the custom store.
            consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
                // TODO: before partitions are reassigned, persist the offsets of the
                // partitions this consumer currently owns.
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    System.out.println("=======回收的分割槽============");
                    for (TopicPartition partition : collection) {
                        System.out.println("partition=" + partition);
                    }
                }

                // After reassignment, position every newly assigned partition at the
                // offset recorded in the external store.
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.println("=======重新得到的分割槽============");
                    for (TopicPartition partition : partitions) {
                        System.out.println("partition=" + partition);
                        // TODO: read the value from the offset store (Redis or MySQL).
                        Long offset = getPartitionOffset(partition);
                        // A null offset means nothing is stored for this partition yet.
                        // Seeking with it would throw a NullPointerException on unboxing,
                        // so skip the seek and keep the consumer's default position.
                        if (offset != null) {
                            consumer.seek(partition, offset);
                        }
                    }
                }
            });

            while (true) {
                // Fetch the next batch of records (waits up to 100 ms).
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // TODO: process the record and store its offset in one transaction
                    // to avoid both lost and duplicated consumption.
                    consumeMessage(record);
                }
            }
        } finally {
            // The poll loop only ends by an exception; release the consumer's
            // network resources and leave the group cleanly when that happens.
            consumer.close();
        }
    }

    /**
     * Consumes one record: prints it and stores the next offset to read for its partition.
     *
     * <p>NOTE(review): {@code @Transactional} is only honoured on methods invoked through
     * a Spring proxy of a managed bean. This method is private and static and is called
     * directly from {@code main}, so no transaction is actually started here. Move the
     * logic into a Spring-managed bean with a configured transaction manager to get real
     * transactional behaviour.
     *
     * @param record the record to consume
     */
    @Transactional(rollbackFor = Exception.class)
    private static void consumeMessage(ConsumerRecord<String, String> record) {
        // topic is a String (%s) and offset is a long (%d); a mismatched conversion
        // would throw IllegalFormatConversionException.
        System.out.printf("topic=%s,offset=%d,key=%s,value=%s%n",
                record.topic(), record.offset(), record.key(), record.value());
        try {
            // Maintain the offset ourselves: store the position of the NEXT record.
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            commitOffset(topicPartition, record.offset() + 1);
        } catch (Exception e) {
            // Do not swallow the failure silently.
            System.err.println("Failed to store offset " + (record.offset() + 1)
                    + " for " + record.topic() + "-" + record.partition() + ": " + e);
            try {
                // Mark the surrounding transaction (if any) for rollback.
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            } catch (RuntimeException noTransaction) {
                // currentTransactionStatus() throws when no Spring-managed transaction
                // is active; there is nothing to roll back in that case.
                System.err.println("No active transaction to roll back: " + noTransaction);
            }
        }
    }

    /**
     * TODO: persist the offset; it can be kept in an external system such as MySQL or
     * Redis. Currently a no-op placeholder.
     *
     * @param topicPartition the partition the offset belongs to
     * @param l the offset to store (the next offset to be consumed)
     */
    private static void commitOffset(TopicPartition topicPartition, long l) {
    }

    /**
     * TODO: look up the stored offset of a partition. Currently a placeholder that
     * always returns {@code null}.
     *
     * @param partition the partition to look up
     * @return the stored offset, or {@code null} when none is available
     */
    private static Long getPartitionOffset(TopicPartition partition) {
        return null;
    }
}