1. 程式人生 > >基於loghub的訊息消費延遲監控

基於loghub的訊息消費延遲監控

  我們可以把loghub當作一個訊息中介軟體來使用。如果能知道當前的消費進度,自然好了,否則消費情況一無所知,總是有點慌!

  loghub消費分兩種情況,一是普通消費,二是消費組消費;

  消費組消費,loghub服務端會記錄消費情況,這時可以通過呼叫服務端API進行偏移資訊查詢。

  普通消費則不同,需要自行維護偏移量,即只有自己知道偏移資訊,自己處理延遲。我們主要討論這種情況。

一、 消費loghub資料的樣例如下:

    // 普通消費
    private static void consumeDataFromShard(int shardId) throws Exception {
        String cursor = client.GetCursor(project, logStore, shardId, new Date()).GetCursor();
        System.out.println("cursor = " +cursor);
        try {
            while (true) {
                PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor);
                PullLogsResponse response = client.pullLogs(request);
                List<LogGroupData> logGroups = response.getLogGroups();
                if (logGroups.isEmpty()) {
                    return;
                }

                System.out.println(response.getCount());
                System.out.println("cursor = " + cursor + " next_cursor = " + response.getNextCursor());
                logGroups.forEach(rec1 -> {
                    // do your biz
                });
                cursor = response.getNextCursor();
                Thread.sleep(200);
            }
        }
        catch(LogException e) {
            System.out.println(e.GetRequestId() + e.GetErrorMessage());
        }
    }

  因為消費一直在進行,想要進行監控,就插入一些埋點。我們可以使用的 Map 來儲存每個 shard 的消費延遲情況。用一個 LoghubCursorDelayTransformer 描述具體資訊。

    
    /**
     * 消費偏移控制容器
     */
    public static final ConcurrentMap<Integer, LoghubCursorDelayTransformer> CONSUME_CURSOR_DELAY_TRANSFORMER = new ConcurrentHashMap<>();
    
/**
 * loghub 分割槽延遲管理器
 *
 * @author weiy
 * @date 2019/11/27
 */
public class LoghubCursorDelayTransformer {
    /**
     * 最後一次消費 loghub 資料的時間(大約)
     */
    private int lastConsumeDataTime;

    /**
     * 消費延遲 (s)
     */
    private int delay;

    /**
     * 分割槽 shard
     */
    private int shard;

    /**
     * 記錄建立時間,如果建立時間已很久,說明該消費延遲應已失效
     */
    private long recordTime = System.currentTimeMillis();

    public LoghubCursorDelayTransformer(int lastConsumeDataTime, int delay, int shard) {
        this.lastConsumeDataTime = lastConsumeDataTime;
        this.delay = delay;
        this.shard = shard;
    }

    public int getLastConsumeDataTime() {
        return lastConsumeDataTime;
    }

    public int getDelay() {
        return delay;
    }

    public int getShard() {
        return shard;
    }

    public long getRecordTime() {
        return recordTime;
    }

}

 

二、 埋點插入監控資料

  只要在每次消費完成之後,進行一次消費延遲的記錄就好了,具體記錄可以視情況而定。比如,每消費一批次之後記錄一次就是個不錯的選擇!

    private static void consumeDataFromShard(int shardId) throws Exception {
        String cursor = client.GetCursor(project, logStore, shardId, new Date()).GetCursor();
        System.out.println("cursor = " +cursor);
        try {
            while (true) {
                PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor);
                PullLogsResponse response = client.pullLogs(request);
                List<LogGroupData> logGroups = response.getLogGroups();
                if (logGroups.isEmpty()) {
                    // 沒有更多資料,以當前系統時間作為最後消費時間(並不關心實際生產者是否有在產生舊資料)
                    metricConsumeDelay((int)(System.currentTimeMillis() / 1000), shardId, -1);
                    return;
                }

                System.out.println(response.getCount());
                System.out.println("cursor = " + cursor + " next_cursor = " + response.getNextCursor());
                logGroups.forEach(rec1 -> {
                    // do your biz
                });
                // 每批次消費完成後,記錄一次消費延遲情況
                // 此處取 最後一個訊息的時間作為批次時間點
                int lastestConsumeTime = logGroups.get(logGroups.size() -1).GetFastLogGroup().getLogs(0).getTime();
                metricConsumeDelay(lastestConsumeTime, shardId, null);
                cursor = response.getNextCursor();
                Thread.sleep(200);
            }
        }
        catch(LogException e) {
            System.out.println(e.GetRequestId() + e.GetErrorMessage());
        }
    }
    /**
     * 記錄消費延遲資訊
     *
     * @param lastConsumeTime 最後消費時間(如果沒有獲取到資料,則使用系統時間代替),單位為 s秒
     * @param shard 分割槽id
     * @param calculatedDelay 已計算好的延時,為null時需要根據當前系統時間計算
     */
    public static void metricConsumeDelay(int lastConsumeTime, int shard, Integer calculatedDelay) {
        if(calculatedDelay == null) {
            calculatedDelay = (int)(System.currentTimeMillis() / 1000) - lastConsumeTime;
        }
        LoghubCursorDelayTransformer delayTransformer = new LoghubCursorDelayTransformer(
                lastConsumeTime, calculatedDelay, shard);
        CONSUME_CURSOR_DELAY_TRANSFORMER.put(shard, delayTransformer);
    }

  如上的延遲統計是不準確的,如果想準確統計,應使用 cursor 與 最後的偏移進行對比才行。如下:

    private static void consumeDataFromShard(int shardId) throws Exception {
        String cursor = client.GetCursor(project, logStore, shardId, new Date()).GetCursor();
        System.out.println("cursor = " +cursor);
        try {
            while (true) {
                PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor);
                PullLogsResponse response = client.pullLogs(request);
                List<LogGroupData> logGroups = response.getLogGroups();
                if (logGroups.isEmpty()) {
                    // 沒有更多資料,以當前系統時間作為最後消費時間(並不關心實際生產者是否有在產生舊資料)
                    metricConsumeDelay((int)(System.currentTimeMillis() / 1000), shardId, -1);
                    return;
                }

                System.out.println(response.getCount());
                System.out.println("cursor = " + cursor + " next_cursor = " + response.getNextCursor());
                logGroups.forEach(rec1 -> {
                    // do your biz
                });
                cursor = response.getNextCursor();
                // 從loghub-api 換取具體時間,計算延遲,可能會導致效能下降厲害
                int lastestConsumeTime = exchangeTimeWithCursorFromApi(cursor, shardId);
                int delay = getMaxTimeOffsetFromApi(shardId) - lastestConsumeTime;
                metricConsumeDelay(lastestConsumeTime, shardId, delay);
                Thread.sleep(200);
            }
        }
        catch(LogException e) {
            System.out.println(e.GetRequestId() + e.GetErrorMessage());
        }
    }

    /**
     * 從loghub-api中獲取對應cursor的時間
     *
     * @param cursor 指定遊標(當前)
     * @param shardId 分割槽id
     * @return 資料時間
     * @throws LogException 查詢異常時丟擲
     */
    public static int exchangeTimeWithCursorFromApi(String cursor, int shardId) throws LogException {
        GetCursorTimeResponse cursorTimeResponse = client.GetCursorTime(project, logStore, shardId, cursor);
        return cursorTimeResponse.GetCursorTime();
    }

    /**
     * 從loghub-api中獲取最大的時間偏移,以便精確計算消費延遲
     *
     * @param shardId 分割槽id
     * @return 最大時間
     * @throws LogException 查詢異常時丟擲
     */
    public static int getMaxTimeOffsetFromApi(int shardId) throws LogException {
        String cursor = client.GetCursor(project, logStore, shardId, Consts.CursorMode.END).GetCursor();
        return exchangeTimeWithCursorFromApi(cursor, shardId);
    }

 

三、 監控資料暴露

  通過prometheus進行資料暴露!

    /**
     * 暴露延遲資訊資料,啟動時呼叫即可
     */
    public static void exposeMetricData() {
        // 統計loghub消費延時
        CollectorRegistry.defaultRegistry.register(new Collector() {
            @Override
            public List<MetricFamilySamples> collect() {
                List<MetricFamilySamples> mfs = new ArrayList<>();
                final ConcurrentMap<Integer, LoghubCursorDelayTransformer> cursorHolder = CONSUME_CURSOR_DELAY_TRANSFORMER;
                // With lastest time labels
                GaugeMetricFamily consumeTimeGauge = new GaugeMetricFamily("my_shard_consume_lastest",
                        "last consume time watch help",
                        Collections.singletonList("shard"));
                // With delay labels
                GaugeMetricFamily delayGauge = new GaugeMetricFamily("my_shard_consume_delay",
                        "delay msg help",
                        Collections.singletonList("shard"));
                // todo: 注意優化消費長時間暫停情況
                for (LoghubCursorDelayTransformer delayTransformer : cursorHolder.values()) {
                    delayGauge.addMetric(
                            Collections.singletonList(delayTransformer.getShard() + ""),
                            delayTransformer.getDelay());
                    consumeTimeGauge.addMetric(Collections.singletonList("" + delayTransformer.getShard()), delayTransformer.getLastConsumeDataTime());
                }

                mfs.add(delayGauge);
                mfs.add(consumeTimeGauge);
                return mfs;
            }

        });
    }

  是不是很簡單?自定義一個 Collector 就可以了。接入資訊的其他細節可以參考之前的文章。

 

四、 消費組的監控?

  消費端實踐

    private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
    private static String sProject = "ali-cn-hangzhou-sls-admin";
    private static String sLogstore = "sls_operation_log";
    private static String sConsumerGroup = "consumerGroupX";
    private static String sAccessKeyId = "";
    private static String sAccessKey = "";
    public static void groupConsume() throws LogHubClientWorkerException, InterruptedException {
        // 第二個引數是消費者名稱,同一個消費組下面的消費者名稱必須不同,可以使用相同的消費組名稱,不同的消費者名稱在多臺機器上啟動多個程序,來均衡消費一個Logstore,這個時候消費者名稱可以使用機器ip來區分。第9個引數(maxFetchLogGroupSize)是每次從服務端獲取的LogGroup數目,使用預設值即可,如有調整請注意取值範圍(0,1000]。
        LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
        ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        //Thread執行之後,Client Worker會自動執行,ClientWorker擴充套件了Runnable介面。
        thread.start();
        Thread.sleep(60 * 60 * 1000);
        //呼叫worker的Shutdown函式,退出消費例項,關聯的執行緒也會自動停止。
        worker.shutdown();
        //ClientWorker執行過程中會生成多個非同步的Task,Shutdown之後最好等待還在執行的Task安全退出,建議sleep 30s。
        Thread.sleep(30 * 1000);
    }
// 消費業務端樣例
public class SampleLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    // 記錄上次持久化 checkpoint 的時間。
    private long mLastCheckTime = 0;

    public void initialize(int shardId) {
        this.shardId = shardId;
    }

    // 消費資料的主邏輯,這裡面的所有異常都需要捕獲,不能丟擲去。
    public String process(List<LogGroupData> logGroups,
                          ILogHubCheckPointTracker checkPointTracker) {
        // 這裡簡單的將獲取到的資料打印出來。
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup flg = logGroup.GetFastLogGroup();
            System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                    flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
            System.out.println("Tags");
            for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                FastLogTag logtag = flg.getLogTags(tagIdx);
                System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
            }
            for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                FastLog log = flg.getLogs(lIdx);
                System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                    FastLogContent content = log.getContents(cIdx);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        // 每隔 30 秒,寫一次 checkpoint 到服務端,如果 30 秒內,worker crash,
        // 新啟動的 worker 會從上一個 checkpoint 取消費資料,有可能有少量的重複資料。
        if (curTime - mLastCheckTime > 30 * 1000) {
            try {
                //引數true表示立即將checkpoint更新到服務端,為false會將checkpoint快取在本地,後臺預設隔60s會將checkpoint重新整理到服務端。
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            mLastCheckTime = curTime;
        }
        return null;
    }

    // 當 worker 退出的時候,會呼叫該函式,使用者可以在此處做些清理工作。
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        //將消費斷點儲存到服務端。
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}

class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
    public ILogHubProcessor generatorProcessor() {
        // 生成一個消費例項。
        return new SampleLogHubProcessor();
    }
}

  實現原理即定期向loghub中寫入 checkpoint, 以便可以查詢。既然資料都寫入了 loghub 服務端,那麼也能很容易在後臺看到消費延遲了。

  不過我們也可以通過api獲取消費情況,自行另外監控也行。(只是意義不大)

  可以通過如下方式獲取當前消費情況,與最後的資料偏移做比較,就可以得到延遲情況了。

    List<ConsumerGroupShardCheckPoint> checkPoints = client.GetCheckPoint(project, sLogstore, sConsumerGroup).getCheckPoints();

 

五、 grafana 延遲監控配置

  前面通過prometheus獲取到了延遲資料,接入到grafana後,就可以進行展示了。我們先來看下最終效果!

 

  配置本身是很簡單的,有個注意的點是需要整合兩個座標資料,因為一個消費延遲資料,另一個是具體的消費時間,這樣就可以同步查看了。

  配置右邊的Y軸座標需要使用 series override 選項,使用正則進行匹配如: /最後消費時間shard:.*/i

  時間選項需要乘以1000變為毫秒如: test_shard_consume_lastest * 1000

  監控思路可以擴充套件到以拉取模式進行消費的訊息系統。

&n