Getting the Total Message Count of a Non-Compacted Topic with the Java API
阿新 • Published: 2018-08-24
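Kafka does not currently provide a direct tool for getting the current total number of messages in a topic, so we have to write a small program ourselves. The code below does this by summing, over every partition, the difference between the end offset and the beginning offset. That difference equals the message count only for a non-compacted topic. I am recording it here for reference: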
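import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/**
 * Returns the current number of messages in a topic.
 * Java 8+ only.
 *
 * @param topic      name of the topic to count
 * @param brokerList value for bootstrap.servers (comma-separated host:port list)
 * @return sum over all partitions of (end offset - beginning offset)
 */
public static long totalMessageCount(String topic, String brokerList) {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokerList);
    props.put("group.id", "test-group");
    // Nothing is consumed here, so there are no offsets to commit.
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // try-with-resources closes the consumer even if an exception is thrown.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        // Guard against a null result so an unknown topic yields a count of 0.
        List<TopicPartition> tps = Optional.ofNullable(consumer.partitionsFor(topic))
                .orElse(Collections.emptyList())
                .stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toList());

        Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(tps);
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);

        // Per-partition count is end offset minus beginning offset.
        return tps.stream().mapToLong(tp -> endOffsets.get(tp) - beginOffsets.get(tp)).sum();
    }
}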
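The offset arithmetic at the end of the method can be tried without a broker. The following is only a minimal sketch: the class name `OffsetCountSketch`, the use of plain `Map<Integer, Long>` keyed by partition number instead of `TopicPartition`, the choice to skip partitions missing from either map, and the sample offsets are all assumptions made for illustration, not part of the original code.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCountSketch {

    /**
     * Sums (end offset - beginning offset) over every partition.
     * Partitions that are missing from the beginning-offset map are skipped;
     * that behaviour is an assumption of this sketch.
     */
    public static long totalMessageCount(Map<Integer, Long> beginOffsets,
                                         Map<Integer, Long> endOffsets) {
        long total = 0L;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            Long begin = beginOffsets.get(entry.getKey());
            if (begin == null) {
                continue; // no beginning offset known for this partition
            }
            total += entry.getValue() - begin;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical offsets for a three-partition topic.
        Map<Integer, Long> begin = new HashMap<>();
        Map<Integer, Long> end = new HashMap<>();
        begin.put(0, 100L); end.put(0, 250L); // 150 messages retained
        begin.put(1, 0L);   end.put(1, 40L);  // 40 messages
        begin.put(2, 75L);  end.put(2, 75L);  // empty partition

        System.out.println(totalMessageCount(begin, end)); // prints 190
    }
}
```

Partition 0 shows why the beginning offset has to be subtracted: once retention has deleted old segments, the end offset alone (250) would overstate what is still in the log.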