
Timestamp-based consumption in Kafka

Kafka added a time index file in version 0.10.1.0, so messages can be looked up by timestamp.

How it works

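The lookup can be pictured with a small in-memory model. The sketch below is an illustration under simplifying assumptions, not Kafka's actual implementation: TimeIndexSketch, floorEntry and offsetForTime are hypothetical names, the log is a plain array whose position is the offset, and timestamps are assumed to be non-decreasing. It shows the general idea of a sparse time index: binary-search the index for the last entry at or before the target timestamp, then scan the log forward from that entry's offset to the first record whose timestamp is at or after the target.

```java
import java.util.OptionalLong;

/** Simplified in-memory model of a timestamp lookup. Not Kafka's real code. */
class TimeIndexSketch {

    /** Binary search: position of the last index entry with timestamp <= target, or -1 if none. */
    static int floorEntry(long[] indexTimestamps, long target) {
        int lo = 0;
        int hi = indexTimestamps.length - 1;
        int found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (indexTimestamps[mid] <= target) {
                found = mid;      // candidate; look for a later one
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return found;
    }

    /**
     * Returns the earliest offset whose timestamp is >= target, or empty if there is none.
     * logTimestamps[i] is the timestamp of the record at offset i (assumed non-decreasing).
     * indexTimestamps/indexOffsets form a sparse index over that log (hypothetical layout).
     */
    static OptionalLong offsetForTime(long[] logTimestamps, long[] indexTimestamps,
                                      long[] indexOffsets, long target) {
        int entry = floorEntry(indexTimestamps, target);
        // Start scanning at the indexed offset, or at the beginning of the log.
        long start = entry < 0 ? 0 : indexOffsets[entry];
        for (long offset = start; offset < logTimestamps.length; offset++) {
            if (logTimestamps[(int) offset] >= target) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] log = {100, 110, 120, 130, 140, 150, 160, 170};
        long[] idxTs = {100, 130, 160};   // every third record is indexed
        long[] idxOff = {0, 3, 6};
        System.out.println(offsetForTime(log, idxTs, idxOff, 145)); // found at offset 5
        System.out.println(offsetForTime(log, idxTs, idxOff, 999)); // empty: target is after the last record
    }
}
```

When the target is later than every record, the result is empty. In the consumer API this case shows up as a null value for that partition in the map returned by offsetsForTimes.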

Usage

Suppose the requirement is to start consuming from the offsets that correspond to half an hour ago. Example code:

package com.bonc.rdpe.kafka110.consumer;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class TimestampConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        props.put("group.id", "dev3-yangyunhe-topic001-group001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "dev3-yangyunhe-topic001";

        try {
            // Fetch the partition metadata of the topic
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            List<TopicPartition> topicPartitions = new ArrayList<>();
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();

            DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date now = new Date();
            long nowTime = now.getTime();
            System.out.println("Current time: " + df.format(now));
            long fetchDataTime = nowTime - 1000 * 60 * 30; // timestamp of 30 minutes ago

            for (PartitionInfo partitionInfo : partitionInfos) {
                TopicPartition tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                topicPartitions.add(tp);
                timestampsToSearch.put(tp, fetchDataTime);
            }

            consumer.assign(topicPartitions);

            // Look up, for each partition, the offset that corresponds to 30 minutes ago
            Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);

            System.out.println("Setting initial offsets for each partition...");
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
                // The value is null when the requested timestamp is later than
                // the newest indexed timestamp in the partition; in that case no
                // seek is performed and the consumer keeps its default start position.
                OffsetAndTimestamp offsetTimestamp = entry.getValue();
                if (offsetTimestamp != null) {
                    int partition = entry.getKey().partition();
                    long timestamp = offsetTimestamp.timestamp();
                    long offset = offsetTimestamp.offset();
                    System.out.println("partition = " + partition +
                            ", time = " + df.format(new Date(timestamp)) +
                            ", offset = " + offset);
                    // Set the offset from which this partition will be read
                    consumer.seek(entry.getKey(), offset);
                }
            }
            System.out.println("Finished setting initial offsets...");

            while (true) {
                // poll(long) is deprecated since Kafka 2.0; newer clients
                // should call poll(Duration.ofMillis(1000)) instead.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("partition = " + record.partition() + ", offset = " + record.offset());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

Output:

Current time: 2018-07-16 10:15:09
Setting initial offsets for each partition...
partition = 2, time = 2018-07-16 09:45:10, offset = 727
partition = 0, time = 2018-07-16 09:45:09, offset = 727
partition = 1, time = 2018-07-16 09:45:10, offset = 727
Finished setting initial offsets...
partition = 1, offset = 727
partition = 1, offset = 728
partition = 1, offset = 729
......
partition = 2, offset = 727
partition = 2, offset = 728
partition = 2, offset = 729
......
partition = 0, offset = 727
partition = 0, offset = 728
partition = 0, offset = 729
......

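Note: to consume from a timestamp as shown here, the consumer must take its partitions through assign(). seek() only works on partitions the consumer currently holds, and with subscribe() no partition is assigned until the group rebalance completes inside poll(), so the seek would have to be moved into the onPartitionsAssigned callback of a ConsumerRebalanceListener.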
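The example computes the lookback as 1000 * 60 * 30, which Java evaluates in int arithmetic. That is fine for 30 minutes, but a longer window such as 30 days (1000 * 60 * 60 * 24 * 30) overflows int before the subtraction takes place. A minimal sketch using java.time avoids the problem; LookbackTimestamp and timestampBefore are hypothetical names introduced for illustration, not part of the Kafka API.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper: computes the search timestamp without int overflow. */
class LookbackTimestamp {

    /** Epoch milliseconds of the instant that lies `lookback` before `now`. */
    static long timestampBefore(Instant now, Duration lookback) {
        return now.minus(lookback).toEpochMilli();
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        long halfHourAgo = timestampBefore(now, Duration.ofMinutes(30));
        long thirtyDaysAgo = timestampBefore(now, Duration.ofDays(30));
        System.out.println("30 minutes ago: " + halfHourAgo);
        System.out.println("30 days ago:    " + thirtyDaysAgo);
    }
}
```

The returned value can be stored for every partition in the timestampsToSearch map in place of fetchDataTime.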