Kafka monitoring: retrieving logSize, offset, and lag
阿新 · Published 2019-02-02
For a project requirement, we need to monitor Kafka consumer lag (lag = logSize - offset).
The implementation referenced at https://www.aliyun.com/jiaocheng/775267.html fails to return the offset in some scenarios (the exact cause is unknown for now; to be investigated later).
We therefore decided to read the offset value directly from ZooKeeper.
1. Add the dependency to the Spring Boot project
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
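Note that the code in the next section also talks to ZooKeeper directly via org.apache.zookeeper.ZooKeeper, which spring-kafka does not (as far as I know) bring in transitively. If the ZooKeeper client is not already on your classpath, an addition like the following may be needed (the version shown is only an example; pick one matching your cluster):

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.13</version>
    </dependency>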
2. The code
    import java.io.IOException;
    import java.text.MessageFormat;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class KafkaUtil {
        private static Logger logger = LoggerFactory.getLogger(KafkaUtil.class);
        private static final int ZOOKEEPER_TIMEOUT = 30000;
        private final CountDownLatch latch = new CountDownLatch(1);

        public ZooKeeper getZookeeper(String connectionString) {
            ZooKeeper zk = null;
            try {
                zk = new ZooKeeper(connectionString, ZOOKEEPER_TIMEOUT, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        // Release the latch once the session is established
                        if (Event.KeeperState.SyncConnected.equals(event.getState())) {
                            latch.countDown();
                        }
                    }
                });
                latch.await();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return zk;
        }

        public static Properties getConsumerProperties(String groupId, String bootstrap_servers) {
            Properties props = new Properties();
            props.put("group.id", groupId);
            props.put("bootstrap.servers", bootstrap_servers);
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            return props;
        }

        /**
         * Retrieves logSize, offset and lag for each partition.
         * @param zk a connected ZooKeeper client
         * @param bootstrap_servers Kafka bootstrap servers
         * @param groupId the consumer group to inspect
         * @param topics null/empty queries every topic the group has consumed
         * @param sorted whether to sort the result by topic, then partition
         * @return one map per partition: group, topic, partition, logSize, offset, lag
         * @throws Exception if the topic list cannot be read from ZooKeeper
         */
        public List<Map<String, Object>> getLagByGroupAndTopic(ZooKeeper zk, String bootstrap_servers,
                String groupId, String[] topics, boolean sorted) throws Exception {
            List<Map<String, Object>> topicPatitionMapList = new ArrayList<>();
            // Fetch all topics the group has consumed
            List<String> topicList = null;
            if (topics == null || topics.length == 0) {
                try {
                    topicList = zk.getChildren("/consumers/" + groupId + "/offsets", false);
                } catch (KeeperException | InterruptedException e) {
                    logger.error("Failed to fetch topics from zookeeper: zkState: {}, groupId: {}",
                            zk.getState(), groupId);
                    throw new Exception("Failed to fetch topics from zookeeper");
                }
            } else {
                topicList = Arrays.asList(topics);
            }

            Properties consumeProps = getConsumerProperties(groupId, bootstrap_servers);
            logger.info("consumer properties:{}", consumeProps);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumeProps);

            // Look up the partitions of each topic
            for (String topic : topicList) {
                // Query topic by topic: this narrows the time window and reduces the chance of negative lag
                List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic);
                List<TopicPartition> topicPartitions = new ArrayList<>();
                // Build the TopicPartition list for this topic
                for (PartitionInfo partitionInfo : partitionsFor) {
                    TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                    topicPartitions.add(topicPartition);
                }
                // Query logSize (the latest end offset of each partition)
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
                for (Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
                    TopicPartition partitionInfo = entry.getKey();
                    // Read the group's committed offset from ZooKeeper
                    String offsetPath = MessageFormat.format("/consumers/{0}/offsets/{1}/{2}", groupId,
                            partitionInfo.topic(), partitionInfo.partition());
                    byte[] data = zk.getData(offsetPath, false, null);
                    long offset = Long.valueOf(new String(data));

                    Map<String, Object> topicPatitionMap = new HashMap<>();
                    topicPatitionMap.put("group", groupId);
                    topicPatitionMap.put("topic", partitionInfo.topic());
                    topicPatitionMap.put("partition", partitionInfo.partition());
                    topicPatitionMap.put("logSize", endOffsets.get(partitionInfo));
                    topicPatitionMap.put("offset", offset);
                    topicPatitionMap.put("lag", endOffsets.get(partitionInfo) - offset);
                    topicPatitionMapList.add(topicPatitionMap);
                }
            }
            consumer.close();

            if (sorted) {
                // Sort by topic, then by partition number
                Collections.sort(topicPatitionMapList, new Comparator<Map<String, Object>>() {
                    @Override
                    public int compare(Map<String, Object> o1, Map<String, Object> o2) {
                        if (o1.get("topic").equals(o2.get("topic"))) {
                            return ((Integer) o1.get("partition")).compareTo((Integer) o2.get("partition"));
                        }
                        return ((String) o1.get("topic")).compareTo((String) o2.get("topic"));
                    }
                });
            }
            return topicPatitionMapList;
        }

        public static void main(String[] args) throws Exception {
            String bootstrap_servers = "localhost:9092";
            String groupId = "interface-group-new";
            String[] topics = null; // e.g. {"test1", "test2", "test3"}

            KafkaUtil kafkaUtil = new KafkaUtil();
            String connectionString = "localhost:2181";
            ZooKeeper zk = kafkaUtil.getZookeeper(connectionString);
            if (zk == null) {
                throw new RuntimeException("Failed to obtain a zookeeper connection");
            }
            List<Map<String, Object>> topicPatitionMapList =
                    kafkaUtil.getLagByGroupAndTopic(zk, bootstrap_servers, groupId, topics, true);
            for (Map<String, Object> map : topicPatitionMapList) {
                System.out.println(map);
            }
            zk.close();
        }
    }
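As a usage note, the per-partition rows returned above can easily be rolled up into a per-topic total. A minimal sketch (the LagAggregator class and totalLagByTopic method are illustrative helpers, not part of the original utility):

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class LagAggregator {
        /** Sums the per-partition "lag" values from KafkaUtil's result list, grouped by topic. */
        public static Map<String, Long> totalLagByTopic(List<Map<String, Object>> rows) {
            return rows.stream().collect(Collectors.groupingBy(
                    row -> (String) row.get("topic"),
                    Collectors.summingLong(row -> (Long) row.get("lag"))));
        }
    }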
3. Notes
When the topics parameter is null (or empty), the call returns every topic the groupId has consumed, since ZooKeeper keeps each group's committed offsets under /consumers/<groupId>/offsets/<topic>/<partition>.
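For clarity, the two call styles look like this (reusing the variables from main above; the explicit topic names are illustrative):

    // topics == null: every topic the group has ever consumed, discovered via ZooKeeper
    kafkaUtil.getLagByGroupAndTopic(zk, bootstrap_servers, groupId, null, true);

    // explicit topics: restrict the query to the listed topics
    kafkaUtil.getLagByGroupAndTopic(zk, bootstrap_servers, groupId, new String[] {"test1", "test2"}, true);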
List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic);
After obtaining the List<PartitionInfo>, query ZooKeeper for the corresponding offsets as quickly as possible; otherwise ongoing produce/consume traffic and latency can leave offset > logSize, i.e. a negative lag.
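Even so, the endOffsets snapshot and the ZooKeeper read are not atomic, so a slightly negative lag can still slip through. If negative values are undesirable in a report, one option (my suggestion, not part of the original code) is to clamp at zero:

    /** Clamps a lag that went slightly negative because of the non-atomic reads. */
    static long clampLag(long logSize, long offset) {
        return Math.max(0L, logSize - offset);
    }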