Kafka的消費者提交方式手動同步提交、和非同步提交
1、Kafka的消費者提交方式
1)、自動提交,這種方式讓消費者來管理位移,應用本身不需要顯式操作。當我們將enable.auto.commit設定為true,那麼消費者會在poll方法呼叫後每隔五秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一樣,自動提交也是由poll方法來驅動的,在呼叫poll方法的時候,消費者判斷是否到達提交時間,如果是則提交上一次poll返回的最大位移。需要注意的是,這種方式可能會導致訊息重複消費,假如,某個消費者poll訊息後,應用正在處理訊息,在3秒後kafka進行了重平衡,那麼由於沒有更新位移導致重平衡後這部分訊息重複消費。
2)、同步提交。
1 package com.demo.kafka.consumer; 2 3 import java.time.Duration; 4 import java.util.Arrays; 5 import java.util.Collections; 6 import java.util.List; 7 import java.util.Properties; 8 import java.util.regex.Pattern; 9 10 import org.apache.kafka.clients.consumer.ConsumerConfig;11 import org.apache.kafka.clients.consumer.ConsumerRecord; 12 import org.apache.kafka.clients.consumer.ConsumerRecords; 13 import org.apache.kafka.clients.consumer.KafkaConsumer; 14 import org.apache.kafka.clients.consumer.OffsetAndMetadata; 15 import org.apache.kafka.clients.producer.ProducerConfig;16 import org.apache.kafka.common.TopicPartition; 17 import org.apache.kafka.common.serialization.StringDeserializer; 18 19 public class KafkaConsumerSimple { 20 21 // 設定伺服器地址 22 private static final String bootstrapServer = "192.168.110.142:9092"; 23 24 // 設定主題 25 private static final String topic = "topic-demo"; 26 27 // 設定主題 28 private static final String topic2 = "topic-demo2"; 29 30 // 設定消費者組 31 private static final String groupId = "group.demo"; 32 33 public static void main(String[] args) { 34 Properties properties = new Properties(); 35 // 設定反序列化key引數資訊 36 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 37 // 設定反序列化value引數資訊 38 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 39 40 // 設定伺服器列表資訊,必填引數,該引數和生產者相同,,制定連結kafka叢集所需的broker地址清單,可以設定一個或者多個 41 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); 42 43 // 設定消費者組資訊,消費者隸屬的消費組,預設為空,如果設定為空,則會丟擲異常,這個引數要設定成具有一定業務含義的名稱 44 properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 45 46 // 制定kafka消費者對應的客戶端id,預設為空,如果不設定kafka消費者會自動生成一個非空字串。 47 properties.put("client.id", "consumer.client.id.demo"); 48 49 // 設定每次從最早的offset開始消費 50 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 51 52 // 手動提交開啟 53 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 54 55 // 將引數設定到消費者引數中 56 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 57 58 // 訊息訂閱 59 // consumer.subscribe(Collections.singletonList(topic)); 60 // 可以訂閱多個主題 61 // consumer.subscribe(Arrays.asList(topic, topic2)); 62 // 可以使用正則表示式進行訂閱 63 // consumer.subscribe(Pattern.compile("topic-demo*")); 64 65 // 指定訂閱的分割槽 66 TopicPartition topicPartition = new TopicPartition(topic, 0); 67 consumer.assign(Arrays.asList(topicPartition)); 68 69 // 初始化offset位移為-1 70 long lastConsumeOffset = -1; 71 while (true) { 72 // 每隔一秒監聽一次,拉去指定主題分割槽的訊息 73 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 74 if (records.isEmpty()) { 75 break; 76 } 77 // 獲取到訊息 78 List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition); 79 // 獲取到訊息的offset位移資訊,最後消費的位移 80 lastConsumeOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); 81 // System.out.println("the last offset is " + lastConsumeOffset); 82 // 同步提交消費位移 83 consumer.commitSync(); 84 } 85 // 當前消費者最後一個消費的位置 86 System.out.println("consumed offset is " + lastConsumeOffset); 87 // 提交,下次消費從哪個位置開始 88 OffsetAndMetadata committed = consumer.committed(topicPartition); 89 System.out.println("committed offset is " + committed.offset()); 90 // 下次消費從哪個位置開始 91 long position = consumer.position(topicPartition); 92 System.out.println("the offset of the next record is " + position); 93 94 } 95 96 }
3)、非同步提交方式。手動提交有一個缺點,就是當發起提交時呼叫應用會阻塞。當然我們可以減少手動提交的頻率,但這個會增加訊息重複的概率(和自動提交一樣)。另外一個解決方法是,使用非同步提交。但是非同步提交也有一個缺點,那就是如果伺服器返回提交失敗,非同步提交不會進行重試。相比較起來,同步提交會進行重試知道成功或者最後丟擲異常給應用。非同步提交沒有實現重試是因為,如果同時存在多個非同步提交,進行重試可能會導致位移覆蓋。比如,我們發起一個非同步提交commitA,此時提交位移是2000,隨後又發起了一個非同步提交commitB且位移為3000,commitA提交失敗但commitB提交失敗,此時commitA進行重試併成功的話,會將實際上已經提交的位移從3000回滾到2000,導致訊息重複消費。
1 package com.demo.kafka.consumer; 2 3 import java.time.Duration; 4 import java.util.Arrays; 5 import java.util.Map; 6 import java.util.Properties; 7 import java.util.concurrent.atomic.AtomicBoolean; 8 9 import org.apache.kafka.clients.consumer.ConsumerConfig; 10 import org.apache.kafka.clients.consumer.ConsumerRecord; 11 import org.apache.kafka.clients.consumer.ConsumerRecords; 12 import org.apache.kafka.clients.consumer.KafkaConsumer; 13 import org.apache.kafka.clients.consumer.OffsetAndMetadata; 14 import org.apache.kafka.clients.consumer.OffsetCommitCallback; 15 import org.apache.kafka.common.TopicPartition; 16 import org.apache.kafka.common.serialization.StringDeserializer; 17 18 public class KafkaConsumerAsyncSimple { 19 20 private static AtomicBoolean running = new AtomicBoolean(true); 21 22 // 設定伺服器地址 23 private static final String bootstrapServer = "192.168.110.142:9092"; 24 25 // 設定主題 26 private static final String topic = "topic-demo"; 27 28 // 設定消費者組 29 private static final String groupId = "group.demo"; 30 31 public static void main(String[] args) { 32 Properties properties = new Properties(); 33 // 設定反序列化key引數資訊 34 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 35 // 設定反序列化value引數資訊 36 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 37 38 // 設定伺服器列表資訊,必填引數,該引數和生產者相同,,制定連結kafka叢集所需的broker地址清單,可以設定一個或者多個 39 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); 40 41 // 設定消費者組資訊,消費者隸屬的消費組,預設為空,如果設定為空,則會丟擲異常,這個引數要設定成具有一定業務含義的名稱 42 properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 43 44 // 制定kafka消費者對應的客戶端id,預設為空,如果不設定kafka消費者會自動生成一個非空字串。 45 properties.put("client.id", "consumer.client.id.demo"); 46 47 // 設定每次從最早的offset開始消費 48 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 49 50 // 將引數設定到消費者引數中 51 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 52 // 訂閱主題 53 consumer.subscribe(Arrays.asList(topic)); 54 55 try { 56 while (running.get()) { 57 // 每隔一秒監聽一次,拉去指定主題分割槽的訊息 58 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 59 if (records.isEmpty()) { 60 break; 61 } 62 for (ConsumerRecord<String, String> record : records) { 63 System.out.println("我要開始消費了: " + record.toString()); 64 } 65 66 // 非同步回撥,適合訊息量非常大,但是允許訊息重複的 67 consumer.commitAsync(new OffsetCommitCallback() { 68 69 @Override 70 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { 71 if (exception == null) { 72 System.out.println("非同步回撥成功了,offset : " + offsets); 73 } else { 74 System.err.println("fail to commit offsets " + offsets + " , " + exception); 75 } 76 77 } 78 }); 79 80 } 81 } finally { 82 // 關閉客戶端 83 consumer.close(); 84 } 85 86 } 87 88 }
2、指定位移消費,seek方法提供了這個功能,可以追蹤之前的消費或者回溯消費。
1 package com.demo.kafka.consumer; 2 3 import java.time.Duration; 4 import java.util.Arrays; 5 import java.util.Map; 6 import java.util.Properties; 7 import java.util.Set; 8 import java.util.concurrent.atomic.AtomicBoolean; 9 10 import org.apache.kafka.clients.consumer.ConsumerConfig; 11 import org.apache.kafka.clients.consumer.ConsumerRecord; 12 import org.apache.kafka.clients.consumer.ConsumerRecords; 13 import org.apache.kafka.clients.consumer.KafkaConsumer; 14 import org.apache.kafka.clients.consumer.OffsetAndMetadata; 15 import org.apache.kafka.clients.consumer.OffsetCommitCallback; 16 import org.apache.kafka.common.TopicPartition; 17 import org.apache.kafka.common.serialization.StringDeserializer; 18 19 public class KafkaConsumerSeekSimple { 20 21 private static AtomicBoolean running = new AtomicBoolean(true); 22 23 // 設定伺服器地址 24 private static final String bootstrapServer = "192.168.110.142:9092"; 25 26 // 設定主題 27 private static final String topic = "topic-demo3"; 28 29 // 設定消費者組 30 private static final String groupId = "group.demo"; 31 32 public static void main(String[] args) { 33 Properties properties = new Properties(); 34 // 設定反序列化key引數資訊 35 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 36 // 設定反序列化value引數資訊 37 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 38 39 // 設定伺服器列表資訊,必填引數,該引數和生產者相同,,制定連結kafka叢集所需的broker地址清單,可以設定一個或者多個 40 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); 41 42 // 設定消費者組資訊,消費者隸屬的消費組,預設為空,如果設定為空,則會丟擲異常,這個引數要設定成具有一定業務含義的名稱 43 properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 44 45 // 制定kafka消費者對應的客戶端id,預設為空,如果不設定kafka消費者會自動生成一個非空字串。 46 properties.put("client.id", "consumer.client.id.demo"); 47 48 // 設定每次從最早的offset開始消費 49 // properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 50 51 // 將引數設定到消費者引數中 52 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 53 // 訂閱主題 54 consumer.subscribe(Arrays.asList(topic)); 55 56 // 獲取消費者所分配到的分割槽 57 Set<TopicPartition> assignment = consumer.assignment(); 58 System.err.println("列印消費者獲取到的分割槽: " + assignment.toString()); 59 60 // timeout引數設定多少合適?太短會使分割槽分配失敗,太長有可能造成一些不必要的等待 61 // 獲取到指定主題的訊息 62 consumer.poll(Duration.ofMillis(2000)); 63 64 // for (TopicPartition topicPartition : assignment) { 65 // // 引數partition表示分割槽,offset表示指定從分割槽的那個位置開始消費 66 // // 方式一,可以指定位置進行消費 67 // consumer.seek(topicPartition, 3); 68 // } 69 70 // 指定從分割槽末尾開始消費,方式二,可以從末端開始倒敘消費 71 Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment); 72 for (TopicPartition topicPartition : assignment) { 73 System.err.println("列印消費者獲取到offset : " + ( endOffsets.get(topicPartition) + 1 )); 74 consumer.seek(topicPartition, endOffsets.get(topicPartition) + 1); 75 } 76 77 try { 78 while (running.get()) { 79 // 每隔一秒監聽一次,拉去指定主題分割槽的訊息 80 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 81 if (records.isEmpty()) { 82 break; 83 } 84 for (ConsumerRecord<String, String> record : records) { 85 System.out.println("我要開始消費了: " + record.toString()); 86 } 87 88 // 非同步回撥,適合訊息量非常大,但是允許訊息重複的 89 consumer.commitAsync(new OffsetCommitCallback() { 90 91 @Override 92 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { 93 if (exception == null) { 94 System.out.println("非同步回撥成功了,offset : " + offsets); 95 } else { 96 System.err.println("fail to commit offsets " + offsets + " , " + exception); 97 } 98 99 } 100 }); 101 102 } 103 } finally { 104 // 關閉客戶端 105 consumer.close(); 106 } 107 108 } 109 }
3、Kafka再均衡監聽器,再均衡是指分割槽的所屬從一個消費者轉移到另外一個消費者的行為,它為消費組具備了高可用性和伸縮性提供了保障,使得我們既方便又安全的刪除消費組內的消費者或者往消費組內新增消費者。不過再均衡期間,消費者是無法拉取訊息的。
1 package com.demo.kafka.consumer; 2 3 import java.time.Duration; 4 import java.util.Collection; 5 import java.util.Collections; 6 import java.util.HashMap; 7 import java.util.Map; 8 import java.util.Properties; 9 10 import org.apache.kafka.clients.consumer.ConsumerConfig; 11 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; 12 import org.apache.kafka.clients.consumer.ConsumerRecord; 13 import org.apache.kafka.clients.consumer.ConsumerRecords; 14 import org.apache.kafka.clients.consumer.KafkaConsumer; 15 import org.apache.kafka.clients.consumer.OffsetAndMetadata; 16 import org.apache.kafka.clients.consumer.OffsetCommitCallback; 17 import org.apache.kafka.common.TopicPartition; 18 import org.apache.kafka.common.serialization.StringDeserializer; 19 20 public class KafkaConsumerListenerSimple { 21 22 // 設定伺服器地址 23 private static final String bootstrapServer = "192.168.110.142:9092"; 24 25 // 設定主題 26 private static final String topic = "topic-demo"; 27 28 // 設定消費者組 29 private static final String groupId = "group.demo"; 30 31 public static void main(String[] args) { 32 Properties properties = new Properties(); 33 // 設定反序列化key引數資訊 34 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 35 // 設定反序列化value引數資訊 36 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 37 38 // 設定伺服器列表資訊,必填引數,該引數和生產者相同,,制定連結kafka叢集所需的broker地址清單,可以設定一個或者多個 39 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); 40 41 // 設定消費者組資訊,消費者隸屬的消費組,預設為空,如果設定為空,則會丟擲異常,這個引數要設定成具有一定業務含義的名稱 42 properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 43 44 // 制定kafka消費者對應的客戶端id,預設為空,如果不設定kafka消費者會自動生成一個非空字串。 45 properties.put("client.id", "consumer.client.id.demo"); 46 47 // 設定每次從最早的offset開始消費 48 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 49 50 // 手動提交開啟 51 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 52 53 // 將引數設定到消費者引數中 54 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 55 56 // 訊息訂閱 57 // consumer.subscribe(Collections.singletonList(topic)); 58 59 // 如果發生訊息重複消費或者訊息丟失的情況,當一個分割槽的消費者發生變更的時候,kafka會出現再均衡 60 // kafka提供了再均衡監聽器,可以處理自己的行為,發生再均衡期間,消費者無法拉取訊息的。 61 Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>(); 62 consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() { 63 64 // 65 @Override 66 public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 67 // 儘量避免重複消費 68 consumer.commitSync(currentOffsets);// 同步位移的提交 69 } 70 71 // 72 @Override 73 public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 74 75 } 76 77 }); 78 79 while (true) { 80 // 每隔一秒監聽一次,拉去指定主題分割槽的訊息 81 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 82 if (records.isEmpty()) { 83 break; 84 } 85 for (ConsumerRecord<String, String> record : records) { 86 System.out.println(record.toString()); 87 88 // 非同步提交訊息位移,在發生再均衡動作之前通過再均衡監聽器的onPartitionsRevoked回撥執行commitSync方法同步提交位移 89 currentOffsets.put(new TopicPartition(record.topic(), record.partition()), 90 new OffsetAndMetadata(record.offset() + 1)); 91 } 92 // 消費者的消費非同步提交很有可能出現訊息丟失的情況,所以在拉取完訊息之後可以將訊息的offset位移進行記錄 93 consumer.commitAsync(currentOffsets, new OffsetCommitCallback() { 94 95 @Override 96 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { 97 if (exception == null) { 98 System.out.println("非同步回撥成功了,offset : " + offsets); 99 } else { 100 System.err.println("fail to commit offsets " + offsets + " , " + exception); 101 } 102 } 103 }); 104 } 105 106 // 關閉客戶端 107 consumer.close(); 108 109 } 110 111 }
4、Kafka消費者攔截器,消費者攔截器主要是在訊息到訊息或者在提交訊息位移的時候進行一些定製化的操作。使用場景,對消費訊息設定一個有效期的屬性,如果某條訊息在既定的時間視窗內無法到達,那就視為無效,不需要再被處理。
1 package com.demo.kafka.interceptor; 2 3 import java.util.ArrayList; 4 import java.util.HashMap; 5 import java.util.List; 6 import java.util.Map; 7 8 import org.apache.kafka.clients.consumer.ConsumerInterceptor; 9 import org.apache.kafka.clients.consumer.ConsumerRecord; 10 import org.apache.kafka.clients.consumer.ConsumerRecords; 11 import org.apache.kafka.clients.consumer.OffsetAndMetadata; 12 import org.apache.kafka.common.TopicPartition; 13 14 /** 15 * 16 * @author 消費者攔截器 17 * 18 */ 19 public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> { 20 21 // 十秒鐘 22 private static final long EXPIRE_INTERVAL = 10 * 1000; // 10000 23 24 @Override 25 public void configure(Map<String, ?> configs) { 26 27 } 28 29 @Override 30 public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { 31 // 列印輸出訊息 32 for (ConsumerRecord<String, String> record : records) { 33 System.out.println("==============================" + record.toString() + "=============================="); 34 } 35 36 // 獲取到當前時間 37 long now = System.currentTimeMillis(); 38 // 建立一個map集合物件 39 Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>(); 40 // 迴圈遍歷出消費者的訊息分割槽 41 for (TopicPartition tp : records.partitions()) { 42 System.out.println( 43 "==============獲取到的分割槽================" + tp.partition() + "=============================="); 44 // 獲取到分割槽裡面的訊息 45 List<ConsumerRecord<String, String>> tpRecords = records.records(tp); 46 // 建立一個集合物件newTpRecords 47 List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>(); 48 // 迴圈遍歷訊息 49 for (ConsumerRecord<String, String> record : tpRecords) { 50 // 如果訊息的時間戳大於當前時間超過10秒,就放到集合中 51 if (now - record.timestamp() > EXPIRE_INTERVAL) { 52 // 放到集合中 53 newTpRecords.add(record); 54 } 55 } 56 // 判斷是否為空 57 if (!newTpRecords.isEmpty()) { 58 // 將分割槽和新的訊息放到map集合中 59 newRecords.put(tp, newTpRecords); 60 } 61 } 62 63 for (Map.Entry<TopicPartition, List<ConsumerRecord<String, String>>> map : newRecords.entrySet()) { 64 for (int i = 0; i < map.getValue().size(); i++) { 65 List<ConsumerRecord<String, String>> value = map.getValue(); 66 ConsumerRecord<String, String> consumerRecord = value.get(i); 67 System.out.println("==============================" + consumerRecord.toString() 68 + "=============================="); 69 } 70 } 71 72 return new ConsumerRecords<String, String>(newRecords); 73 } 74 75 @Override 76 public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { 77 offsets.forEach((tp, offset) -> System.out.println("獲取到的offset位移: " + tp + " : " + offset.offset())); 78 } 79 80 @Override 81 public void close() { 82 83 } 84 85 public static void main(String[] args) { 86 Map<String, String> map = new HashMap<>(); 87 map.put("zhangsan", "hello world zhangsan!!!"); 88 map.put("lisi", "hello world lisi!!!"); 89 map.put("wangwu", "hello world wangwu!!!"); 90 map.put("zhaoliu", "hello world zhaoliu!!!"); 91 92 map.forEach((key, value) -> System.out.println("key : " + key + " , value : " + value)); 93 } 94 95 }
消費者配置監聽,如下所示:
1 package com.demo.kafka.consumer; 2 3 import java.time.Duration; 4 import java.util.Collections; 5 import java.util.Properties; 6 7 import org.apache.kafka.clients.consumer.ConsumerConfig; 8 import org.apache.kafka.clients.consumer.ConsumerRecord; 9 import org.apache.kafka.clients.consumer.ConsumerRecords; 10 import org.apache.kafka.clients.consumer.KafkaConsumer; 11 import org.apache.kafka.common.serialization.StringDeserializer; 12 13 import com.demo.kafka.interceptor.ConsumerInterceptorTTL; 14 15 public class KafkaConsumerInterceptorSimple { 16 17 // 設定伺服器地址 18 private static final String bootstrapServer = "192.168.110.142:9092"; 19 20 // 設定主題 21 private static final String topic = "topic-demo3"; 22 23 // 設定消費者組 24 private static final String groupId = "group.demo"; 25 26 public static void main(String[] args) { 27 Properties properties = new Properties(); 28 // 設定反序列化key引數資訊 29 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 30 // 設定反序列化value引數資訊 31 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 32 33 // 設定伺服器列表資訊,必填引數,該引數和生產者相同,,制定連結kafka叢集所需的broker地址清單,可以設定一個或者多個 34 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); 35 36 // 設定消費者組資訊,消費者隸屬的消費組,預設為空,如果設定為空,則會丟擲異常,這個引數要設定成具有一定業務含義的名稱 37 properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 38 39 // 制定kafka消費者對應的客戶端id,預設為空,如果不設定kafka消費者會自動生成一個非空字串。 40 properties.put("client.id", "consumer.client.id.demo"); 41 42 // 設定每次從最早的offset開始消費 43 // properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 44 45 // 手動提交開啟 46 // properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 47 48 // 指定消費者攔截器 49 properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class.getName()); 50 51 // 將引數設定到消費者引數中 52 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 53 54 // 訊息訂閱 55 consumer.subscribe(Collections.singletonList(topic)); 56 57 while (true) { 58 // 每隔一秒監聽一次,拉去指定主題分割槽的訊息 59 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); 60 if (records.isEmpty()) { 61 break; 62 } 63 for (ConsumerRecord<String, String> record : records) { 64 System.out.println(record.toString()); 65 } 66 } 67 68 } 69 70 }