RocketMq獲取消費資訊
阿新 • • 發佈:2018-11-03
這幾天專案需要檢視mq的消費情況,阿里開源的mq-console介面挺好的,但是裡面許可權太大了,所以就琢磨著自己開發介面獲取mq消費資訊。話不多說,貼出程式碼
long timeout = 1000 * 3; String topic = "market_test_topic"; String group = "group_name"; String nameAddr="mq的nameAddr"; //MQClientManager建立MQClientInstance,需要傳入一個ClientConfig。 //因為DefaultMQProducer繼承了ClientConfig,所以這裡直接建立了DefaultMQProducer DefaultMQProducer producer = new DefaultMQProducer(group); producer.setNamesrvAddr(nameAddr); MQClientInstance mqClient = MQClientManager.getInstance().getAndCreateMQClientInstance(producer); //這個start(),花費了我好長時間才意識到需要呼叫 mqClient.start(); //扣了rocketmq-tools的原始碼找到這個類 MQClientAPIImpl api = mqClient.getMQClientAPIImpl(); TopicRouteData routeData = api.getTopicRouteInfoFromNameServer(topic, timeout); List<BrokerData> brokerDatas = routeData.getBrokerDatas(); List<MqBrokerConsumeInfo> consumeInfoList=new ArrayList<>(); long totalBrokerOff=0l,totalConsumerOff=0l,totalDiffOff=0l; for (BrokerData brokerData : brokerDatas) { MqBrokerConsumeInfo consumeInfo=new MqBrokerConsumeInfo(); String addr = brokerData.selectBrokerAddr(); ConsumeStats consumeStats = api.getConsumeStats(addr, group, timeout); totalDiffOff+=consumeStats.computeTotalDiff(); long brokerOffset=0l,consumerOffset=0l; Collection<OffsetWrapper> values = consumeStats.getOffsetTable().values(); for(OffsetWrapper wrapper:values){ brokerOffset+=wrapper.getBrokerOffset(); totalBrokerOff+=wrapper.getBrokerOffset(); consumerOffset+=wrapper.getConsumerOffset(); totalConsumerOff+=wrapper.getConsumerOffset(); } consumeInfo.setBroker(addr); consumeInfo.setDiffTotal(consumeStats.computeTotalDiff()); consumeInfo.setBrokerOffset(brokerOffset); consumeInfo.setConsumerOffset(consumerOffset); consumeInfoList.add(consumeInfo); } MqConsumeInfo consumeInfo=new MqConsumeInfo(); consumeInfo.setTopic(topic); consumeInfo.setGroup(group); consumeInfo.setTotalBrokerOffset(totalBrokerOff); consumeInfo.setTotalConsumerOffset(totalConsumerOff); consumeInfo.setTotalDiffTotal(totalDiffOff); consumeInfo.setBrokerConsumeInfoList(consumeInfoList); System.out.println(JSON.toJSONString(consumeInfo));