1. 程式人生 > >kafka告警簡單方案 依賴配置中心實現注有@ConfigurationProperties的bean相關屬性重新整理 簡單封裝kafka相關的api

kafka告警簡單方案 依賴配置中心實現注有@ConfigurationProperties的bean相關屬性重新整理 簡單封裝kafka相關的api

一、前言

  為什麼要設計kafka告警方案?現成的監控專案百度一下一大堆,KafkaOffsetMonitor、KafkaManager、 Burrow等,具體參考:kafka的訊息擠壓監控。由於本小組的專案使用的kafka叢集並沒有被公司的kafka-manager管理,所以只能自己簡單做一個告警。

二、告警方案

  

  首先需要兩個定時任務,之間的通訊依靠延遲佇列。

  左邊的定時任務按週期掃面配置Topic-Consumer列表,通過kafka api獲取消費詳情並判斷訊息積壓量是否已經大於閾值,如果閾值校驗失敗則放入延遲隊裡。

  右邊的定時任務按照週期從延遲佇列對應的真實佇列中取出一個Topic-Consumer關係,再次進行一下閾值的校驗,如果檢驗失敗才傳送告警簡訊。

三、準備工作

  1、依賴配置中心

  配置中心是實現告警方案的一個關鍵點,通過配置中心可以動態獲取告警相關的屬性配置,並重新整理對應的 java bean。如下是告警對應的配置bean。

@ConfigCenterBean
@ConfigurationProperties(prefix = "wmhcontrol.alarm")
@Component
public class AlarmConstants extends BaseConfigCenterBean {
    private static Logger LOGGER = LoggerFactory.getLogger(AlarmConstants.class
); //告警電話號碼 @ConfigField private String[] phones; //簡訊模板ID @ConfigField private String templateId; //延遲時間 @ConfigField private Integer delay = 600; //輪訓時間 @ConfigField private Integer period = 60; //獲取topic-consumer消費詳情地址 @ConfigField
private String tcsr; //檢視topic-consumer消費詳情地址 @ConfigField private String tclr; //全域性統一閾值 @ConfigField private Integer threshold = 1000; //topic和consumer關係列表 @ConfigField private List<TCR> tcrs; @ToInitial private void refreshProperties() { try { super.doBind(); LOGGER.info(String.format("%s 重新整理成功..., 當前配置=%s...", this.getModuleName(), this)); } catch (Exception e) { LOGGER.error("AlarmConstants 物件屬性繫結失敗...", e); } } private void toRefresh() { try { if (isCfgCenterEffect()) { ZookeeperPropertySource propertySource = ConfigHelper.getZookeeperPropertySource(); if (ConfigCenterUtils.propertySourceShouldRefresh(this.getModuleName(), propertySource)) { this.refreshProperties(); } } } catch (Exception e) { LOGGER.error("AlarmConstants 物件屬性重新整理失敗", e); } } @ToRefresh public Integer getThreshold() { return threshold; } public void setThreshold(Integer threshold) { this.threshold = threshold; } @ToRefresh public List<TCR> getTcrs() { return tcrs; } public void setTcrs(List<TCR> tcrs) { this.tcrs = tcrs; } @ToRefresh public String getTcsr() { return tcsr; } public void setTcsr(String tcsr) { this.tcsr = tcsr; } @ToRefresh public Integer getPeriod() { return period; } public void setPeriod(Integer period) { this.period = period; } @ToRefresh public Integer getDelay() { return delay; } public void setDelay(Integer delay) { this.delay = delay; } @ToRefresh public String[] getPhones() { return phones; } public void setPhones(String[] phones) { this.phones = phones; } @ToRefresh public String getTemplateId() { return templateId; } public void setTemplateId(String templateId) { this.templateId = templateId; } @ToRefresh public String getTclr() { return tclr; } public void setTclr(String tclr) { this.tclr = tclr; } @Override public String toString() { return ReflectionToStringBuilder.toString(this , ToStringStyle.JSON_STYLE , false , false , AlarmConstants.class); } @Override public String getDefaultResourcePath() { return "config/alarm.properties"; } @Override public String getConfigPrefix() { return "wmhcontrol.alarm"; } @Override public String getModuleName() { return "告警配置"; } @Override public void refreshForEvent() { this.refreshProperties(); } /** * topic 和 consumer之間的關係實體 */ public static class TCR { private String topic; private String consumer; private String channel; private Integer threshold; public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getConsumer() { return consumer; } public void setConsumer(String consumer) { this.consumer = consumer; } public String getChannel() { return channel; } public void setChannel(String channel) { this.channel = channel; } public Integer getThreshold() { return threshold; } public void setThreshold(Integer threshold) { this.threshold = threshold; } @Override public String toString() { return "TCR{" + "topic='" + topic + '\'' + ", consumer='" + consumer + '\'' + ", channel='" + channel + '\'' + ", threshold=" + threshold + '}'; } } public static class TopicConsumerDetail { private String group; private String topic; private Integer pid; private Long offset; private Long logsize; @Override public String toString() { return "TopicConsumerDetail{" + "group='" + group + '\'' + ", topic='" + topic + '\'' + ", pid=" + pid + ", offset=" + offset + ", logsize=" + logsize + ", lag=" + lag + ", owner='" + owner + '\'' + '}'; } private Long lag; private String owner; public String getGroup() { return group; } public void setGroup(String group) { this.group = group; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public Integer getPid() { return pid; } public void setPid(Integer pid) { this.pid = pid; } public Long getOffset() { return offset; } public void setOffset(Long offset) { this.offset = offset; } public Long getLogsize() { return logsize; } public void setLogsize(Long logsize) { this.logsize = logsize; } public Long getLag() { return lag; } public void setLag(Long lag) { this.lag = lag; } public String getOwner() { return owner; } public void setOwner(String owner) { this.owner = owner; } } }

  告警有個全域性統一的閾值,每一個topic可以指定不同的閾值。

  配置中心 和 java bean建立關聯請參考:依賴配置中心實現注有@ConfigurationProperties的bean相關屬性重新整理

  2、定時任務的週期性可動態配置

  藉助 org.springframework.scheduling.annotation.SchedulingConfigurer。

  由@EnableScheduling註釋的@Configuration類實現的可選介面。通常用於設定在執行計劃任務時使用的特定TaskScheduler bean,或者用於以程式設計方式註冊計劃任務,而不是使用@Scheduled註釋的宣告性方法。例如,在實現基於觸發器的任務時可能需要這樣做,而@Scheduled註釋不支援這些任務。

  基本的程式碼輪廓如下。

@Configuration
public class MessageCenterAlarmTask implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        try {
            //每5s檢測一下佇列
            taskRegistrar.addFixedRateTask(() -> {
               
            }, 5 * 1000L);

            //動態修改定時任務週期
            taskRegistrar.addTriggerTask(() -> {

            }, triggerContext -> new PeriodicTrigger(alarmConstants.getPeriod(), TimeUnit.SECONDS).nextExecutionTime(triggerContext));
        } catch (Exception e) {
            LOGGER.error("訊息中心topic-consumer定時任務初始化失敗...", e);
        }
    }
}

  上面的程式碼中的定時任務分別表示了告警方案中右邊和左邊的定時任務。

  3、延遲佇列的實現

  藉助redisson分散式延遲佇列 或者 java delayqueue + redistemplate 實現分散式延遲佇列。

  參考:Redisson分散式延遲佇列官方文件

  參考:Redisson DelayedQueue實現原理

  Redisson的叢集模式配置如下。

public class RedissonBuilder {
    private static Logger LOGGER = LoggerFactory.getLogger(RedissonBuilder.class);

    public static RedissonClient getRedisson(String cluster) {
        String[] nodes = cluster.split(",");
        for (int i = 0; i < nodes.length; i++) {
            nodes[i] = "redis://" + nodes[i];
        }

        Config config = new Config();
        config.useClusterServers() //這是用的叢集server
                .setScanInterval(2000) //設定叢集狀態掃描時間
                .setConnectTimeout(2000)
                .addNodeAddress(nodes);

        try {
            RedissonClient rc = Redisson.create(config);
            return rc;
        } catch (Exception e) {
            LOGGER.error("RedissonClient getRedisson errors...", e);
            return null;
        }
    }
}


@Configuration
public class RedissonConfig {
    private static Logger LOGGER = LoggerFactory.getLogger(RedissonConfig.class);

    @Bean
    public RedissonClient redissonClient(@Value("${redisAddress}") String cacheAddress) {
        RedissonClient rc = RedissonBuilder.getRedisson(cacheAddress);
        try {
            if (!Objects.isNull(rc)) {
                LOGGER.info(rc.getConfig().toJSON());
            }
        } catch (IOException e) {
            LOGGER.error("RedissonConfig redissonClient errors... ", e);
        }
        return rc;
    }

}

  Redisson建立延遲佇列方式

RQueue<AlarmConstants.TCR> topicConsumerQueue = redissonClient.getQueue(commonRedisKey + "message_center_tcrs");
RDelayedQueue<AlarmConstants.TCR> topicConsumerDelayedQueue = redissonClient.getDelayedQueue(topicConsumerQueue);

  首先建立目標佇列,然後通過目標佇列拿到延遲佇列。

  4、kafka api返回資料處理

  參考:簡單封裝kafka相關的api

  更具topic和consumer可以拿到如下資料。其中Lag對應的這一列表示未消費的訊息數量。

  

  需要做的是把如上資料對映到 AlarmConstants.TopicConsumerDetail 這個java bean中,藉助Spring BeanWrapperImpl,如下。

private static List<AlarmConstants.TopicConsumerDetail> retrieveDetail(String detailResponse) {
    List<AlarmConstants.TopicConsumerDetail> result = new ArrayList<>();
    try {
        Scanner scanner = new Scanner(detailResponse.replace("<pre>", StringUtils.EMPTY).replace("</pre>", StringUtils.EMPTY));
        String[] propertyNames = null;
//第一行對應java bean的field name
if (scanner.hasNextLine()) { String nameLine = scanner.nextLine(); if (StringUtils.isBlank(nameLine)) { return result; } propertyNames = Arrays.stream(nameLine.split("\\s+")) .map(propertyName -> propertyName.toLowerCase()) .toArray(String[]::new); }
//剩餘行對應java bean的field value
while (scanner.hasNextLine()) { AlarmConstants.TopicConsumerDetail tcd = new AlarmConstants.TopicConsumerDetail(); BeanWrapper br = new BeanWrapperImpl(tcd); String valueLine = scanner.nextLine(); if (StringUtils.isBlank(valueLine)) { continue; } String[] propertyValues = valueLine.split("\\s+"); for (int i = 0; i < propertyValues.length; i++) { br.setPropertyValue(propertyNames[i], propertyValues[i]); } result.add(tcd); } LOGGER.info("訊息中心提取topic-consumer詳情資訊:" + result); } catch (Exception e) { LOGGER.error("訊息中心提取topic-consumer資訊異常..." + detailResponse, e); } return result; }

  處理之後的效果如下。

[TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=0, offset=10956087, logsize=10956091, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=1, offset=10950487, logsize=10950502, lag=15, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=2, offset=10958523, logsize=10958529, lag=6, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=3, offset=10955709, logsize=10955717, lag=8, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=4, offset=10956550, logsize=10956563, lag=13, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818094331-835bc5f7-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=5, offset=10956343, logsize=10956347, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=6, offset=10954124, logsize=10954128, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=7, offset=10949075, logsize=10949082, lag=7, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=8, offset=10963839, logsize=10963843, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=9, offset=10958536, logsize=10958540, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0495-DMZ-A620R-1543818202772-32905832-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=10, offset=10955316, logsize=10955327, lag=11, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=11, offset=10957850, logsize=10957856, lag=6, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=12, offset=10954508, logsize=10954515, lag=7, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=13, offset=10960468, logsize=10960477, lag=9, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'}, TopicConsumerDetail{group='group-message-center-consumer-TELEPHONE_HOTLINE', topic='message-center-telephone-hotline-topic', pid=14, offset=10955540, logsize=10955544, lag=4, owner='group-message-center-consumer-TELEPHONE_HOTLINE_N03-NFJD-BB-SV0496-DMZ-A620R-1543818798625-b7a38283-0'}]

四、告警邏輯

  1、短息傳送

private String toShortMessage(AlarmConstants.TCR tcr, Long lag) {
    JSONObject info = new JSONObject();
    StringBuilder text = new StringBuilder();
    String messageDate = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
    text.append("【Topic-Consumer閾值告警 " + messageDate + "】\r\n");

    text.append("\t渠道: " + tcr.getChannel() + "\r\n");
    text.append("\t主題: " + tcr.getTopic() + "\r\n");
    text.append("\t消費: " + tcr.getConsumer() + "\r\n");
    text.append("\t閾值: " + (Objects.isNull(tcr.getThreshold()) ? alarmConstants.getThreshold() : tcr.getThreshold()) + "\r\n");
    text.append("\t堆積: " + lag + "\r\n");

    try {
        String refUrl = alarmConstants.getTclr() + "?topic=" + tcr.getTopic() + "&group=" + tcr.getConsumer();
        JSONObject params = new JSONObject();
        params.put("url", refUrl);
        String shortUrlResponse = HttpClient.post("https://dwz.cn/admin/create", params.toJSONString(), "application/json");
        LOGGER.info("獲取短連結返回內容:" + shortUrlResponse);
        if (StringUtils.isNotBlank(shortUrlResponse)) {
            JSONObject shortUrlJson = JSON.parseObject(shortUrlResponse);
            Integer code = (Integer) FastJsonUtils.search(shortUrlJson, "Code");
            if (Integer.valueOf(0).equals(code)) {
                String shortUrl = (String) FastJsonUtils.search(shortUrlJson, "ShortUrl");
                if (StringUtils.isNotBlank(shortUrl)) {
                    text.append("\t檢視: " + shortUrl + "\r\n");
                }
            }
        }
    } catch (Exception e) {
        LOGGER.error("短連結生成異常...", e);
    }
    info.put("txt_name", "訊息中心");
    info.put("txt_result", text.toString());

    return info.toJSONString();
}

  2、閾值校驗

private Long thresholdCheck(AlarmConstants.TCR tcr) {
    String detailUrl = alarmConstants.getTcsr() + "?topic=" + tcr.getTopic() + "&group=" + tcr.getConsumer();
    String detailResponseStr = HttpClient.get(detailUrl);
    LOGGER.info(detailUrl + " " + detailResponseStr);
    List<AlarmConstants.TopicConsumerDetail> detailResponse = retrieveDetail(detailResponseStr);

    if (CollectionUtils.isEmpty(detailResponse)) {
        return -1L;
    }
    Long lag = detailResponse.stream()
            .mapToLong(AlarmConstants.TopicConsumerDetail::getLag)
            .sum();

    Long threshold = Long.valueOf(Objects.isNull(tcr.getThreshold()) ? alarmConstants.getThreshold() : tcr.getThreshold());
    if (lag.compareTo(threshold) > 0) {
        return lag;
    }
    return -1L;
}

  3、兩個定時任務邏輯補充

@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
    try {
        RQueue<AlarmConstants.TCR> topicConsumerQueue = redissonClient.getQueue(commonRedisKey + "message_center_tcrs");
        RDelayedQueue<AlarmConstants.TCR> topicConsumerDelayedQueue = redissonClient.getDelayedQueue(topicConsumerQueue);

        //每5s檢測一下佇列
        taskRegistrar.addFixedRateTask(() -> {
            AlarmConstants.TCR tcr = topicConsumerQueue.poll();
            if (!Objects.isNull(tcr)) {
                //傳送告警資訊
                Long lag = thresholdCheck(tcr);
                if (lag > 0) {
                    if (ArrayUtils.isNotEmpty(alarmConstants.getPhones())) {
                        String message = toShortMessage(tcr, lag);
                        String tmplateId = alarmConstants.getTemplateId();
                        LOGGER.info("訊息中心告警簡訊內容:" + message);
                        for (String phone : alarmConstants.getPhones()) {
                            try {
                                MessageUtils.sendMessage(phone, messageUrl, message, tmplateId);
                            } catch (Exception e) {
                                LOGGER.error(String.format("訊息中心告警簡訊傳送異常...%s %s %s", phone, messageUrl, message), e);
                            }
                        }
                    }
                }
            }
        }, 5 * 1000L);

        taskRegistrar.addTriggerTask(() -> {
            RLock lock = null;
            try {
                lock = redissonClient.getLock(commonRedisKey + "TopicConsumerForEach");
                // 嘗試加鎖,最多等待5秒,上鎖以後5秒自動解鎖
                if (!lock.tryLock(5, 5, TimeUnit.SECONDS)) {
                    return;
                }
                if (!CollectionUtils.isEmpty(alarmConstants.getTcrs())) {
                    alarmConstants.getTcrs()
                            .stream()
                            .filter(tcr -> !topicConsumerDelayedQueue.contains(tcr) && (thresholdCheck(tcr) > 0))
                            .forEach(tcr -> topicConsumerDelayedQueue.offer(tcr, alarmConstants.getDelay(), TimeUnit.SECONDS));
                }
            } catch (Exception e) {
                LOGGER.error("訊息中心topic-consumer定時任務執行失敗...", e);
            } finally {
                if (!Objects.isNull(lock)) {
                    lock.unlock();
                }
            }
     //動態週期性檢測Topic-Consumer閾值
        }, triggerContext -> new PeriodicTrigger(alarmConstants.getPeriod(), TimeUnit.SECONDS).nextExecutionTime(triggerContext));
    } catch (Exception e) {
        LOGGER.error("訊息中心topic-consumer定時任務初始化失敗...", e);
    }
}

五、告警定時任務原始碼

  請關注訂閱號(演算法和技術SHARING),回覆:kafka告警, 便可檢視。