1. 程式人生 > 其它 >初識kafka 之 消費者手動提交offset

初識kafka 之 消費者手動提交offset

手動提交offset

手動提交offset的方法有兩種:分別是commitSync(同步提交)和commitAsync(非同步提交)。

  相同點:都會將本次提交的一批資料最高的偏移量提交

  不同點

               同步提交:阻塞當前執行緒,一直到提交成功,並且會自動失敗重試(由不可控因素導致,也會出現提交失敗);

               非同步提交:沒有失敗重試機制,故有可能提交失敗。

               • commitSync(同步提交):必須等待offset提交完畢,再去消費下一批資料。

               • commitAsync(非同步提交):傳送完提交offset請求後,就開始消費下一批資料。

關鍵字

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);   // 是否開啟自動提交offset
kafkaConsumer.commitAsync();   // 非同步提交
kafkaConsumer.commitSync();    // 同步提交

實現程式碼

package com.lzh.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties;
// kafka消費者手動提交offset public class CustomConsumer手動同步提交offset { public static void main(String[] args) { // 0 配置 Properties properties = new Properties(); // 連線到伺服器 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092,bigdata02:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); // 新增消費者組groupid,必須 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); /* 自動提交配置 // 是否自動提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 提交 offset 的時間週期 1000ms,預設 5s properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); */ // 是否手動提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 1 建立一個消費者物件 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); // 2 訂閱主題 ArrayList<String> topics = new ArrayList<String>(); topics.add("Mytopic"); kafkaConsumer.subscribe(topics); // 註冊要消費的主題(可以消費多個主題) // 3 消費資料 // 一直獲取消費資料 while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } // 手動提交offset // kafkaConsumer.commitAsync(); // 非同步提交 kafkaConsumer.commitSync(); // 同步提交 } } }