Kafka 0.10.1.1: Querying Messages by Timestamp, Pausing Consumption of Specific Partitions, and Controlling Consumption Rate
Posted by 阿新 on 2018-11-19
Reposted from: https://www.jianshu.com/p/a4c1d281b66a
1. Querying messages by timestamp
(1) Consuming messages from a timestamp with the new Kafka consumer
Kafka added time-based index files in the 0.10.1 release, so messages can now be looked up by timestamp.
For example, suppose we want to start consuming from the offset corresponding to half an hour ago. Sample code:
```java
package com.bonc.rdpe.kafka110.consumer;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class TimestampConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        props.put("group.id", "dev3-yangyunhe-topic001-group001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "dev3-yangyunhe-topic001";

        try {
            // Fetch the partition metadata for the topic
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            List<TopicPartition> topicPartitions = new ArrayList<>();
            Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date now = new Date();
            long nowTime = now.getTime();
            System.out.println("Current time: " + df.format(now));
            // Timestamp of 30 minutes ago
            long fetchDataTime = nowTime - 1000 * 60 * 30;

            for (PartitionInfo partitionInfo : partitionInfos) {
                TopicPartition tp = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                topicPartitions.add(tp);
                timestampsToSearch.put(tp, fetchDataTime);
            }

            consumer.assign(topicPartitions);

            // Look up, per partition, the offset of the first message whose
            // timestamp is >= fetchDataTime (30 minutes ago)
            Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);

            System.out.println("Setting initial offsets for each partition...");
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
                // The value is null when the queried timestamp is later than
                // that of the newest indexed record in the partition
                OffsetAndTimestamp offsetTimestamp = entry.getValue();
                if (offsetTimestamp != null) {
                    int partition = entry.getKey().partition();
                    long timestamp = offsetTimestamp.timestamp();
                    long offset = offsetTimestamp.offset();
                    System.out.println("partition = " + partition
                            + ", time = " + df.format(new Date(timestamp))
                            + ", offset = " + offset);
                    // Position the consumer at the offset found for this partition
                    consumer.seek(entry.getKey(), offset);
                }
            }
            System.out.println("Finished setting initial offsets...");

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("partition = " + record.partition() + ", offset = " + record.offset());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
```

Output:

```
Current time: 2018-07-16 10:15:09
Setting initial offsets for each partition...
partition = 2, time = 2018-07-16 09:45:10, offset = 727
partition = 0, time = 2018-07-16 09:45:09, offset = 727
partition = 1, time = 2018-07-16 09:45:10, offset = 727
Finished setting initial offsets...
partition = 1, offset = 727
partition = 1, offset = 728
partition = 1, offset = 729
......
partition = 2, offset = 727
partition = 2, offset = 728
partition = 2, offset = 729
......
partition = 0, offset = 727
partition = 0, offset = 728
partition = 0, offset = 729
......
```
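As the null check above hints, offsetsForTimes() returns null for a partition when the queried timestamp is newer than that partition's newest indexed record; the example simply skips such partitions. One possible fallback is to seek to the log end offset instead. Here is a minimal sketch of that idea, not part of the original article: the class and method names are made up, and it assumes the consumer has already been assigned the given partitions:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class SeekWithFallback {

    /**
     * Seeks each partition to the first offset whose timestamp is >= the given
     * timestamp. When offsetsForTimes() returns null for a partition (no record
     * at or after the timestamp), falls back to the partition's end offset.
     */
    static void seekToTimestampOrEnd(KafkaConsumer<String, String> consumer,
                                     List<TopicPartition> partitions,
                                     long timestamp) {
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : partitions) {
            query.put(tp, timestamp);
        }
        Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);
        // endOffsets() (also part of the 0.10.1 consumer API) supplies the fallback
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
        for (TopicPartition tp : partitions) {
            OffsetAndTimestamp oat = found.get(tp);
            long offset = (oat != null) ? oat.offset() : endOffsets.get(tp);
            consumer.seek(tp, offset);
        }
    }
}
```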
- Note: to query messages by timestamp, the consumer must subscribe to the topic via assign() (manual partition assignment), not subscribe(); see the sketch below.
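The reason is that seek() requires the consumer to already own the partition. A minimal sketch of the difference, not from the original article (the broker address, topic name, and offset 42 are placeholders):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignThenSeek {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        TopicPartition tp = new TopicPartition("some-topic", 0); // placeholder topic
        // assign() takes ownership of the partition immediately, with no group
        // rebalance involved, so seek() is legal right away
        consumer.assign(Collections.singletonList(tp));
        consumer.seek(tp, 42L); // arbitrary illustrative offset

        // With subscribe(), partitions are only owned after the rebalance
        // triggered by poll(), so calling seek() at this point would throw
        // IllegalStateException ("No current assignment for partition ...")
        consumer.close();
    }
}
```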
(2) Using the Kafka timestamp index from Spark to load data into an RDD
Below is a general-purpose method for Spark that reads the data written to Kafka between some point in the past and the moment the program runs, and loads it into an RDD:
```scala
package com.bonc.utils

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{KafkaUtils, OffsetRange}

import scala.collection.JavaConversions._

/**
 * Author: YangYunhe
 * Description:
 * Create: 2018-06-29 11:35
 */
object SparkKafkaUtils {

  /**
   * Load data from Kafka into an RDD.
   *
   * @param sc          SparkContext
   * @param topic       the Kafka topic
   * @param numDays     how many days of data to fetch, counting back from now;
   *                    e.g. 3 means the data between the same time 3 days ago and now
   * @param kafkaParams Kafka configuration, used both to create the consumer and
   *                    as a parameter of KafkaUtils.createRDD
   * @return
   */
  def createKafkaRDDByTimeRange(sc: SparkContext, topic: String, numDays: Int,
                                kafkaParams: java.util.HashMap[String, Object]): RDD[String] = {
    val startFetchTime = DateUtils.daysAgo(numDays)
    val startFetchTimeStr = DateUtils.parseLong2String(startFetchTime, DateUtils.DATE_TIME_FORMAT_STR)
    println(s"starting fetch data in kafka with time range [${startFetchTimeStr}——${DateUtils.nowStr()}]")

    val consumer = new KafkaConsumer[String, String](kafkaParams)
    val partitionInfos = consumer.partitionsFor(topic)
    val topicPartitions = scala.collection.mutable.ArrayBuffer[TopicPartition]()
    val timestampsToSearch = scala.collection.mutable.Map[TopicPartition, java.lang.Long]()
    val offsetRanges = scala.collection.mutable.ArrayBuffer[OffsetRange]()

    for (partitionInfo <- partitionInfos) {
      topicPartitions += new TopicPartition(partitionInfo.topic, partitionInfo.partition)
    }

    // The current end offset of each partition becomes the upper bound of its range
    val topicPartitionLongMap = consumer.endOffsets(topicPartitions)

    for (topicPartition <- topicPartitions) {
      timestampsToSearch(topicPartition) = startFetchTime
    }

    // The first offset at or after startFetchTime becomes the lower bound
    val topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(timestampsToSearch)

    for ((k, v) <- topicPartitionOffsetAndTimestampMap) {
      // NOTE: v is null when a partition has no message at or after
      // startFetchTime; a production version should guard against this
      // (compare the null check in the Java example above)
      offsetRanges += OffsetRange.create(topic, k.partition(), v.offset(), topicPartitionLongMap.get(k))
    }

    KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges.toArray, PreferConsistent).map(_.value)
  }
}
```

Usage:

```scala
// JHashMap is assumed to be an alias for java.util.HashMap
// (import java.util.{HashMap => JHashMap}); sc and bootstrapServers
// are defined elsewhere
def main(args: Array[String]): Unit = {
  val kafkaParams = new JHashMap[String, Object]()
  kafkaParams.put("bootstrap.servers", bootstrapServers)
  kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  // This loads the last 3 days of Kafka data into an RDD
  val rdd = SparkKafkaUtils.createKafkaRDDByTimeRange(sc, "topic", 3, kafkaParams)
  rdd.map(x => {
    // other operations
    ......
  })
}
```
2. Controlling the consumption rate
In some scenarios you may need to pause consumption of certain partitions and resume it only once some condition is met. Use pause() to suspend fetching from those partitions and resume() to restore it. Sample code:
```java
package com.bonc.rdpe.kafka110.consumer;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/**
 * @author YangYunhe
 * @date 2018-07-16 15:13:11
 * @description: controlling the consumption rate
 */
public class PauseAndResumeConsumer {

    private static final DateFormat df = new SimpleDateFormat("HH");

    public static String getTimeRange() {
        long now = System.currentTimeMillis();
        String hourStr = df.format(now);
        int hour;
        if (hourStr.charAt(0) == '0') {
            // Strip the leading zero, e.g. "08" -> 8
            hour = Integer.parseInt(hourStr.substring(1));
        } else {
            hour = Integer.parseInt(hourStr);
        }
        if (hour >= 0 && hour < 8) {
            return "00:00-08:00";
        } else if (hour >= 8 && hour < 16) {
            return "08:00-16:00";
        } else {
            return "16:00-00:00";
        }
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        props.put("group.id", "dev3-yangyunhe-topic001-group003");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition partition0 = new TopicPartition("dev3-yangyunhe-topic001", 0);
        TopicPartition partition1 = new TopicPartition("dev3-yangyunhe-topic001", 1);
        TopicPartition partition2 = new TopicPartition("dev3-yangyunhe-topic001", 2);
        consumer.assign(Arrays.asList(partition0, partition1, partition2));

        try {
            while (true) {
                String timeRange = getTimeRange();
                if ("00:00-08:00".equals(timeRange)) {
                    // 00:00-08:00: read only from partition0
                    consumer.pause(Arrays.asList(partition1, partition2));
                    consumer.resume(Collections.singletonList(partition0));
                } else if ("08:00-16:00".equals(timeRange)) {
                    // 08:00-16:00: read only from partition1
                    consumer.pause(Arrays.asList(partition0, partition2));
                    consumer.resume(Collections.singletonList(partition1));
                } else {
                    // 16:00-00:00: read only from partition2
                    consumer.pause(Arrays.asList(partition0, partition1));
                    consumer.resume(Collections.singletonList(partition2));
                }
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() + ", partition = " + record.partition());
                    System.out.println("offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```

Result (the program was run at 18:27, so only messages from partition2 are consumed):

```
topic = dev3-yangyunhe-topic001, partition = 2
offset = 0
topic = dev3-yangyunhe-topic001, partition = 2
offset = 1
topic = dev3-yangyunhe-topic001, partition = 2
offset = 2
......
```
- Note: pausing or resuming consumption of specific partitions likewise requires the consumer to subscribe to the topic via assign(); see the sketch below.
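It is worth noting that pause() and resume() only gate what poll() fetches; the consumer keeps its partition assignment the whole time, and the consumer also exposes paused() for inspecting the current state. A minimal sketch, not from the original article (broker, topic, and group names are placeholders):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PausedStateDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "pause-demo");              // placeholder group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        TopicPartition p0 = new TopicPartition("some-topic", 0); // placeholder topic
        TopicPartition p1 = new TopicPartition("some-topic", 1);
        consumer.assign(Arrays.asList(p0, p1));

        // Pausing p1 stops poll() from fetching it, but both partitions
        // remain assigned to this consumer
        consumer.pause(Collections.singletonList(p1));
        Set<TopicPartition> paused = consumer.paused(); // contains only p1
        System.out.println("paused partitions: " + paused);

        // p1 becomes fetchable again on the next poll()
        consumer.resume(Collections.singletonList(p1));
        consumer.close();
    }
}
```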