Kafka: Consuming Messages by Timestamp
阿新 · Published 2018-12-04
Kafka 0.10.1.0 added a time index file to each log segment, which makes it possible to locate messages by timestamp.
How it works
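Conceptually, `offsetsForTimes()` answers one question per partition: what is the earliest offset whose timestamp is greater than or equal to the requested timestamp? The sketch below is a simplified, in-memory model of that lookup, not the broker's code; the class `TimeIndexModel` and its `lookup` method are hypothetical. It assumes the whole partition fits in two arrays, whereas (as we understand it) the real on-disk index is per segment and sparse, so the broker finishes by scanning the segment. Because producer-set timestamps can go backwards, the model indexes the running maximum timestamp, which is non-decreasing and can therefore be binary-searched.

```java
import java.util.OptionalLong;

/**
 * Simplified in-memory model of a per-partition timestamp lookup (not broker code).
 * lookup(target) returns the earliest offset whose timestamp is >= target,
 * or empty when no such record exists (the case where offsetsForTimes() yields null).
 */
public final class TimeIndexModel {
    private final long[] offsets;           // offsets in log order
    private final long[] maxTimestampSoFar; // running max of timestamps, non-decreasing

    public TimeIndexModel(long[] offsets, long[] timestamps) {
        if (offsets.length != timestamps.length) {
            throw new IllegalArgumentException("offsets and timestamps differ in length");
        }
        this.offsets = offsets.clone();
        this.maxTimestampSoFar = new long[timestamps.length];
        long max = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            // Timestamps may be out of order, so store the largest one seen so far
            max = Math.max(max, timestamps[i]);
            maxTimestampSoFar[i] = max;
        }
    }

    /** Binary search for the first position whose running max reaches target. */
    public OptionalLong lookup(long target) {
        int lo = 0;
        int hi = offsets.length; // hi == length means "not found"
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (maxTimestampSoFar[mid] >= target) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        // Every record before lo is older than target and record lo is not,
        // so lo is the earliest record with timestamp >= target.
        return lo == offsets.length ? OptionalLong.empty() : OptionalLong.of(offsets[lo]);
    }

    public static void main(String[] args) {
        // Hypothetical partition: offsets 727..730, timestamps deliberately out of order
        TimeIndexModel index = new TimeIndexModel(
                new long[] {727, 728, 729, 730},
                new long[] {1000, 2000, 1500, 3000});
        System.out.println(index.lookup(1500).orElse(-1)); // 728 (ts 2000 is the first >= 1500)
        System.out.println(index.lookup(2500).orElse(-1)); // 730
        System.out.println(index.lookup(3500).orElse(-1)); // -1: nothing that recent
    }
}
```

Note that `lookup(1500)` returns 728 rather than 729: offset 729 carries timestamp 1500 exactly, but offset 728 comes earlier and already satisfies `timestamp >= 1500`.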
Usage
Example requirement: start consuming from the offsets that correspond to 30 minutes ago. Sample code:
package com.bonc.rdpe.kafka110.consumer;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class TimestampConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        props.put("group.id", "dev3-yangyunhe-topic001-group001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "dev3-yangyunhe-topic001";
        try {
            // Fetch the partition metadata for the topic
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            List<TopicPartition> topicPartitions = new ArrayList<>();
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date now = new Date();
            long nowTime = now.getTime();
            System.out.println("Current time: " + df.format(now));
            long fetchDataTime = nowTime - 1000 * 60 * 30; // timestamp 30 minutes ago
            for (PartitionInfo partitionInfo : partitionInfos) {
                TopicPartition tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                topicPartitions.add(tp);
                timestampsToSearch.put(tp, fetchDataTime);
            }
            // Take the partitions manually; seek() only works on assigned partitions
            consumer.assign(topicPartitions);
            // For each partition, find the earliest offset whose timestamp is >= fetchDataTime (30 minutes ago)
            Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);
            System.out.println("Setting the initial offset of each partition...");
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
                // The value is null if no message in the partition has a timestamp >= fetchDataTime
                OffsetAndTimestamp offsetTimestamp = entry.getValue();
                if (offsetTimestamp != null) {
                    int partition = entry.getKey().partition();
                    long timestamp = offsetTimestamp.timestamp();
                    long offset = offsetTimestamp.offset();
                    System.out.println("partition = " + partition +
                            ", time = " + df.format(new Date(timestamp)) +
                            ", offset = " + offset);
                    // Start reading this partition from the returned offset
                    consumer.seek(entry.getKey(), offset);
                } else {
                    // Nothing newer than fetchDataTime yet: start from the end of the partition
                    consumer.seekToEnd(Collections.singletonList(entry.getKey()));
                }
            }
            System.out.println("Finished setting initial offsets...");
            while (true) {
                // On kafka-clients 2.0+, prefer poll(Duration.ofMillis(1000))
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("partition = " + record.partition() + ", offset = " + record.offset());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
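The example computes its starting point with `Date` arithmetic and formats it with `SimpleDateFormat`, which is not thread-safe. On Java 8 or later, the same "30 minutes ago" computation can be sketched with `java.time`; the class `ThirtyMinutesAgo` and its methods `minutesBefore` and `format` are hypothetical helpers, not part of any Kafka API.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public final class ThirtyMinutesAgo {
    // Same pattern as the SimpleDateFormat above, but immutable and thread-safe.
    // withZone() is required so that an Instant can be formatted as a local date-time.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());

    /** Epoch milliseconds `minutes` before `now`: the value offsetsForTimes() expects. */
    public static long minutesBefore(Instant now, long minutes) {
        return now.minus(Duration.ofMinutes(minutes)).toEpochMilli();
    }

    /** Formats epoch milliseconds in the system time zone. */
    public static String format(long epochMillis) {
        return FMT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        long fetchDataTime = minutesBefore(now, 30);
        System.out.println("Current time: " + FMT.format(now));
        System.out.println("Search from:  " + format(fetchDataTime));
    }
}
```

The resulting `fetchDataTime` would be put into `timestampsToSearch` exactly as in the consumer example; passing `now` in explicitly also makes the computation easy to test.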
Output:
Current time: 2018-07-16 10:15:09
Setting the initial offset of each partition...
partition = 2, time = 2018-07-16 09:45:10, offset = 727
partition = 0, time = 2018-07-16 09:45:09, offset = 727
partition = 1, time = 2018-07-16 09:45:10, offset = 727
Finished setting initial offsets...
partition = 1, offset = 727
partition = 1, offset = 728
partition = 1, offset = 729
......
partition = 2, offset = 727
partition = 2, offset = 728
partition = 2, offset = 729
......
partition = 0, offset = 727
partition = 0, offset = 728
partition = 0, offset = 729
......
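Note: to seek by timestamp this way, the consumer takes its partitions with assign() rather than subscribe(). seek() only works on partitions currently assigned to the consumer, and with subscribe() the assignment is not known until a group rebalance completes inside poll(); a subscribing consumer would instead have to call seek() from the onPartitionsAssigned() callback of a ConsumerRebalanceListener.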