OpenTSDB原始碼分析之TSDB表操作(查詢)
阿新 • • 發佈:2019-02-13
TSDB表的行鍵比tsdb-uid表複雜,查詢也要複雜一些,這種複雜性體現在欄位過濾器,在該過濾器中監控指標、日期範圍和標籤都要考慮到。
所有的過濾器都通過TsdbQuery.run()方法實現,建立帶過濾器的掃描器,遍歷返回的行和收集資料供顯示使用,使用輔助方法TsdbQuery.getScanner()和TsdbQuery.findSpans()。
Tsdb表的查詢可以具體檢視CliQuery.main()方法。
public static void main(String[] args) throws Exception { /** * 將查詢結果列印輸出 */ // args = new String[] { "2013/09/08-12:57", "2013/10/20-16:57", "sum", // "t22", "host=foo"}; /** * 查詢輸出寫入到gnuplot載入檔案中 w l lt 3 lw 2(with line linetype 3 linewidth 2)為plot繪圖引數不可缺少 */ args = new String[] { "--graph=e:/a", "2013/09/08-12:57", "2013/10/20-16:57", /*"+format x=\"%a %d %H:%M\"",*/ "sum", "t22", "host=foo", "w l lt 3 lw 2" }; ArgP argp = new ArgP(); CliOptions.addCommon(argp); CliOptions.addVerbose(argp); argp.addOption("--graph", "BASEPATH", "Output data points to a set of files for gnuplot." + " The path of the output files will start with" + " BASEPATH."); args = CliOptions.parse(argp, args); if (args == null) { usage(argp, "Invalid usage.", 1); } else if (args.length < 3) { usage(argp, "Not enough arguments.", 2); } // get a config object Config config = CliOptions.getConfig(argp); final TSDB tsdb = new TSDB(config); tsdb.checkNecessaryTablesExist().joinUninterruptibly(); final String basepath = argp.get("--graph"); argp = null; Plot plot = null; try { plot = doQuery(tsdb, args, basepath != null); } finally { try { tsdb.shutdown().joinUninterruptibly(); } catch (Exception e) { LOG.error("Unexpected exception", e); System.exit(1); } } if (plot != null) { try { /** * 將Plot寫gnuplot載入檔案 修改圖片樣式可以在Plot中實現 */ final int npoints = plot.dumpToFiles(basepath); LOG.info("Wrote " + npoints + " for Gnuplot"); } catch (IOException e) { LOG.error("Failed to write the Gnuplot file under " + basepath, e); System.exit(1); } } } private static Plot doQuery(final TSDB tsdb, final String args[], final boolean want_plot) { final ArrayList<String> plotparams = new ArrayList<String>(); final ArrayList<Query> queries = new ArrayList<Query>(); final ArrayList<String> plotoptions = new ArrayList<String>(); /** * 給queries賦值(TsdbQuery) * 給plotparams、plotoptions初始化 * * 生成gnuplot載入檔案內容示例如下: set term png small size 1024,768 set xdata time set timefmt "%s" if (GPVAL_VERSION < 4.6) set xtics rotate; else set xtics rotate right set output "e:/a.png" set xrange ["1378645020":"1382288220"] set grid set style data linespoints set key right box set format x "%a %d %H:%M" plot "e:/a_0.dat" using 1:2 title "t22{host=foo}" w l lt 3 lw 2 plotparams用於傳引數行set format x "%a %d %H:%M" plotoptions用於plot引數如:w l lt 3 lw 2 */ parseCommandLineQuery(args, tsdb, queries, plotparams, plotoptions); if (queries.isEmpty()) { usage(null, "Not enough arguments, need at least one query.", 2); } final Plot plot = (want_plot ? new Plot(queries.get(0).getStartTime(), queries.get(0).getEndTime()) : null); if (want_plot) { plot.setParams(parsePlotParams(plotparams)); } final int nqueries = queries.size(); for (int i = 0; i < nqueries; i++) { // TO DO(tsuna): Optimization: run each query in // parallel.最好是並行執行每個查詢 final StringBuilder buf = want_plot ? null : new StringBuilder(); /** * 查詢都在queries.get(i).run()中完成 */ for (final DataPoints datapoints : queries.get(i).run()) { /** * 以gnuplot的載入檔案輸出 */ if (want_plot) { plot.add(datapoints, plotoptions.get(i)); /** * 列印輸出 */ } else { final String metric = datapoints.metricName(); final String tagz = datapoints.getTags().toString(); for (final DataPoint datapoint : datapoints) { buf.append(metric).append(' ').append(datapoint.timestamp()).append(' '); if (datapoint.isInteger()) { buf.append(datapoint.longValue()); } else { buf.append(String.format("%f", datapoint.doubleValue())); } buf.append(' ').append(tagz).append('\n'); System.out.print(buf); buf.delete(0, buf.length()); } } } } return plot; } static void parseCommandLineQuery(final String[] args, final TSDB tsdb, final ArrayList<Query> queries, final ArrayList<String> plotparams, final ArrayList<String> plotoptions) { /** * 第一個引數為start_ts */ long start_ts = DateTime.parseDateTimeString(args[0], null); if (start_ts >= 0) start_ts /= 1000; long end_ts = -1; /** * 把滿足條件的第二個引數設定為end_ts */ if (args.length > 3) { // see if we can detect(發現) an end time try { if (args[1].charAt(0) != '+' && (args[1].indexOf(':') >= 0 || args[1].indexOf('/') >= 0 || args[1].indexOf('-') >= 0 || Long.parseLong(args[1]) > 0)) { end_ts = DateTime.parseDateTimeString(args[1], null); } } catch (NumberFormatException nfe) { // ignore it as it means the third parameter is likely the // aggregator } } // temp fixup to seconds from ms until the rest of TSDB supports ms // Note you can't append this to the DateTime.parseDateTimeString() call // as // it clobbers -1 results if (end_ts >= 0) end_ts /= 1000; /** * 獲取plotparams */ int i = end_ts < 0 ? 1 : 2; while (i < args.length && args[i].charAt(0) == '+') { if (plotparams != null) { plotparams.add(args[i]); } i++; } while (i < args.length) { final Aggregator agg = Aggregators.get(args[i++]); final boolean rate = args[i].equals("rate"); RateOptions rate_options = new RateOptions(false, Long.MAX_VALUE, RateOptions.DEFAULT_RESET_VALUE); if (rate) { i++; long counterMax = Long.MAX_VALUE; long resetValue = RateOptions.DEFAULT_RESET_VALUE; if (args[i].startsWith("counter")) { String[] parts = Tags.splitString(args[i], ','); if (parts.length >= 2 && parts[1].length() > 0) { counterMax = Long.parseLong(parts[1]); } if (parts.length >= 3 && parts[2].length() > 0) { resetValue = Long.parseLong(parts[2]); } rate_options = new RateOptions(true, counterMax, resetValue); i++; } } final boolean downsample = args[i].equals("downsample"); if (downsample) { i++; } final int interval = downsample ? Integer.parseInt(args[i++]) : 0; final Aggregator sampler = downsample ? Aggregators.get(args[i++]) : null; final String metric = args[i++]; final HashMap<String, String> tags = new HashMap<String, String>(); while (i < args.length && args[i].indexOf(' ', 1) < 0 && args[i].indexOf('=', 1) > 0) { Tags.parse(tags, args[i++]); } if (i < args.length && args[i].indexOf(' ', 1) > 0) { plotoptions.add(args[i++]); } final Query query = tsdb.newQuery(); query.setStartTime(start_ts); if (end_ts > 0) { query.setEndTime(end_ts); } query.setTimeSeries(metric, tags, agg, rate, rate_options); if (downsample) { query.downsample(interval, sampler); } queries.add(query); } }